
Conversation

@jackye1995
Contributor

@jackye1995 jackye1995 commented Dec 26, 2025

So far, merge_insert requires the source table to have no duplicate rows, because if two source rows match the same row in the target, it is ambiguous which one should be merged into the target.

This PR introduces two concepts: dedupe_by, which lets the user specify a column (must be a primitive type) in the source to deduplicate rows by, and dedupe_sort_options, which lets the user specify which rows take precedence. The ordering can be ascending or descending, with nulls first or last (similar to Arrow's SortOptions); the actual ordering of the data follows DataFusion semantics.

When running merge insert with these two parameters set, during the merge phase we compare the values of the dedupe column and, based on the dedupe order, decide which row to keep. If multiple rows share the same dedupe value, the operation fails, since it is again ambiguous which one should be merged into the target.
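For illustration, here is a rough Python sketch of the intended usage (hypothetical: the dedupe_by / dedupe_sort_options method names follow this description, and the exact binding surface may differ):

    import lance
    import pyarrow as pa

    ds = lance.dataset("orders.lance")  # hypothetical existing target with key column "id"

    # Source has two rows for id=1; the row with the larger "version" should win.
    source = pa.table({
        "id": [1, 1, 2],
        "version": [3, 5, 1],
        "amount": [10.0, 12.5, 7.0],
    })

    (
        ds.merge_insert("id")
        .when_matched_update_all()
        .when_not_matched_insert_all()
        .dedupe_by("version")                  # column used to pick the winning duplicate
        .dedupe_sort_options(descending=True)  # assumed signature; keep the largest version
        .execute(source)
    )
    # After the merge, id=1 reflects version=5; equal versions for the same id fail the operation.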

For the actual execution, I chose to do a running dedupe during the merge, rather than an aggregation during the join phase to pick the row to merge, to avoid multiple passes over the source dataset.

The implementation also works when the dedupe column is a nested field, and when the dedupe column exists only in the source and not in the target (which is the main use case: merging a MemTable into the underlying table).

Note: this feature is for merge_insert v2 only.

@github-actions github-actions bot added the enhancement, python, and java labels on Dec 26, 2025
@github-actions
Contributor

Code Review: feat: support merge insert with duplicated rows in source

Summary

This PR adds dedupe_by and dedupe_ordering parameters to merge_insert v2, allowing users to handle source data with duplicate keys by specifying which row to keep based on a column value.

Review

P0 - Critical Issue: Memory Growth with Large Datasets

The dedupe implementation buffers entire batches in buffered_batches: Vec<Option<RecordBatch>> to support row extraction at finalization time. While there's reference counting to clear unreferenced batches, in scenarios where many keys have candidates across many batches, memory can grow unboundedly.

For a dataset with N unique keys spread across M batches where most updates are scattered (not clustered by key), this requires O(M) batch storage. Consider:

  1. Adding documentation that warns about memory implications for large scattered updates
  2. Or implementing a spilling mechanism for extreme cases (could be a follow-up)

P1 - Potential Issue: Stable Row IDs + Dedupe Mode

In create_ordered_update_insert_stream (used when stable_row_ids = true), the code calls split_updates_and_inserts, which calls process_row_action with batch_idx. However, split_updates_and_inserts (lines 534-600) does not appear to set up batch buffering for dedupe mode before calling process_and_split_batch; the batch buffering logic exists only in create_streaming_write_stream.

This could cause issues if:

  1. The dataset uses stable row IDs (triggers ordered stream path)
  2. User sets dedupe_by
  3. process_row_action expects batch_idx but buffering wasn't set up

Consider adding a test case for stable_row_ids = true with dedupe to verify this path works correctly, or ensure batch buffering is also handled in the ordered stream path.

P1 - NULL Handling Semantics

The PR mentions "NULL handling follows DataFusion semantics for sort ordering" but doesn't document or test this behavior explicitly. With ScalarValue comparisons:

  • NULLs may compare in unexpected ways
  • Users may want to know if NULL values are treated as smallest or largest

Consider adding a test case with NULL values in the dedupe column to document the expected behavior.

Minor Notes

  • Good test coverage for basic ascending/descending cases and large datasets
  • The v1 path correctly returns NotSupported error when dedupe_by is used
  • Java and Python bindings are complete and consistent

Automated review by Claude Code

@github-actions
Contributor

Code Review Summary

This PR adds support for handling duplicate rows in source data during merge_insert operations via dedupe_by and dedupe_ordering parameters. The implementation is well-structured with comprehensive test coverage across Rust, Python, and Java.

P1: Memory Concern with Buffered Batches

Issue: In write.rs, the MergeState struct buffers entire data batches (buffered_batches: Vec<Option<RecordBatch>>) for the dedupe mode. For large merge operations, this could lead to significant memory pressure since all batches need to be held in memory until finalize_dedupe() is called.

Location: rust/lance/src/dataset/write/merge_insert/exec/write.rs:85-89 (in the diff)

Concern: While there is a reference counting mechanism to clear batches when no longer referenced, in the worst case (many distinct keys with duplicates spread across all batches), all batches will be retained. The PR does have tests with 1000 rows, but production workloads with millions of rows could hit memory limits.

Recommendation: Consider adding documentation about memory implications for very large source datasets with many duplicates, or consider if a streaming approach with external sorting could be used for extremely large inputs.

P1: Missing NULL Value Test

Issue: The documentation states "NULL handling follows DataFusion semantics for sort ordering" but there are no explicit tests verifying behavior when the dedupe column contains NULL values.

Location: rust/lance/src/dataset/write/merge_insert.rs:287-288 (in the diff)

Recommendation: Add a test case that includes NULL values in the dedupe column to verify the expected behavior (typically NULLs sort first or last depending on ordering).
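A hedged pytest-style sketch of such a test (the dedupe builder methods come from this PR; the expected NULL-loses outcome is an assumption echoed by other reviews below):

    import lance
    import pyarrow as pa

    def test_merge_insert_dedupe_null_loses(tmp_path):
        # Hypothetical test: a NULL dedupe value should lose to a non-NULL value for the same key.
        uri = str(tmp_path / "t.lance")
        lance.write_dataset(pa.table({"id": [1], "ts": [100], "value": ["old"]}), uri)
        ds = lance.dataset(uri)

        source = pa.table({
            "id": [1, 1],
            "ts": [None, 200],  # NULL vs non-NULL for the same key
            "value": ["from_null", "from_200"],
        })

        (
            ds.merge_insert("id")
            .when_matched_update_all()
            .when_not_matched_insert_all()
            .dedupe_by("ts")                       # assumed builder method from this PR
            .dedupe_sort_options(descending=True)  # assumed signature
            .execute(source)
        )

        rows = lance.dataset(uri).to_table().to_pylist()
        assert rows == [{"id": 1, "ts": 200, "value": "from_200"}]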


Overall, the implementation is clean and well-tested for the core use cases. The API design follows the existing builder pattern nicely. The feature is properly restricted to the v2 path, with a clear error message for unsupported configurations.

@github-actions
Contributor

Code Review

This PR adds dedupe_by and dedupe_ordering parameters to merge_insert, allowing users to handle source data with duplicate keys by keeping either the row with the smallest or largest value in a specified column.

P0/P1 Issues

P1 - Memory concern with dedupe buffering: In write.rs, the buffered_batches approach stores full data-column batches in memory, with reference counting to clear unused ones. While the ref-counting helps, for large source datasets with many unique keys, this could lead to significant memory pressure since each unique row_id keeps its "best" batch alive. Consider:

  • Adding a configurable memory limit or batch count threshold
  • Documenting the memory implications for users

P1 - Test coverage for edge cases: The tests cover basic ascending/descending behavior well, but consider adding tests for:

  • NULL values in the dedupe column (the code handles this, but no explicit test)
  • Float NaN values (mentioned in docs but not tested)
  • Empty source batches
  • Dedupe column containing ties (same value for multiple duplicates of same key)

Minor observations (not blockers)

  1. The is_schema_with_dedupe_column_only method checks schema compatibility, but the data_type() comparison might be too strict for some valid cases (e.g., nullable vs non-nullable fields). This aligns with existing behavior so likely fine.

  2. Good defensive coding with the NotSupported error for v1 path - this prevents confusion about feature availability.

  3. Test coverage is good overall with tests across Python, Java, and Rust APIs, including large dataset tests that exercise multiple batch handling.

Summary

The implementation is well-structured with clear separation of concerns (accumulation during streaming, finalization at end). The ref-counting approach for batch memory management is clever. The main consideration is documenting memory behavior for very large source datasets with high key cardinality.

@github-actions
Contributor

Code Review

This PR adds dedupe_by and dedupe_ordering options to merge_insert, allowing users to handle duplicate rows in the source by selecting rows based on a column value. The implementation looks well-designed with good test coverage.

Key Observations

Memory Usage Concern (P1):
In write.rs, the deduplication logic buffers all input batches in buffered_batches during stream processing to enable later extraction of winning rows. While there is a reference counting mechanism that sets batches to None when no longer needed, the worst case (all rows from all batches are potential winners until the very end) could hold the entire source in memory. This could cause OOM for large source datasets with many duplicates.

Consider documenting this memory trade-off in the API documentation, or in future work, implementing a more memory-efficient approach (e.g., streaming dedup using a sort + aggregate approach).

The batch clearing mechanism looks correct - batches are set to None when their ref count hits zero, which happens when a better row is found in a different batch.
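A schematic Python illustration of that bookkeeping (not the actual Rust code; the offer and is_better names are made up for the sketch):

    buffered_batches = {}  # batch_idx -> batch, set to None once no winner references it
    batch_refs = {}        # batch_idx -> number of current winners living in that batch
    best = {}              # key -> (batch_idx, row_idx, dedupe_value)

    def offer(key, batch_idx, row_idx, value, batch, is_better):
        # Register the batch the first time we see a candidate from it.
        if batch_idx not in buffered_batches:
            buffered_batches[batch_idx] = batch
            batch_refs[batch_idx] = 0
        prev = best.get(key)
        if prev is None or is_better(value, prev[2]):
            if prev is not None:
                # The old winner's batch loses a reference; clear it once unreferenced.
                batch_refs[prev[0]] -= 1
                if batch_refs[prev[0]] == 0:
                    buffered_batches[prev[0]] = None
            best[key] = (batch_idx, row_idx, value)
            batch_refs[batch_idx] += 1
    # At finalization, the winning rows are extracted from the batches that are still buffered.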

Minor Notes

  • The feature is correctly restricted to merge_insert v2 path only (scalar index path returns NotSupported error) - this is validated by test_dedupe_v1_path_unsupported.
  • Test coverage is comprehensive: ascending/descending order, large datasets, nested fields, source-only dedupe columns, NULL handling semantics.
  • The NULL handling follows DataFusion/SQL semantics correctly: NULL always loses to non-NULL values.

Overall this is a well-implemented feature with good tests.

@github-actions
Contributor

Code Review Summary

This PR adds dedupe_by and dedupe_ordering parameters to merge_insert, allowing users to handle duplicate source rows by specifying which row should "win" based on a column value.

Overall Assessment

The implementation is well-designed with comprehensive test coverage across Rust, Python, and Java bindings. The deduplication logic follows DataFusion/SQL NULL semantics correctly.

P0/P1 Issues

P1: Memory concern in dedupe mode

In MergeState::buffer_data_batch (write.rs), the implementation buffers entire batches in memory even though only a subset of rows from each batch may ultimately be needed. While there is reference counting to clear unreferenced batches, for workloads with high cardinality (many unique keys across duplicates), most batches will remain fully buffered until finalization.

Consider:

  • The finalize_dedupe function extracts rows individually per batch - this is good
  • However, during streaming, all batches that contain at least one "winning" row so far are kept in memory
  • For very large source datasets with many duplicate keys, this could lead to significant memory pressure

This is acceptable for the initial implementation but worth documenting as a known limitation.

P1: Edge case - dedupe column not found at runtime

In MergeState::get_dedupe_column, if the dedupe column is not found, it returns None. This causes process_row_action to fall back to the non-dedupe path which will error on duplicates. Consider adding validation in try_build() that the dedupe column exists in the source schema.

Minor Observations (not blocking)

  1. The is_better_value function correctly implements NULL-loses semantics (see the sketch after this list)
  2. Test coverage is comprehensive including edge cases (nested fields, source-only columns, large datasets)
  3. The API is well-documented with examples in Python and Java
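A plain-Python sketch of that comparison rule (not the actual Rust is_better_value; the nulls_first handling and the error on ties are assumptions based on this PR's description):

    def is_better(candidate, best, descending=False, nulls_first=False):
        # Decide whether `candidate` should replace the current `best` dedupe value.
        if candidate is None or best is None:
            if candidate is None and best is None:
                raise ValueError("Cannot deduplicate: duplicate NULL dedupe values")
            # By default NULL loses to non-NULL; nulls_first flips that preference.
            return (candidate is None) == nulls_first
        if candidate == best:
            raise ValueError("Cannot deduplicate: multiple rows have the same dedupe value")
        return candidate > best if descending else candidate < best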

Tests

Good test coverage including:

  • Ascending/descending ordering
  • Nested field paths for deduplication
  • Dedupe column only in source
  • Large dataset test to verify batch handling

@github-actions
Contributor

Code Review

This PR adds a useful feature to handle duplicate rows in merge_insert source data by specifying a dedupe_by column and ordering. The implementation is thorough with good test coverage across Rust, Python, and Java.

P1 Issues

1. Memory concern with unbounded buffering in dedupe mode

In exec/write.rs, the MergeState buffers all data batches when in dedupe mode (buffered_batches: Vec<RecordBatch>). For large source datasets with many duplicates, this could lead to significant memory consumption since batches are held until all rows matching a target row_id have been processed.

Consider:

  • Adding a memory limit or warning in documentation about this behavior
  • Or implementing a streaming approach that flushes when memory pressure is detected

2. Potential performance issue with scalar comparisons

The is_better_value function uses ScalarValue comparisons, which adds per-row overhead. For high-cardinality deduplication scenarios this could become a bottleneck. As a future optimization, consider using Arrow's native comparison kernels on arrays when possible.

Suggestions (non-blocking)

  1. The column_by_qualified_name usage for nested column support is a nice addition. Consider adding a test case specifically for nested struct fields as the dedupe column (e.g., metrics.timestamp).

  2. The test test_merge_insert_dedupe_large with 1000 rows is good, but consider adding a stress test with larger data (e.g., 100k+ rows) to validate memory behavior under CI if feasible.

Overall, the implementation is solid with comprehensive tests covering ascending/descending ordering, NULL handling, and the case where dedupe column exists only in source.

@codecov

codecov bot commented Dec 26, 2025

Codecov Report

❌ Patch coverage is 94.61467% with 58 lines in your changes missing coverage. Please review.

Files with missing lines | Patch % | Lines
...lance/src/dataset/write/merge_insert/exec/write.rs | 79.72% | 32 Missing and 13 partials ⚠️
rust/lance/src/dataset/write/merge_insert.rs | 98.47% | 9 Missing and 4 partials ⚠️


@github-actions
Contributor

Code Review

This PR adds deduplication support for merge_insert when source data contains duplicate rows matching the same target row. The implementation uses a "running dedupe" approach during the merge phase rather than pre-aggregation, which is memory-efficient for streaming workloads.

Overall Assessment

The implementation is well-structured with good test coverage across Python, Java, and Rust. The design choice to dedupe during merge rather than during join is appropriate for avoiding multiple passes.

Issues

P1: Memory Management in Dedupe Mode

The finalize_dedupe function in write.rs calls std::mem::take(&mut self.accumulated_updates) and then iterates, but if the output schema projection fails mid-way or there's an early return, the buffered batches remain in memory. Consider ensuring cleanup happens even on error paths:

// In finalize_dedupe around line 2339
for (batch_idx, rows) in batch_rows {
    let buffered_batch = self.buffered_batches[batch_idx].as_ref().ok_or_else(|| {
        // This error path leaves other buffered_batches allocated

P1: Potential Panic with Empty Batches

In get_dedupe_column, if the column exists but the batch has zero rows, subsequent ScalarValue::try_from_array(dedupe_col, row_idx) could fail unexpectedly. While this might be guarded upstream, defensive checks would be safer.

Minor Observations (not blocking)

  1. The nulls_first semantics could be more explicitly documented in user-facing APIs - the current behavior (NULL loses by default) may surprise users expecting SQL-style NULL handling.

  2. The error message "Cannot deduplicate: multiple rows have the same value" is helpful, but including the key column values (similar to the existing duplicate row error) would improve debuggability.

  3. Python's dedupe_ordering() accepts strings ("ascending"/"descending") while Java uses SortOptions object - consider whether the APIs should be more consistent.

Tests

Good coverage including:

  • Ascending/descending ordering
  • NULL value handling
  • Equal value failure cases
  • Large dataset (multiple batches)
  • Nested column support

The tests appropriately verify both the happy path and error conditions.

@github-actions
Contributor

Code Review Summary

This PR adds support for handling duplicate rows in source data during merge insert operations by introducing dedupe_by and dedupe_sort_options parameters. The implementation allows users to specify a column for deduplication and control the ordering semantics.

P0/P1 Issues

1. Memory accumulation in dedupe mode (P1 - Performance)

The dedupe implementation buffers all batches in buffered_batches during streaming (exec/write.rs:464-535). While ref-counting allows clearing unreferenced batches, for scenarios with many unique row_ids and large batches, this could lead to significant memory pressure. The existing code comment acknowledges a similar issue: "Regrettably, this needs to have unbounded capacity, and so we need to fully read the new data into memory."

Consider documenting the memory implications for users or adding a warning when dedupe mode is used with very large datasets.

2. Potential panic in finalize_dedupe (P1 - Bug)

In finalize_dedupe (exec/write.rs:284-355), if a buffered batch was cleared unexpectedly:

let buffered_batch = self.buffered_batches[batch_idx].as_ref().ok_or_else(|| ...)?;

This returns an error, but the code assumes ref-counting prevents this. If there's a logic error in ref-count management, this could cause unexpected failures during finalization. The error message is clear, but consider adding a debug assertion to catch ref-count mismatches earlier.

3. Missing test for edge case: equal dedupe values across batches (P1 - Test coverage)

The tests cover ascending/descending ordering and large datasets, but there's no explicit test for the error case when two rows across different batches have equal dedupe values. While is_better_value handles this with an error, a test would ensure the error message is propagated correctly.

Other Observations

  • The Python API exposes dedupe_ordering("ascending") which internally calls dedupe_sort_options - this is a clean abstraction for users.
  • The Java API is well-structured with clear documentation.
  • The is_better_value comparison logic correctly handles NULL values and the various sort options.
  • Test coverage for the happy path is good (Python tests: ascending, descending, large dataset; Java tests: ascending, descending).

@github-actions
Contributor

Code Review: feat: support merge insert with duplicated rows in source

Summary

This PR adds dedupe functionality to merge_insert v2, allowing users to specify a column and sort order to resolve duplicate source rows matching the same target row. The implementation uses a running dedupe strategy during merge to avoid multiple passes.

P0/P1 Issues

P1: Memory concern with batch buffering

The dedupe implementation buffers entire batches in memory via buffered_batches: Vec<Option<RecordBatch>>. While there's reference counting to clear unused batches, for large source datasets with many duplicate keys spread across batches, memory usage could grow significantly. The existing ReplayExec::Unbounded already buffers input, so this adds a second layer of buffering.

Consider documenting this memory trade-off for users, or adding a threshold check that falls back to error when buffered data exceeds a limit.

P1: Error message clarity for equal dedupe values

When two rows have equal dedupe values (line ~2178):

"Cannot deduplicate: multiple rows have the same value ({}) in the dedupe column"

This error could be confusing since users might expect tie-breaking behavior. Consider:

  1. Documenting this limitation clearly in the API docs
  2. Adding the key values to the error message to help users identify the problematic rows

Minor Observations (not blocking)

  1. Test coverage is comprehensive - Tests cover ascending/descending, nested fields, source-only columns, large datasets, v1 path rejection, and column-not-found scenarios.

  2. Schema validation is good - The validate_dedupe_column function properly handles nested field paths.

  3. Python API design is clean - The dedupe_by() / dedupe_ordering() builder pattern is intuitive.

  4. The decision to only support dedupe in v2 path is reasonable given the complexity of running deduplication.


Reviewed with focus on P0/P1 issues per repository guidelines

@github-actions
Contributor

Code Review Summary

This PR adds dedupe functionality to merge_insert operations, allowing users to handle source data with duplicate keys by specifying a column to use for deduplication and sort options to control which row to keep.

Overall Assessment

The implementation is well-structured with comprehensive test coverage across Rust, Python, and Java bindings. The API design using SortOptions aligns well with Arrow conventions.

P0/P1 Issues

P0 - Python API inconsistency with Rust/Java:

In python/python/lance/dataset.py (lines 280-304), the Python API uses dedupe_sort_options(descending: bool, nulls_first: bool), but the docstring examples reference a non-existent dedupe_ordering method:

# In docstring examples:
builder.dedupe_by("timestamp").dedupe_ordering("descending")  # Wrong method name
builder.dedupe_by("version").dedupe_ordering("ascending")     # Wrong method name

The docstring should be updated to use dedupe_sort_options(descending=True) instead.
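For example, the two docstring lines could become (assuming the dedupe_sort_options signature quoted above):

    builder.dedupe_by("timestamp").dedupe_sort_options(descending=True)   # keep the latest timestamp
    builder.dedupe_by("version").dedupe_sort_options(descending=False)    # keep the smallest version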

P1 - Missing test coverage for nulls_first behavior:

While there are extensive tests for ascending/descending deduplication, there are no tests that verify the nulls_first=True behavior. Consider adding a test case that verifies NULL values win when nulls_first=True.

Minor Suggestions (Non-blocking)

  1. The Python stub file (lance/__init__.pyi) signature could include the return type annotation for consistency.

  2. Consider adding an example in the Rust SortOptions docstring showing the nulls_first=true use case for completeness.


🤖 Review generated by Claude Code
