3 changes: 2 additions & 1 deletion docs/docs.json
@@ -155,7 +155,8 @@
"group": "UDFs",
"pages": [
"geneva/udfs/index",
"geneva/udfs/blobs"
"geneva/udfs/blobs",
"geneva/udfs/error_handling"
]
},
{
214 changes: 214 additions & 0 deletions docs/geneva/udfs/error_handling.mdx
@@ -0,0 +1,214 @@
---
title: Error Handling in Geneva UDFs
sidebarTitle: Error Handling
description: Learn how to configure retry, skip, and fail behaviors for UDFs.
icon: exclamation
---

Geneva provides three ways to handle errors, in order of increasing complexity: factory functions, exception matchers, and full Tenacity control.

## Quick Start: Factory Functions

Use factory functions for common error handling patterns:

```python
from geneva import udf, retry_transient
import pyarrow as pa

@udf(data_type=pa.int32(), on_error=retry_transient())
def my_udf(x: int) -> int:
    # Will retry on network errors (ConnectionError, TimeoutError, OSError)
    return call_external_api(x)
```

Geneva provides four built-in factory functions:

| Function | Behavior |
|----------|----------|
| `retry_transient()` | Retry `ConnectionError`, `TimeoutError`, `OSError` with exponential backoff |
| `retry_all()` | Retry any exception with exponential backoff |
| `skip_on_error()` | Return `None` for any exception (skip the row) |
| `fail_fast()` | Fail immediately on any exception (default behavior) |

### Customizing Retry Behavior

Factory functions accept parameters to customize behavior:

```python
from geneva import udf, retry_transient, retry_all

# Increase max attempts
@udf(data_type=pa.int32(), on_error=retry_transient(max_attempts=5))
def more_retries(x: int) -> int:
    ...

# Change backoff strategy
@udf(data_type=pa.int32(), on_error=retry_all(max_attempts=3, backoff="fixed"))
def fixed_backoff(x: int) -> int:
    ...
```

**Parameters:**
- `max_attempts` (int): Maximum number of attempts (default: 3)
- `backoff` (str): Backoff strategy between retries
- `"exponential"` (default): 1s, 2s, 4s, 8s... with jitter, capped at 60s
- `"fixed"`: Fixed 1s delay between attempts
- `"linear"`: 1s, 2s, 3s, 4s... capped at 60s
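
The schedules above can be sketched in plain Python (illustrative only, not Geneva's actual implementation; the full-jitter model for `"exponential"` is an assumption):

```python
import random

def backoff_delay(attempt: int, strategy: str = "exponential", cap: float = 60.0) -> float:
    """Sketch of the documented backoff schedules; attempt numbering starts at 1."""
    if strategy == "exponential":
        base = min(2.0 ** (attempt - 1), cap)  # 1s, 2s, 4s, 8s... capped at 60s
        return random.uniform(0.0, base)       # with jitter
    if strategy == "linear":
        return min(float(attempt), cap)        # 1s, 2s, 3s, 4s... capped at 60s
    return 1.0                                 # "fixed": constant 1s delay
```

For example, `backoff_delay(3, "linear")` gives 3.0 seconds, while a fourth exponential attempt waits somewhere between 0 and 8 seconds.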

## Custom Exception Handling: Matchers

For fine-grained control, use `Retry`, `Skip`, and `Fail` matchers:

```python
from geneva import udf, Retry, Skip, Fail

@udf(
    data_type=pa.int32(),
    on_error=[
        Retry(ConnectionError, TimeoutError, max_attempts=3),
        Retry(ValueError, match="rate limit", max_attempts=5),
        Skip(ValueError),  # Other ValueErrors - skip the row
        Fail(AuthError),   # Auth failures - fail immediately
    ]
)
def custom_handling(x: int) -> int:
    ...
```

**How matching works:**
1. Matchers are evaluated in order (first match wins)
2. More specific matchers should come before general ones
3. Unmatched exceptions fail the job
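
The first-match-wins evaluation can be sketched as follows (the names here are illustrative stand-ins, not Geneva's internal API):

```python
import re

def first_match(rules, exc):
    """Return the first rule whose exception types (and optional `match`
    regex) cover exc; None means the exception is unmatched and the job fails."""
    for rule in rules:
        types, pattern = rule
        if isinstance(exc, types) and (pattern is None or re.search(pattern, str(exc))):
            return rule
    return None

rules = [
    ((ConnectionError, TimeoutError), None),  # Retry(ConnectionError, TimeoutError)
    ((ValueError,), "rate limit"),            # Retry(ValueError, match="rate limit")
    ((ValueError,), None),                    # Skip(ValueError)
]
```

A `ValueError("rate limit exceeded")` stops at the second rule, any other `ValueError` falls through to the third, and a `KeyError` matches nothing, so the job fails.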

### Exception Matchers

| Matcher | Behavior | Parameters |
|---------|----------|------------|
| `Retry(...)` | Retry with backoff, then fail | `max_attempts`, `backoff`, `match` |
| `Skip(...)` | Return `None` for that row | `match` |
| `Fail(...)` | Fail the job immediately | `match` |

**Syntax:**
```python
# Single exception
Retry(ConnectionError)

# Multiple exceptions
Retry(ConnectionError, TimeoutError, OSError)

# With parameters
Retry(ConnectionError, max_attempts=5, backoff="fixed")

# With message matching
Retry(ValueError, match="rate limit")
```

### Message Matching

Use the `match` parameter to filter exceptions by their message content. The pattern is a regex:

```python
from geneva import Retry, Skip

# Simple substring (a regex search matches anywhere in the message)
Retry(ValueError, match="rate limit")
# Matches: ValueError("rate limit exceeded")

# Regex pattern
Retry(ValueError, match=r"rate.?limit")
# Matches: ValueError("rate limit")
# Matches: ValueError("ratelimit")
# Matches: ValueError("rate_limit")

# Case-insensitive matching (use (?i) flag)
Retry(ValueError, match=r"(?i)rate limit")
# Matches: ValueError("Rate Limit exceeded")
# Matches: ValueError("RATE LIMIT hit")

# Regex alternation (match multiple patterns)
Retry(ValueError, match=r"429|rate.?limit|throttl")
# Matches: ValueError("Error 429")
# Matches: ValueError("rate limit exceeded")
# Matches: ValueError("Request throttled")
```
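
The matching semantics shown above amount to a regex search against the exception's message, roughly like this sketch (the helper name is ours, not Geneva's):

```python
import re

def message_matches(pattern: str, exc: Exception) -> bool:
    """Sketch: a match anywhere in str(exc) counts, per re.search semantics."""
    return re.search(pattern, str(exc)) is not None
```

This is why a plain substring like `"rate limit"` works without anchors, and why inline flags such as `(?i)` and alternation with `|` behave as in any Python regex.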

For example, using matchers to distinguish error types:

```python
@udf(
    data_type=pa.string(),
    on_error=[
        # Retry rate limits with more attempts
        Retry(ValueError, match="rate limit", max_attempts=10),
        # Skip invalid input
        Skip(ValueError, match="invalid"),
        # Fail on other ValueErrors
        Fail(ValueError),
    ]
)
def api_call(x: str) -> str:
    ...
```


### Behavior Summary

| Outcome | What Happens | When to Use |
|---------|--------------|-------------|
| **Retry** | Retry with backoff, then fail/skip | Transient errors: network issues, rate limits, timeouts |
| **Skip** | Return `None` for that row | Bad input data, row-specific failures, optional enrichment |
| **Fail** | Kill the job immediately | Fatal errors: auth failures, configuration errors |

## Advanced: Full Tenacity Control

For power users who need custom callbacks or complex retry conditions, omit `on_error` and use `error_handling=`:

```python
from geneva import udf
from geneva.debug.error_store import ErrorHandlingConfig, UDFRetryConfig
from tenacity import wait_random_exponential, stop_after_delay

@udf(
    data_type=pa.int32(),
    error_handling=ErrorHandlingConfig(
        retry_config=UDFRetryConfig(
            retry=my_custom_retry_condition,
            stop=stop_after_delay(300),
            wait=wait_random_exponential(min=1, max=120),
            before_sleep=my_logging_callback,
        ),
    ),
)
def power_user_udf(x: int) -> int:
    ...
```

Note: `on_error=` and `error_handling=` cannot be used together.

## Restrictions

- **Skip behavior** only works with scalar UDFs (functions that process one row at a time)
- For batch UDFs that receive `RecordBatch`, use `Retry` or `Fail` only
- **All Retry matchers must use the same backoff strategy.** You cannot mix different backoff strategies in the same `on_error` list:

```python
@udf(on_error=[
    Retry(ConnectionError, backoff="exponential"),
    Retry(TimeoutError, backoff="fixed"),  # Error: different backoff!
])

@udf(on_error=[
    Retry(ConnectionError, backoff="fixed"),
    Retry(TimeoutError, backoff="fixed"),  # Same backoff - OK
])
```

- **Invalid regex patterns are rejected at construction time:**

```python
# This will raise ValueError due to the unclosed bracket
Retry(ValueError, match=r"[invalid")

# But this will work:
Retry(ValueError, match=r"rate.?limit")
```
43 changes: 27 additions & 16 deletions docs/geneva/udfs/index.mdx
@@ -21,17 +21,9 @@ from geneva import udf
@udf
def area_udf(x: int, y: int) -> int:
    return x * y

@udf
def download_udf(filename: str) -> bytes:
    import requests
    resp = requests.get(filename)
    resp.raise_for_status()
    return resp.content

```

This UDF will take the value of x and value of y from each row and return the product. The `@udf` wrapper is all that is needed.
This UDF will take the value of `x` and value of `y` from each row and return the product. The `@udf` wrapper is all that is needed.

### Batched UDFs

@@ -120,7 +112,7 @@ class OpenAIEmbedding(Callable):
The `udf` can have extra annotations that specify resource requirements and operational characteristics.
These are simply additional parameters passed to `udf(...)`.

### Resources requirements for UDFs
### Resource requirements for UDFs

Some workers may require specific resources such as GPUs, CPUs, and certain amounts of RAM.

@@ -134,17 +126,37 @@ def func(...):

### Operational parameters for UDFs

UDFs can be quite varied -- some can be simple operations where thousands of calls can be completed per second, while others may be slow and require 30s per row.
#### checkpoint_size

`checkpoint_size` controls how many rows are processed before checkpointing, and therefore reporting and saving progress.

UDFs can be quite varied: some can be simple operations where thousands of calls can be completed per second, while others may be slow and require 30s per row. So a simple default like "every 1000 rows" might write once a second or once every 8 hours!

Geneva will handle this internally, using an experimental feature that will adapt checkpoint sizing as a UDF progresses. However, if you want to see writes more or less frequently, you can set this manually. There are three parameters:

In LanceDB, the default number of rows per fragment is 1024 * 1024 rows. Consider a captioning UDF that takes 30s per row. It could take nearly a year (!) before any results show up (30s/row * 1024*1024 rows/fragment => ~31.5M s/fragment => ~8.7k hours/fragment -> ~364 days/fragment). To enable these to be parallelized, we provide a `batch_size` setting so the work can be split between workers and so that partial results are checkpointed more frequently, enabling finer-grained progress and job recovery.
- `checkpoint_size`: the seed for the initial checkpoint size
- `min_checkpoint_size`: the minimum value that Geneva will use while adapting checkpoint size
- `max_checkpoint_size`: the maximum value that Geneva will use while adapting checkpoint size

By default `batch_size` is 100 computed rows per checkpoint. So for an expensive captioning UDF that can take 30s per row, you may get a checkpoint every 3000s (50mins). With 100 gpus, our job could finish in 3.5 days! For cheap operations that can compute 100 rows per second you'd potentially be checkpointing every second. Tuning this can help you see progress more frequently.
Therefore, to force a checkpoint size (and effectively disable adaptive batch sizing), set all three of these parameters to the same value.
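
The clamping behavior implied by these three parameters can be sketched in plain Python (an illustration of the stated semantics, not Geneva's adaptive algorithm):

```python
def next_checkpoint_size(proposed: int, min_size: int, max_size: int) -> int:
    """Whatever size the adaptive logic proposes is clamped to
    [min_checkpoint_size, max_checkpoint_size]; equal bounds pin it."""
    return max(min_size, min(proposed, max_size))
```

With `min_size == max_size`, every proposal collapses to that single value, which is why setting all three parameters equal effectively disables adaptive sizing.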

### Error handling

Depending on the UDF, you may want Geneva to ignore rows that hit failures, retry them, or fail the entire job. For simple cases, Geneva provides the `on_error` parameter with the following options:

| Function | Behavior |
|----------|----------|
| `retry_transient()` | Retry `ConnectionError`, `TimeoutError`, `OSError` with exponential backoff |
| `retry_all()` | Retry any exception with exponential backoff |
| `skip_on_error()` | Return `None` for any exception (skip the row) |
| `fail_fast()` | Fail immediately on any exception (default behavior) |

If those are not specific enough, Geneva also provides [many more error handling options](/geneva/udfs/error_handling).

## Registering Features with UDFs

Registering a feature is done by providing the `Table.add_columns()` function a new column name and the Geneva UDF.

Let's start by obtaining the table `tbl`
```python
import geneva
import numpy as np
@@ -251,14 +263,13 @@ The next section shows you how to change your column definition by `alter`ing th

## Altering UDFs


You now want to revise the code. To make the change, you'd update the UDF used to compute the column using the `alter_columns` API and the updated function. The example below replaces the definition of column `area` to use the `area_udf_v2` function.

```python
table.alter_columns({"path": "area", "udf": area_udf_v2} )
```

After making this change, the existing data already in the table does not change. However, when you perform your next basic `backfill` operation, all values would be recalculated and updated. If you only wanted some rows updated , you could perform a filtered backfill, targeting the specific rows that need the new upates.
After making this change, the existing data already in the table does not change. However, when you perform your next basic `backfill` operation, all values would be recalculated and updated. If you only wanted some rows updated, you could perform a filtered backfill, targeting the specific rows that need the new updates.

For example, this filter would only update the rows where area was currently null.
```python