diff --git a/docs/docs.json b/docs/docs.json
index 2db3ae6..20d07c6 100644
--- a/docs/docs.json
+++ b/docs/docs.json
@@ -155,7 +155,8 @@
      "group": "UDFs",
      "pages": [
        "geneva/udfs/index",
-       "geneva/udfs/blobs"
+       "geneva/udfs/blobs",
+       "geneva/udfs/error_handling"
      ]
    },
    {
diff --git a/docs/geneva/udfs/error_handling.mdx b/docs/geneva/udfs/error_handling.mdx
new file mode 100644
index 0000000..0faf802
--- /dev/null
+++ b/docs/geneva/udfs/error_handling.mdx
@@ -0,0 +1,214 @@
+---
+title: Error Handling in Geneva UDFs
+sidebarTitle: Error Handling
+description: Learn how to configure retry, skip, and fail behaviors for UDFs.
+icon: exclamation
+---

Geneva provides three ways to handle errors, in increasing order of complexity: factory functions, exception matchers, and full Tenacity control.

## Quick Start: Factory Functions

Use factory functions for common error handling patterns:

```python
from geneva import udf, retry_transient
import pyarrow as pa

@udf(data_type=pa.int32(), on_error=retry_transient())
def my_udf(x: int) -> int:
    # Will retry on network errors (ConnectionError, TimeoutError, OSError).
    # call_external_api is a stand-in for your own I/O call.
    return call_external_api(x)
```

Geneva provides four built-in factory functions:

| Function | Behavior |
|----------|----------|
| `retry_transient()` | Retry `ConnectionError`, `TimeoutError`, `OSError` with exponential backoff |
| `retry_all()` | Retry any exception with exponential backoff |
| `skip_on_error()` | Return `None` for any exception (skip the row) |
| `fail_fast()` | Fail immediately on any exception (default behavior) |

### Customizing Retry Behavior

Factory functions accept parameters to customize behavior:

```python
from geneva import udf, retry_transient, retry_all
import pyarrow as pa

# Increase max attempts
@udf(data_type=pa.int32(), on_error=retry_transient(max_attempts=5))
def more_retries(x: int) -> int:
    ...

# Change backoff strategy
@udf(data_type=pa.int32(), on_error=retry_all(max_attempts=3, backoff="fixed"))
def fixed_backoff(x: int) -> int:
    ...
```

**Parameters:**
- `max_attempts` (int): Maximum number of attempts (default: 3)
- `backoff` (str): Backoff strategy between retries
  - `"exponential"` (default): 1s, 2s, 4s, 8s... with jitter, capped at 60s
  - `"fixed"`: Fixed 1s delay between attempts
  - `"linear"`: 1s, 2s, 3s, 4s... capped at 60s

## Custom Exception Handling: Matchers

For fine-grained control, use `Retry`, `Skip`, and `Fail` matchers:

```python
from geneva import udf, Retry, Skip, Fail
import pyarrow as pa

@udf(
    data_type=pa.int32(),
    on_error=[
        Retry(ConnectionError, TimeoutError, max_attempts=3),
        Retry(ValueError, match="rate limit", max_attempts=5),
        Skip(ValueError),   # Other ValueErrors - skip the row
        # AuthError is a placeholder for your client library's auth exception
        Fail(AuthError),    # Auth failures - fail immediately
    ]
)
def custom_handling(x: int) -> int:
    ...
```

**How matching works:**
1. Matchers are evaluated in order (first match wins)
2. More specific matchers should come before general ones
3. Unmatched exceptions fail the job
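
For example, here is a sketch of why ordering matters. It assumes matchers accept any exception type; the broad `Skip(Exception)` and the `fetch_score` function are made up for illustration:

```python
from geneva import udf, Retry, Skip
import pyarrow as pa

# Anti-pattern: the broad Skip matches every exception first,
# so the narrower Retry below it is never reached.
bad_order = [
    Skip(Exception),
    Retry(ConnectionError, TimeoutError, max_attempts=3),
]

# Better: list the specific, retryable errors first, then fall back to Skip.
good_order = [
    Retry(ConnectionError, TimeoutError, max_attempts=3),
    Skip(Exception),
]

@udf(data_type=pa.int32(), on_error=good_order)
def fetch_score(x: int) -> int:
    ...
```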

### Exception Matchers

| Matcher | Behavior | Parameters |
|---------|----------|------------|
| `Retry(...)` | Retry with backoff, then fail | `max_attempts`, `backoff`, `match` |
| `Skip(...)` | Return `None` for that row | `match` |
| `Fail(...)` | Fail the job immediately | `match` |

**Syntax:**
```python
# Single exception
Retry(ConnectionError)

# Multiple exceptions
Retry(ConnectionError, TimeoutError, OSError)

# With parameters
Retry(ConnectionError, max_attempts=5, backoff="fixed")

# With message matching
Retry(ValueError, match="rate limit")
```

### Message Matching

Use the `match` parameter to filter exceptions by their message content. The pattern is a regular expression:

```python
from geneva import Retry, Skip

# Simple substring (unanchored patterns match anywhere in the message)
Retry(ValueError, match="rate limit")
# Matches: ValueError("rate limit exceeded")

# Regex pattern
Retry(ValueError, match=r"rate.?limit")
# Matches: ValueError("rate limit")
# Matches: ValueError("ratelimit")
# Matches: ValueError("rate_limit")

# Case-insensitive matching (use (?i) flag)
Retry(ValueError, match=r"(?i)rate limit")
# Matches: ValueError("Rate Limit exceeded")
# Matches: ValueError("RATE LIMIT hit")

# Regex alternation (match multiple patterns)
Retry(ValueError, match=r"429|rate.?limit|throttl")
# Matches: ValueError("Error 429")
# Matches: ValueError("rate limit exceeded")
# Matches: ValueError("Request throttled")
```

For example, using matchers to distinguish error types:

```python
from geneva import udf, Retry, Skip, Fail
import pyarrow as pa

@udf(
    data_type=pa.string(),
    on_error=[
        # Retry rate limits with more attempts
        Retry(ValueError, match="rate limit", max_attempts=10),
        # Skip invalid input
        Skip(ValueError, match="invalid"),
        # Fail on other ValueErrors
        Fail(ValueError),
    ]
)
def api_call(x: str) -> str:
    ...
```

### Behavior Summary

| Outcome | What Happens | When to Use |
|---------|--------------|-------------|
| **Retry** | Retry with backoff, then fail/skip | Transient errors: network issues, rate limits, timeouts |
| **Skip** | Return `None` for that row | Bad input data, row-specific failures, optional enrichment |
| **Fail** | Kill the job immediately | Fatal errors: auth failures, configuration errors |

## Advanced: Full Tenacity Control

For power users who need custom callbacks or complex retry conditions, omit `on_error` and use `error_handling=`:

```python
from geneva import udf
from geneva.debug.error_store import ErrorHandlingConfig, UDFRetryConfig
from tenacity import wait_random_exponential, stop_after_delay
import pyarrow as pa

@udf(
    data_type=pa.int32(),
    error_handling=ErrorHandlingConfig(
        retry_config=UDFRetryConfig(
            retry=my_custom_retry_condition,   # any Tenacity retry condition
            stop=stop_after_delay(300),
            wait=wait_random_exponential(min=1, max=120),
            before_sleep=my_logging_callback,  # any Tenacity before_sleep callback
        ),
    ),
)
def power_user_udf(x: int) -> int:
    ...
```
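
The `my_custom_retry_condition` and `my_logging_callback` names above are placeholders. As a rough sketch, they could be built from standard Tenacity helpers; the predicate logic and logger name below are invented for illustration:

```python
import logging
from tenacity import retry_if_exception, before_sleep_log

logger = logging.getLogger("power_user_udf")

def _looks_transient(exc: BaseException) -> bool:
    # Example predicate: treat HTTP 429s and rate-limit messages as retryable.
    message = str(exc).lower()
    return "429" in message or "rate limit" in message

# retry_if_exception wraps a predicate over the raised exception.
my_custom_retry_condition = retry_if_exception(_looks_transient)

# before_sleep_log logs a message before each retry sleep.
my_logging_callback = before_sleep_log(logger, logging.WARNING)
```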

Note: `on_error=` and `error_handling=` cannot be used together.

## Restrictions

- **Skip behavior** only works with scalar UDFs (functions that process one row at a time)
- For batch UDFs that receive `RecordBatch`, use `Retry` or `Fail` only
- **All Retry matchers must use the same backoff strategy.** You cannot mix different backoff strategies in the same `on_error` list:

```python
# Not allowed: the Retry matchers use different backoff strategies
@udf(on_error=[
    Retry(ConnectionError, backoff="exponential"),
    Retry(TimeoutError, backoff="fixed"),  # Error: different backoff!
])
def mixed_backoff(x: int) -> int:
    ...

# Allowed: both Retry matchers use the same backoff strategy
@udf(on_error=[
    Retry(ConnectionError, backoff="fixed"),
    Retry(TimeoutError, backoff="fixed"),  # Same backoff - OK
])
def same_backoff(x: int) -> int:
    ...
```

- **Invalid regex patterns are rejected at construction time:**

```python
# This will raise ValueError due to the unclosed bracket
Retry(ValueError, match=r"[invalid")

# But this will work:
Retry(ValueError, match=r"rate.?limit")
```
diff --git a/docs/geneva/udfs/index.mdx b/docs/geneva/udfs/index.mdx
index 66d8805..215244d 100644
--- a/docs/geneva/udfs/index.mdx
+++ b/docs/geneva/udfs/index.mdx
@@ -21,17 +21,9 @@
from geneva import udf

@udf
def area_udf(x: int, y: int) -> int:
    return x * y
-
-@udf
-def download_udf(filename:str) -> bytes:
-    import requests
-    resp = requests.get(filename)
-    res.raise_for_status()
-    return resp.content
-
```
-This UDF will take the value of x and value of y from each row and return the product. The `@udf` wrapper is all that is needed.
+This UDF will take the value of `x` and value of `y` from each row and return the product. The `@udf` wrapper is all that is needed.

### Batched UDFs

@@ -120,7 +112,7 @@ class OpenAIEmbedding(Callable):

The `udf` can have extra annotations that specify resource requirements and operational characteristics. These are just add parameters to the `udf(...)`.

-### Resources requirements for UDFs
+### Resource requirements for UDFs

Some workers may require specific resources such as gpus, cpus and certain amounts of RAM.

@@ -134,17 +126,37 @@ def func(...):

### Operational parameters for UDFs

-UDFs can be quite varied -- some can be simple operations where thousands of calls can be completed per second, while others may be slow and require 30s per row.
+#### checkpoint_size
+
+`checkpoint_size` controls how many rows are processed before checkpointing, and therefore how often progress is reported and saved.
+
+UDFs can be quite varied: some can be simple operations where thousands of calls can be completed per second, while others may be slow and require 30s per row. So a simple default like "every 1000 rows" might write once a second or once every 8 hours!
+
+Geneva handles this internally, using an experimental feature that adapts the checkpoint size as a UDF progresses. However, if you want to see writes more or less frequently, you can set the size manually. There are three parameters:

-In LanceDB, the default number of rows per fragment is 1024 * 1024 rows. Conside a captioning UDF that takes 30s per row. It could take a year (!) before any results show up! (e.g. 30s/row * 1024*1024 rows/fragment => 30M s/fragment => 8.3k hours/fragment -> 347 days/fragment). To enable these to be parallelized, we provide a `batch_size` setting so the work can be split between workers and so that that partial results are checkpointed more frequently to enable finer-grained progress and job recovery.
+- `checkpoint_size`: the seed for the initial checkpoint size
+- `min_checkpoint_size`: the minimum value that Geneva will use while adapting checkpoint size
+- `max_checkpoint_size`: the maximum value that Geneva will use while adapting checkpoint size

-By default `batch_size` is 100 computed rows per checkpoint. So for an expensive captioning UDF that can take 30s per row, you may get a checkpoint every 3000s (50mins). With 100 gpus, our job could finish in 3.5 days! For cheap operations that can compute 100 rows per second you'd potentially be checkpointing every second. Tuning this can help you see progress more frequently.
+Therefore, to force a fixed checkpoint size (and effectively disable adaptive checkpoint sizing), set all three of these parameters to the same value, as sketched below.
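+
+A minimal sketch, assuming these three are passed as keyword arguments to `udf(...)` like the other operational parameters (the captioning UDF below is hypothetical):
+
+```python
+from geneva import udf
+import pyarrow as pa
+
+# Pin checkpointing to exactly 50 rows by giving the seed, minimum,
+# and maximum the same value, which disables adaptive sizing.
+@udf(
+    data_type=pa.string(),
+    checkpoint_size=50,
+    min_checkpoint_size=50,
+    max_checkpoint_size=50,
+)
+def caption(image_uri: str) -> str:
+    ...
+```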
+
+### Error handling
+
+Depending on the UDF, you may want Geneva to skip rows that hit failures, retry them, or fail the entire job. For simple cases, Geneva provides a single parameter, `on_error`, with the following options:
+
+| Function | Behavior |
+|----------|----------|
+| `retry_transient()` | Retry `ConnectionError`, `TimeoutError`, `OSError` with exponential backoff |
+| `retry_all()` | Retry any exception with exponential backoff |
+| `skip_on_error()` | Return `None` for any exception (skip the row) |
+| `fail_fast()` | Fail immediately on any exception (default behavior) |
+
+If those are not specific enough, Geneva also provides [many more error handling options](/geneva/udfs/error_handling).

## Registering Features with UDFs

Registering a feature is done by providing the `Table.add_columns()` function a new column name and the Geneva UDF.
-Let's start by obtaining the table `tbl`

```python
import geneva
import numpy as np
@@ -251,14 +263,13 @@ The next section shows you how to change your column definition by `alter`ing th

## Altering UDFs

-
You now want to revise the code. To make the change, you'd update the UDF used to compute the column using the `alter_columns` API and the updated function. The example below replaces the definition of column `area` to use the `area_udf_v2` function.

```python
table.alter_columns({"path": "area", "udf": area_udf_v2} )
```

-After making this change, the existing data already in the table does not change. However, when you perform your next basic `backfill` operation, all values would be recalculated and updated. If you only wanted some rows updated , you could perform a filtered backfill, targeting the specific rows that need the new upates.
+After making this change, the existing data already in the table does not change. However, when you perform your next basic `backfill` operation, all values would be recalculated and updated. If you only wanted some rows updated, you could perform a filtered backfill, targeting the specific rows that need the new updates.

For example, this filter would only update the rows where area was currently null.

```python