From 13fac77163235697f6161448f01404d45d913785 Mon Sep 17 00:00:00 2001 From: Jesse Tuglu Date: Mon, 29 Dec 2025 17:38:29 -0500 Subject: [PATCH 1/5] Refactor --- docs/src/rest.yaml | 12 + java/lance-jni/src/merge_insert.rs | 1 + .../org/lance/merge/MergeInsertParams.java | 19 ++ .../test/java/org/lance/MergeInsertTest.java | 18 ++ python/python/lance/dataset.py | 10 + python/src/dataset.rs | 5 + rust/lance/src/dataset/write/merge_insert.rs | 248 +++++++++++++++++- .../write/merge_insert/assign_action.rs | 3 + .../dataset/write/merge_insert/exec/write.rs | 1 + .../write/merge_insert/logical_plan.rs | 23 +- 10 files changed, 331 insertions(+), 9 deletions(-) diff --git a/docs/src/rest.yaml b/docs/src/rest.yaml index faf6eaeaed8..b3af38ba7ef 100644 --- a/docs/src/rest.yaml +++ b/docs/src/rest.yaml @@ -737,6 +737,13 @@ paths: required: true schema: type: string + - name: "when_matched_delete" + in: query + description: Delete all rows in target table where a match exists in source table + required: false + schema: + type: boolean + default: false - name: "when_matched_update_all" in: query description: Update all columns when rows match @@ -786,6 +793,7 @@ paths: It passes in the `MergeInsertIntoTableRequest` information in the following way: - `id`: pass through path parameter of the same name - `on`: pass through query parameter of the same name + - `when_matched_delete`: pass through query parameter of the same name - `when_matched_update_all`: pass through query parameter of the same name - `when_matched_update_all_filt`: pass through query parameter of the same name - `when_not_matched_insert_all`: pass through query parameter of the same name @@ -1938,6 +1946,10 @@ components: "on": description: Column name to use for matching rows (required) type: string + when_matched_delete: + description: Delete all rows in target table where a match exists in source table + type: boolean + default: false when_matched_update_all: description: Update all columns when rows match type: boolean diff --git a/java/lance-jni/src/merge_insert.rs b/java/lance-jni/src/merge_insert.rs index 4e74393dc54..dca9e163ff8 100644 --- a/java/lance-jni/src/merge_insert.rs +++ b/java/lance-jni/src/merge_insert.rs @@ -114,6 +114,7 @@ fn extract_when_matched<'local>(env: &mut JNIEnv<'local>, jparam: &JObject) -> R None => Err(Error::input_error("No matched updated expr".to_string())), }, "Fail" => Ok(WhenMatched::Fail), + "Delete" => Ok(WhenMatched::Delete), _ => Err(Error::input_error(format!( "Illegal when_matched: {when_matched}", ))), diff --git a/java/src/main/java/org/lance/merge/MergeInsertParams.java b/java/src/main/java/org/lance/merge/MergeInsertParams.java index e27b0f7f235..a1759455248 100644 --- a/java/src/main/java/org/lance/merge/MergeInsertParams.java +++ b/java/src/main/java/org/lance/merge/MergeInsertParams.java @@ -66,6 +66,19 @@ public MergeInsertParams withMatchedDoNothing() { return this; } + /** + * Specify that when a row in the source table matches a row in the target table, the row in the + * target table is deleted. + * + *
* <p>
This can be used to achieve "when matched delete" behavior. + * + * @return This MergeInsertParams instance + */ + public MergeInsertParams withMatchedDelete() { + this.whenMatched = WhenMatched.Delete; + return this; + } + /** * Specify that when a row in the source table matches a row in the target table and the * expression evaluates to true, the row in the target table is updated by the matched row from @@ -303,6 +316,12 @@ public enum WhenMatched { * used to ensure that no existing rows are overwritten or modified after inserted. */ Fail, + + /** + * The row is deleted from the target table when a row in the source table matches a row in the + * target table. + */ + Delete } public enum WhenNotMatched { diff --git a/java/src/test/java/org/lance/MergeInsertTest.java b/java/src/test/java/org/lance/MergeInsertTest.java index 825fd73e814..c36ec26b4fa 100644 --- a/java/src/test/java/org/lance/MergeInsertTest.java +++ b/java/src/test/java/org/lance/MergeInsertTest.java @@ -219,6 +219,24 @@ public void testWhenMatchedFailWithoutMatches() throws Exception { } } + @Test + public void testWhenMatchedDelete() throws Exception { + // Test delete matched target rows if expression is true + + try (VectorSchemaRoot source = buildSource(testDataset.getSchema(), allocator)) { + try (ArrowArrayStream sourceStream = convertToStream(source, allocator)) { + MergeInsertResult result = + dataset.mergeInsert( + new MergeInsertParams(Collections.singletonList("id")) + .withMatchedDelete() + .withNotMatched(MergeInsertParams.WhenNotMatched.DoNothing), + sourceStream); + + Assertions.assertEquals("{3=Person 3, 4=Person 4}", readAll(result.dataset()).toString()); + } + } + } + private VectorSchemaRoot buildSource(Schema schema, RootAllocator allocator) { List sourceIds = Arrays.asList(0, 1, 2, 7, 8, 9); diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 7666eca293d..e49e1237fab 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -163,6 +163,16 @@ def when_matched_update_all( """ return super(MergeInsertBuilder, self).when_matched_update_all(condition) + def when_matched_delete(self) -> "MergeInsertBuilder": + """ + Configure the operation to delete matched rows in the target table. + + After this method is called, when the merge insert operation executes, + any rows that match both the source table and the target table will be + deleted. + """ + return super(MergeInsertBuilder, self).when_matched_delete() + def when_matched_fail(self) -> "MergeInsertBuilder": """ Configure the operation to fail if any rows match diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 335078e5c2a..9183e8ef9f7 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -186,6 +186,11 @@ impl MergeInsertBuilder { Ok(slf) } + pub fn when_matched_delete(mut slf: PyRefMut) -> PyResult> { + slf.builder.when_matched(WhenMatched::Delete); + Ok(slf) + } + pub fn when_not_matched_insert_all(mut slf: PyRefMut) -> PyResult> { slf.builder.when_not_matched(WhenNotMatched::InsertAll); Ok(slf) diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index c4709624bc7..cdb9d5c0fc9 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -2,7 +2,7 @@ // SPDX-FileCopyrightText: Copyright The Lance Authors //! The merge insert operation merges a batch of new data into an existing batch of old data. This can be -//! 
used to implement a bulk update-or-insert (upsert) or find-or-create operation. It can also be used to +//! used to implement a bulk update-or-insert (upsert), bulk delete or find-or-create operation. It can also be used to //! replace a specified region of data with new data (e.g. replace the data for the month of January) //! //! The terminology for this operation can be slightly confusing. We try and stick with the terminology from @@ -10,9 +10,9 @@ //! being inserted into the dataset. //! //! In order for this operation to work we need to be able to match rows from the source table with rows in the -//! target table. For example, given a row we need to know if this is a brand new row or matches an existing row. +//! target table. For example, given a row we need to know if this is a brand-new row or matches an existing row. //! -//! This match condition is currently limited to an key-match. This means we consider a row to be a match if the +//! This match condition is currently limited to a key-match. This means we consider a row to be a match if the //! key columns are identical in both the source and the target. This means that you will need some kind of //! meaningful key column to be able to perform a merge insert. @@ -257,6 +257,11 @@ pub enum WhenMatched { /// /// This can be used to ensure that no existing rows are overwritten or modified after inserted. Fail, + /// The matching row is deleted from the target table + /// + /// This can be used for bulk deletion by matching on key columns. + /// Unlike UpdateAll, no new row is inserted - the matched row is simply removed. + Delete, } impl WhenMatched { @@ -1437,11 +1442,31 @@ impl MergeInsertJob { let has_scalar_index = self.join_key_as_scalar_index().await?.is_some(); + // Check if this is a delete-only operation (no update/insert writes needed from source) + // For delete-only, we don't need the full source schema, just key columns for matching + let is_delete_only = matches!( + self.params.when_matched, + WhenMatched::Delete | WhenMatched::DoNothing + ) && !self.params.insert_not_matched; + + // For delete-only, verify source has all key columns + let source_has_key_columns = self.params.on.iter().all(|key| { + source_schema + .fields() + .iter() + .any(|f| f.name() == key.as_str()) + }); + let schema_ok = is_full_schema || (is_delete_only && source_has_key_columns); + Ok(matches!( self.params.when_matched, - WhenMatched::UpdateAll | WhenMatched::UpdateIf(_) | WhenMatched::Fail + WhenMatched::UpdateAll + | WhenMatched::UpdateIf(_) + | WhenMatched::Fail + | WhenMatched::Delete + | WhenMatched::DoNothing ) && (!self.params.use_index || !has_scalar_index) - && is_full_schema + && schema_ok && matches!( self.params.delete_not_matched_by_source, WhenNotMatchedBySource::Keep @@ -5314,4 +5339,217 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n assert_eq!(result, expected); } + + /// Test WhenMatched::Delete with full schema source data. + /// Source contains all columns (key, value, filterme) but we only use it to identify + /// rows to delete - no data is written back. 
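+    ///
+    /// As a sketch, the call shape this test exercises looks like the
+    /// following (mirroring the body below; `dataset` and `source_stream`
+    /// stand in for the test fixtures):
+    ///
+    /// ```ignore
+    /// let job = MergeInsertBuilder::try_new(dataset, vec!["key".to_string()])?
+    ///     .when_matched(WhenMatched::Delete)
+    ///     .when_not_matched(WhenNotMatched::DoNothing)
+    ///     .try_build()?;
+    /// let (new_dataset, stats) = job.execute(source_stream).await?;
+    /// ```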
+ #[rstest::rstest] + #[tokio::test] + async fn test_when_matched_delete_full_schema( + #[values(LanceFileVersion::Legacy, LanceFileVersion::V2_0)] version: LanceFileVersion, + #[values(true, false)] enable_stable_row_ids: bool, + ) { + let schema = create_test_schema(); + let test_uri = "memory://test_delete_full.lance"; + + // Create dataset with keys 1-6 (value=1) + let ds = create_test_dataset(test_uri, version, enable_stable_row_ids).await; + + // Source data has keys 4, 5, 6, 7, 8, 9 with full schema + // Keys 4, 5, 6 match existing rows and should be deleted + // Keys 7, 8, 9 don't match (and we're not inserting) + let new_batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![4, 5, 6, 7, 8, 9])), + Arc::new(UInt32Array::from(vec![2, 2, 2, 2, 2, 2])), + Arc::new(StringArray::from(vec!["A", "B", "C", "A", "B", "C"])), + ], + ) + .unwrap(); + + let keys = vec!["key".to_string()]; + + // WhenMatched::Delete - delete matched rows, don't insert unmatched + let job = MergeInsertBuilder::try_new(ds.clone(), keys) + .unwrap() + .when_matched(WhenMatched::Delete) + .when_not_matched(WhenNotMatched::DoNothing) + .try_build() + .unwrap(); + + let new_reader = Box::new(RecordBatchIterator::new([Ok(new_batch)], schema.clone())); + let new_stream = reader_to_stream(new_reader); + + let (merged_dataset, merge_stats) = job.execute(new_stream).await.unwrap(); + + // Should have deleted 3 rows (keys 4, 5, 6) + assert_eq!(merge_stats.num_deleted_rows, 3); + assert_eq!(merge_stats.num_inserted_rows, 0); + assert_eq!(merge_stats.num_updated_rows, 0); + + // Verify remaining data - only keys 1, 2, 3 should remain + let batches = merged_dataset + .scan() + .try_into_stream() + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + + let merged = concat_batches(&schema, &batches).unwrap(); + let mut remaining_keys: Vec = merged + .column(0) + .as_primitive::() + .values() + .to_vec(); + remaining_keys.sort(); + assert_eq!(remaining_keys, vec![1, 2, 3]); + } + + /// Test WhenMatched::Delete with ID-only source data (just key column). + /// This is the optimized bulk delete case where we only need key columns for matching. 
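+    ///
+    /// The source shape being covered is a batch carrying only the join key
+    /// (as constructed in the body below):
+    ///
+    /// ```ignore
+    /// let id_only_schema = Arc::new(Schema::new(vec![
+    ///     Field::new("key", DataType::UInt32, true),
+    /// ]));
+    /// let source = RecordBatch::try_new(
+    ///     id_only_schema,
+    ///     vec![Arc::new(UInt32Array::from(vec![2, 4, 6]))],
+    /// )?;
+    /// ```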
+ #[rstest::rstest] + #[tokio::test] + async fn test_when_matched_delete_id_only( + #[values(LanceFileVersion::Legacy, LanceFileVersion::V2_0)] version: LanceFileVersion, + #[values(true, false)] enable_stable_row_ids: bool, + ) { + let test_uri = "memory://test_delete_id_only.lance"; + + // Create dataset with keys 1-6 (full schema: key, value, filterme) + let ds = create_test_dataset(test_uri, version, enable_stable_row_ids).await; + + // Source data has ONLY key column - this is the bulk delete optimization case + let id_only_schema = Arc::new(Schema::new(vec![Field::new("key", DataType::UInt32, true)])); + let new_batch = RecordBatch::try_new( + id_only_schema.clone(), + vec![Arc::new(UInt32Array::from(vec![2, 4, 6]))], // Delete keys 2, 4, 6 + ) + .unwrap(); + + let keys = vec!["key".to_string()]; + + // WhenMatched::Delete with ID-only source - should only need key columns for matching + let job = MergeInsertBuilder::try_new(ds.clone(), keys) + .unwrap() + .when_matched(WhenMatched::Delete) + .when_not_matched(WhenNotMatched::DoNothing) + .try_build() + .unwrap(); + + let new_reader = Box::new(RecordBatchIterator::new( + [Ok(new_batch)], + id_only_schema.clone(), + )); + let new_stream = reader_to_stream(new_reader); + + let (merged_dataset, merge_stats) = job.execute(new_stream).await.unwrap(); + + // Should have deleted 3 rows (keys 2, 4, 6) + assert_eq!(merge_stats.num_deleted_rows, 3); + assert_eq!(merge_stats.num_inserted_rows, 0); + assert_eq!(merge_stats.num_updated_rows, 0); + + // Verify remaining data - only keys 1, 3, 5 should remain + let full_schema = create_test_schema(); + let batches = merged_dataset + .scan() + .try_into_stream() + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + + let merged = concat_batches(&full_schema, &batches).unwrap(); + let mut remaining_keys: Vec = merged + .column(0) + .as_primitive::() + .values() + .to_vec(); + remaining_keys.sort(); + assert_eq!(remaining_keys, vec![1, 3, 5]); + } + + /// Test WhenMatched::Delete combined with WhenNotMatched::InsertAll. + /// This replaces existing matching rows with nothing (delete) while inserting new rows. 
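+    ///
+    /// Outcome sketch for this combination (target holds keys 1-6, source
+    /// holds keys 4-9, as in the body below):
+    ///
+    /// ```text
+    /// keys 1-3: target only, no match          -> kept
+    /// keys 4-6: in both, WhenMatched::Delete   -> deleted
+    /// keys 7-9: source only, InsertAll         -> inserted
+    /// ```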
+ #[rstest::rstest] + #[tokio::test] + async fn test_when_matched_delete_with_insert( + #[values(LanceFileVersion::Legacy, LanceFileVersion::V2_0)] version: LanceFileVersion, + ) { + let schema = create_test_schema(); + let test_uri = "memory://test_delete_with_insert.lance"; + + // Create dataset with keys 1-6 + let ds = create_test_dataset(test_uri, version, false).await; + + // Source has keys 4, 5, 6 (match - will be deleted) and 7, 8, 9 (new - will be inserted) + let new_batch = create_new_batch(schema.clone()); + + let keys = vec!["key".to_string()]; + + // Delete matched rows, insert unmatched rows + let job = MergeInsertBuilder::try_new(ds.clone(), keys) + .unwrap() + .when_matched(WhenMatched::Delete) + .when_not_matched(WhenNotMatched::InsertAll) + .try_build() + .unwrap(); + + let new_reader = Box::new(RecordBatchIterator::new([Ok(new_batch)], schema.clone())); + let new_stream = reader_to_stream(new_reader); + + let (merged_dataset, merge_stats) = job.execute(new_stream).await.unwrap(); + + // Deleted 3 (keys 4, 5, 6), inserted 3 (keys 7, 8, 9) + assert_eq!(merge_stats.num_deleted_rows, 3); + assert_eq!(merge_stats.num_inserted_rows, 3); + assert_eq!(merge_stats.num_updated_rows, 0); + + // Verify: keys 1, 2, 3 (original, not matched), 7, 8, 9 (new inserts) + let batches = merged_dataset + .scan() + .try_into_stream() + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + + let merged = concat_batches(&schema, &batches).unwrap(); + let mut remaining_keys: Vec = merged + .column(0) + .as_primitive::() + .values() + .to_vec(); + remaining_keys.sort(); + assert_eq!(remaining_keys, vec![1, 2, 3, 7, 8, 9]); + + // Verify values: keys 1, 2, 3 have value=1 (original), keys 7, 8, 9 have value=2 (new) + let keyvals: Vec<(u32, u32)> = merged + .column(0) + .as_primitive::() + .values() + .iter() + .zip( + merged + .column(1) + .as_primitive::() + .values() + .iter(), + ) + .map(|(&k, &v)| (k, v)) + .collect(); + + for (key, value) in keyvals { + if key <= 3 { + assert_eq!(value, 1, "Original keys should have value=1"); + } else { + assert_eq!(value, 2, "New keys should have value=2"); + } + } + } } diff --git a/rust/lance/src/dataset/write/merge_insert/assign_action.rs b/rust/lance/src/dataset/write/merge_insert/assign_action.rs index 94ab73dd4bd..1b9edadad6b 100644 --- a/rust/lance/src/dataset/write/merge_insert/assign_action.rs +++ b/rust/lance/src/dataset/write/merge_insert/assign_action.rs @@ -125,6 +125,9 @@ pub fn merge_insert_action( WhenMatched::Fail => { cases.push((matched, Action::Fail.as_literal_expr())); } + WhenMatched::Delete => { + cases.push((matched, Action::Delete.as_literal_expr())); + } } match ¶ms.delete_not_matched_by_source { diff --git a/rust/lance/src/dataset/write/merge_insert/exec/write.rs b/rust/lance/src/dataset/write/merge_insert/exec/write.rs index 6644fefaf68..b1338c5f144 100644 --- a/rust/lance/src/dataset/write/merge_insert/exec/write.rs +++ b/rust/lance/src/dataset/write/merge_insert/exec/write.rs @@ -701,6 +701,7 @@ impl DisplayAs for FullSchemaMergeInsertExec { format!("UpdateIf({})", condition) } crate::dataset::WhenMatched::Fail => "Fail".to_string(), + crate::dataset::WhenMatched::Delete => "Delete".to_string(), }; let when_not_matched = if self.params.insert_not_matched { "InsertAll" diff --git a/rust/lance/src/dataset/write/merge_insert/logical_plan.rs b/rust/lance/src/dataset/write/merge_insert/logical_plan.rs index 40ce12d3b42..6c896fe3a84 100644 --- a/rust/lance/src/dataset/write/merge_insert/logical_plan.rs +++ 
b/rust/lance/src/dataset/write/merge_insert/logical_plan.rs @@ -99,6 +99,7 @@ impl UserDefinedLogicalNodeCore for MergeInsertWriteNode { crate::dataset::WhenMatched::UpdateAll => "UpdateAll", crate::dataset::WhenMatched::UpdateIf(_) => "UpdateIf", crate::dataset::WhenMatched::Fail => "Fail", + crate::dataset::WhenMatched::Delete => "Delete", }; let when_not_matched = if self.params.insert_not_matched { "InsertAll" @@ -145,19 +146,33 @@ impl UserDefinedLogicalNodeCore for MergeInsertWriteNode { fn necessary_children_exprs(&self, _output_columns: &[usize]) -> Option>> { // Going to need: - // * all columns from the `source` relation + // * all columns from the `source` relation (or just key columns for delete-only) // * `__action` column (unqualified) // * `target._rowaddr` column specifically let input_schema = self.input.schema(); let mut necessary_columns = Vec::new(); + // Check if this is a delete-only operation (no writes needed) + // In delete-only mode, we only need the key columns from source for matching + let is_delete_only = matches!( + self.params.when_matched, + crate::dataset::WhenMatched::Delete | crate::dataset::WhenMatched::DoNothing + ) && !self.params.insert_not_matched; + for (i, (qualifier, field)) in input_schema.iter().enumerate() { let should_include = match qualifier { - // Include all source columns - they contain the new data to write - Some(qualifier) if qualifier.table() == "source" => true, + // For delete-only: only include source KEY columns (for matching) + // For other ops: include all source columns - they contain the new data to write + Some(qualifier) if qualifier.table() == "source" => { + if is_delete_only { + self.params.on.iter().any(|k| k == field.name()) + } else { + true + } + } - // Include target._rowaddr specifically - needed to locate existing rows for updates + // Include target._rowaddr specifically - needed to locate existing rows for updates/deletes Some(qualifier) if qualifier.table() == "target" && field.name() == ROW_ADDR => { true } From d647e6a2cdf06b3a0d69146bedebee2168fd31bf Mon Sep 17 00:00:00 2001 From: Jesse Tuglu Date: Tue, 30 Dec 2025 02:03:44 -0500 Subject: [PATCH 2/5] Comments --- python/python/tests/test_dataset.py | 81 ++++++++++ rust/lance/src/dataset/write/merge_insert.rs | 146 +++++++++++++++++- .../write/merge_insert/logical_plan.rs | 6 +- 3 files changed, 225 insertions(+), 8 deletions(-) diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index 02d9d814bae..f319dec8796 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -2311,6 +2311,87 @@ def test_merge_insert_when_matched_fail(tmp_path: Path): assert unchanged_data == expected +def test_merge_insert_when_matched_delete(tmp_path: Path): + """Test when_matched_delete functionality for merge insert.""" + # Create initial dataset with ids 1-6 + data = pa.table({"id": [1, 2, 3, 4, 5, 6], "val": [10, 20, 30, 40, 50, 60]}) + ds = lance.write_dataset(data, tmp_path / "dataset") + version = ds.version + + # Test 1: Basic when_matched_delete - delete matched rows only + # Source has ids 4, 5, 6 (match) and 7, 8, 9 (no match) + # Only matched rows should be deleted, unmatched rows are ignored + delete_keys = pa.table({"id": [4, 5, 6, 7, 8, 9], "val": [0, 0, 0, 0, 0, 0]}) + result = ds.merge_insert("id").when_matched_delete().execute(delete_keys) + + assert result["num_deleted_rows"] == 3 + assert result["num_inserted_rows"] == 0 + assert result["num_updated_rows"] == 0 + + # Verify only ids 1, 2, 3 
remain + remaining = ds.to_table().sort_by("id") + expected = pa.table({"id": [1, 2, 3], "val": [10, 20, 30]}) + assert remaining == expected + + # Test 2: when_matched_delete with ID-only source + # Source contains only the key column + ds = lance.dataset(tmp_path / "dataset", version=version) + ds.restore() + + id_only_source = pa.table({"id": [2, 4, 6]}) # Delete even ids + result = ds.merge_insert("id").when_matched_delete().execute(id_only_source) + + assert result["num_deleted_rows"] == 3 + assert result["num_inserted_rows"] == 0 + assert result["num_updated_rows"] == 0 + + # Verify only odd ids remain + remaining = ds.to_table().sort_by("id") + expected = pa.table({"id": [1, 3, 5], "val": [10, 30, 50]}) + assert remaining == expected + + # Test 3: when_matched_delete combined with when_not_matched_insert_all + # Delete existing rows that match, insert new rows that don't match + ds = lance.dataset(tmp_path / "dataset", version=version) + ds.restore() + + new_data = pa.table( + {"id": [4, 5, 6, 7, 8, 9], "val": [400, 500, 600, 700, 800, 900]} + ) + result = ( + ds.merge_insert("id") + .when_matched_delete() + .when_not_matched_insert_all() + .execute(new_data) + ) + + # Should delete 3 (ids 4, 5, 6) and insert 3 (ids 7, 8, 9) + assert result["num_deleted_rows"] == 3 + assert result["num_inserted_rows"] == 3 + assert result["num_updated_rows"] == 0 + + # Verify: ids 1, 2, 3 (original), 7, 8, 9 (new inserts) + remaining = ds.to_table().sort_by("id") + expected = pa.table({"id": [1, 2, 3, 7, 8, 9], "val": [10, 20, 30, 700, 800, 900]}) + assert remaining == expected + + # Test 4: when_matched_delete with no matches (should be a no-op delete) + ds = lance.dataset(tmp_path / "dataset", version=version) + ds.restore() + + non_matching = pa.table({"id": [100, 200, 300], "val": [0, 0, 0]}) + result = ds.merge_insert("id").when_matched_delete().execute(non_matching) + + assert result["num_deleted_rows"] == 0 + assert result["num_inserted_rows"] == 0 + assert result["num_updated_rows"] == 0 + + # Data should be unchanged + remaining = ds.to_table().sort_by("id") + expected = pa.table({"id": [1, 2, 3, 4, 5, 6], "val": [10, 20, 30, 40, 50, 60]}) + assert remaining == expected + + def test_merge_insert_large(): # Doing subcolumns update with merge insert triggers this error. 
# Data needs to be large enough to make DataFusion create multiple batches diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index cdb9d5c0fc9..667045513df 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -1444,7 +1444,7 @@ impl MergeInsertJob { // Check if this is a delete-only operation (no update/insert writes needed from source) // For delete-only, we don't need the full source schema, just key columns for matching - let is_delete_only = matches!( + let no_upsert = matches!( self.params.when_matched, WhenMatched::Delete | WhenMatched::DoNothing ) && !self.params.insert_not_matched; @@ -1456,7 +1456,7 @@ impl MergeInsertJob { .iter() .any(|f| f.name() == key.as_str()) }); - let schema_ok = is_full_schema || (is_delete_only && source_has_key_columns); + let schema_ok = is_full_schema || (no_upsert && source_has_key_columns); Ok(matches!( self.params.when_matched, @@ -1464,7 +1464,6 @@ impl MergeInsertJob { | WhenMatched::UpdateIf(_) | WhenMatched::Fail | WhenMatched::Delete - | WhenMatched::DoNothing ) && (!self.params.use_index || !has_scalar_index) && schema_ok && matches!( @@ -5370,6 +5369,26 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n let keys = vec!["key".to_string()]; + // First, verify the execution plan structure + // Delete-only should use Inner join and only include key columns (optimization) + // Action 3 = Delete + let plan_job = MergeInsertBuilder::try_new(ds.clone(), keys.clone()) + .unwrap() + .when_matched(WhenMatched::Delete) + .when_not_matched(WhenNotMatched::DoNothing) + .try_build() + .unwrap(); + let plan_stream = reader_to_stream(Box::new(RecordBatchIterator::new( + [Ok(new_batch.clone())], + schema.clone(), + ))); + let plan = plan_job.create_plan(plan_stream).await.unwrap(); + // Verify key structural elements: MergeInsert config, Delete action (3), key-only projection + assert_plan_node_equals( + plan, + "MergeInsert: on=[key], when_matched=Delete, when_not_matched=DoNothing, when_not_matched_by_source=Keep...CASE WHEN key@2 IS NOT NULL AND _rowaddr@1 IS NOT NULL THEN 3 ELSE 0 END as __action]...StreamingTableExec: partition_sizes=1, projection=[key]" + ).await.unwrap(); + // WhenMatched::Delete - delete matched rows, don't insert unmatched let job = MergeInsertBuilder::try_new(ds.clone(), keys) .unwrap() @@ -5420,8 +5439,6 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n // Create dataset with keys 1-6 (full schema: key, value, filterme) let ds = create_test_dataset(test_uri, version, enable_stable_row_ids).await; - - // Source data has ONLY key column - this is the bulk delete optimization case let id_only_schema = Arc::new(Schema::new(vec![Field::new("key", DataType::UInt32, true)])); let new_batch = RecordBatch::try_new( id_only_schema.clone(), @@ -5431,6 +5448,24 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n let keys = vec!["key".to_string()]; + // ID-only delete should use Inner join with key-only projection + // on=[(key@0, key@0)] because key is at position 0 in both target and source + let plan_job = MergeInsertBuilder::try_new(ds.clone(), keys.clone()) + .unwrap() + .when_matched(WhenMatched::Delete) + .when_not_matched(WhenNotMatched::DoNothing) + .try_build() + .unwrap(); + let plan_stream = reader_to_stream(Box::new(RecordBatchIterator::new( + [Ok(new_batch.clone())], + id_only_schema.clone(), + ))); + let plan = 
plan_job.create_plan(plan_stream).await.unwrap(); + assert_plan_node_equals( + plan, + "MergeInsert: on=[key], when_matched=Delete, when_not_matched=DoNothing, when_not_matched_by_source=Keep...CASE WHEN key@2 IS NOT NULL AND _rowaddr@1 IS NOT NULL THEN 3 ELSE 0 END as __action]...StreamingTableExec: partition_sizes=1, projection=[key]" + ).await.unwrap(); + // WhenMatched::Delete with ID-only source - should only need key columns for matching let job = MergeInsertBuilder::try_new(ds.clone(), keys) .unwrap() @@ -5491,6 +5526,23 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n let keys = vec!["key".to_string()]; + // Delete + Insert should use Right join to see unmatched rows for insertion + let plan_job = MergeInsertBuilder::try_new(ds.clone(), keys.clone()) + .unwrap() + .when_matched(WhenMatched::Delete) + .when_not_matched(WhenNotMatched::InsertAll) + .try_build() + .unwrap(); + let plan_stream = reader_to_stream(Box::new(RecordBatchIterator::new( + [Ok(new_batch.clone())], + schema.clone(), + ))); + let plan = plan_job.create_plan(plan_stream).await.unwrap(); + assert_plan_node_equals( + plan, + "MergeInsert: on=[key], when_matched=Delete, when_not_matched=InsertAll, when_not_matched_by_source=Keep...THEN 2 WHEN...THEN 3 ELSE 0 END as __action]...projection=[key, value, filterme]" + ).await.unwrap(); + // Delete matched rows, insert unmatched rows let job = MergeInsertBuilder::try_new(ds.clone(), keys) .unwrap() @@ -5552,4 +5604,88 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n } } } + + /// Test WhenMatched::Delete when source data has no matching keys. + /// This should result in zero deletes and the dataset remains unchanged. + #[rstest::rstest] + #[tokio::test] + async fn test_when_matched_delete_no_matches( + #[values(LanceFileVersion::Legacy, LanceFileVersion::V2_0)] version: LanceFileVersion, + ) { + let schema = create_test_schema(); + let test_uri = "memory://test_delete_no_matches.lance"; + + // Create dataset with keys 1-6 + let ds = create_test_dataset(test_uri, version, false).await; + + // Source data has keys 100, 200, 300 - none match existing keys 1-6 + let non_matching_batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![100, 200, 300])), + Arc::new(UInt32Array::from(vec![10, 20, 30])), + Arc::new(StringArray::from(vec!["X", "Y", "Z"])), + ], + ) + .unwrap(); + + let keys = vec!["key".to_string()]; + + // Even with no matches, the plan structure should be the same + let plan_job = MergeInsertBuilder::try_new(ds.clone(), keys.clone()) + .unwrap() + .when_matched(WhenMatched::Delete) + .when_not_matched(WhenNotMatched::DoNothing) + .try_build() + .unwrap(); + let plan_stream = reader_to_stream(Box::new(RecordBatchIterator::new( + [Ok(non_matching_batch.clone())], + schema.clone(), + ))); + let plan = plan_job.create_plan(plan_stream).await.unwrap(); + assert_plan_node_equals( + plan, + "MergeInsert: on=[key], when_matched=Delete, when_not_matched=DoNothing, when_not_matched_by_source=Keep...CASE WHEN key@2 IS NOT NULL AND _rowaddr@1 IS NOT NULL THEN 3 ELSE 0 END as __action]...StreamingTableExec: partition_sizes=1, projection=[key]" + ).await.unwrap(); + + // Execute the delete operation + let job = MergeInsertBuilder::try_new(ds.clone(), keys) + .unwrap() + .when_matched(WhenMatched::Delete) + .when_not_matched(WhenNotMatched::DoNothing) + .try_build() + .unwrap(); + + let new_reader = Box::new(RecordBatchIterator::new( + [Ok(non_matching_batch)], + 
schema.clone(), + )); + let new_stream = reader_to_stream(new_reader); + + let (merged_dataset, merge_stats) = job.execute(new_stream).await.unwrap(); + + // Should have deleted 0 rows since no keys matched + assert_eq!(merge_stats.num_deleted_rows, 0); + assert_eq!(merge_stats.num_inserted_rows, 0); + assert_eq!(merge_stats.num_updated_rows, 0); + + // Verify all original data remains unchanged - keys 1-6 should all still be present + let batches = merged_dataset + .scan() + .try_into_stream() + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + + let merged = concat_batches(&schema, &batches).unwrap(); + let mut remaining_keys: Vec = merged + .column(0) + .as_primitive::() + .values() + .to_vec(); + remaining_keys.sort(); + assert_eq!(remaining_keys, vec![1, 2, 3, 4, 5, 6]); + } } diff --git a/rust/lance/src/dataset/write/merge_insert/logical_plan.rs b/rust/lance/src/dataset/write/merge_insert/logical_plan.rs index 6c896fe3a84..aaf4d74d960 100644 --- a/rust/lance/src/dataset/write/merge_insert/logical_plan.rs +++ b/rust/lance/src/dataset/write/merge_insert/logical_plan.rs @@ -155,9 +155,9 @@ impl UserDefinedLogicalNodeCore for MergeInsertWriteNode { // Check if this is a delete-only operation (no writes needed) // In delete-only mode, we only need the key columns from source for matching - let is_delete_only = matches!( + let no_upsert = matches!( self.params.when_matched, - crate::dataset::WhenMatched::Delete | crate::dataset::WhenMatched::DoNothing + crate::dataset::WhenMatched::Delete ) && !self.params.insert_not_matched; for (i, (qualifier, field)) in input_schema.iter().enumerate() { @@ -165,7 +165,7 @@ impl UserDefinedLogicalNodeCore for MergeInsertWriteNode { // For delete-only: only include source KEY columns (for matching) // For other ops: include all source columns - they contain the new data to write Some(qualifier) if qualifier.table() == "source" => { - if is_delete_only { + if no_upsert { self.params.on.iter().any(|k| k == field.name()) } else { true From 03c9ff8d76a58d7abe0429108c7a24b9c0d7afde Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Wed, 31 Dec 2025 15:23:40 -0800 Subject: [PATCH 3/5] use separated DeleteOnlyMergeInsertExec --- Cargo.toml | 2 +- rust/lance/src/dataset/write/merge_insert.rs | 81 +++-- .../src/dataset/write/merge_insert/exec.rs | 56 ++++ .../dataset/write/merge_insert/exec/delete.rs | 315 ++++++++++++++++++ .../dataset/write/merge_insert/exec/write.rs | 60 +--- .../write/merge_insert/logical_plan.rs | 44 ++- 6 files changed, 469 insertions(+), 89 deletions(-) create mode 100644 rust/lance/src/dataset/write/merge_insert/exec/delete.rs diff --git a/Cargo.toml b/Cargo.toml index 38a6b1264f0..635f7ad862a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,7 +63,7 @@ lance-io = { version = "=2.0.0-beta.4", path = "./rust/lance-io", default-featur lance-linalg = { version = "=2.0.0-beta.4", path = "./rust/lance-linalg" } lance-namespace = { version = "=2.0.0-beta.4", path = "./rust/lance-namespace" } lance-namespace-impls = { version = "=2.0.0-beta.4", path = "./rust/lance-namespace-impls" } -lance-namespace-reqwest-client = "0.4.0" +lance-namespace-reqwest-client = "=0.4.0" lance-table = { version = "=2.0.0-beta.4", path = "./rust/lance-table" } lance-test-macros = { version = "=2.0.0-beta.4", path = "./rust/lance-test-macros" } lance-testing = { version = "=2.0.0-beta.4", path = "./rust/lance-testing" } diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 667045513df..a368575460c 100644 --- 
a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -1390,29 +1390,40 @@ impl MergeInsertJob { } // Extract merge stats from the execution plan - let merge_insert_exec = plan - .as_any() - .downcast_ref::() - .ok_or_else(|| Error::Internal { - message: "Expected FullSchemaMergeInsertExec".into(), + let (stats, transaction, affected_rows) = if let Some(full_exec) = + plan.as_any() + .downcast_ref::() + { + let stats = full_exec.merge_stats().ok_or_else(|| Error::Internal { + message: "Merge stats not available - execution may not have completed".into(), location: location!(), })?; - - let stats = merge_insert_exec - .merge_stats() - .ok_or_else(|| Error::Internal { + let transaction = full_exec.transaction().ok_or_else(|| Error::Internal { + message: "Transaction not available - execution may not have completed".into(), + location: location!(), + })?; + let affected_rows = full_exec.affected_rows().map(RowAddrTreeMap::from); + (stats, transaction, affected_rows) + } else if let Some(delete_exec) = plan + .as_any() + .downcast_ref::() + { + let stats = delete_exec.merge_stats().ok_or_else(|| Error::Internal { message: "Merge stats not available - execution may not have completed".into(), location: location!(), })?; - - let transaction = merge_insert_exec - .transaction() - .ok_or_else(|| Error::Internal { + let transaction = delete_exec.transaction().ok_or_else(|| Error::Internal { message: "Transaction not available - execution may not have completed".into(), location: location!(), })?; - - let affected_rows = merge_insert_exec.affected_rows().map(RowAddrTreeMap::from); + let affected_rows = delete_exec.affected_rows().map(RowAddrTreeMap::from); + (stats, transaction, affected_rows) + } else { + return Err(Error::Internal { + message: "Expected FullSchemaMergeInsertExec or DeleteOnlyMergeInsertExec".into(), + location: location!(), + }); + }; Ok((transaction, stats, affected_rows)) } @@ -5383,13 +5394,17 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n schema.clone(), ))); let plan = plan_job.create_plan(plan_stream).await.unwrap(); - // Verify key structural elements: MergeInsert config, Delete action (3), key-only projection assert_plan_node_equals( plan, - "MergeInsert: on=[key], when_matched=Delete, when_not_matched=DoNothing, when_not_matched_by_source=Keep...CASE WHEN key@2 IS NOT NULL AND _rowaddr@1 IS NOT NULL THEN 3 ELSE 0 END as __action]...StreamingTableExec: partition_sizes=1, projection=[key]" - ).await.unwrap(); - - // WhenMatched::Delete - delete matched rows, don't insert unmatched + "DeleteOnlyMergeInsert: on=[key], when_matched=Delete, when_not_matched=DoNothing + ... + HashJoinExec: ...join_type=Inner... + ... + ... 
+ StreamingTableExec: partition_sizes=1, projection=[key]", + ) + .await + .unwrap(); let job = MergeInsertBuilder::try_new(ds.clone(), keys) .unwrap() .when_matched(WhenMatched::Delete) @@ -5463,10 +5478,15 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n let plan = plan_job.create_plan(plan_stream).await.unwrap(); assert_plan_node_equals( plan, - "MergeInsert: on=[key], when_matched=Delete, when_not_matched=DoNothing, when_not_matched_by_source=Keep...CASE WHEN key@2 IS NOT NULL AND _rowaddr@1 IS NOT NULL THEN 3 ELSE 0 END as __action]...StreamingTableExec: partition_sizes=1, projection=[key]" - ).await.unwrap(); - - // WhenMatched::Delete with ID-only source - should only need key columns for matching + "DeleteOnlyMergeInsert: on=[key], when_matched=Delete, when_not_matched=DoNothing + ... + HashJoinExec: ...join_type=Inner... + ... + ... + StreamingTableExec: partition_sizes=1, projection=[key]", + ) + .await + .unwrap(); let job = MergeInsertBuilder::try_new(ds.clone(), keys) .unwrap() .when_matched(WhenMatched::Delete) @@ -5645,10 +5665,15 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n let plan = plan_job.create_plan(plan_stream).await.unwrap(); assert_plan_node_equals( plan, - "MergeInsert: on=[key], when_matched=Delete, when_not_matched=DoNothing, when_not_matched_by_source=Keep...CASE WHEN key@2 IS NOT NULL AND _rowaddr@1 IS NOT NULL THEN 3 ELSE 0 END as __action]...StreamingTableExec: partition_sizes=1, projection=[key]" - ).await.unwrap(); - - // Execute the delete operation + "DeleteOnlyMergeInsert: on=[key], when_matched=Delete, when_not_matched=DoNothing + ... + HashJoinExec: ...join_type=Inner... + ... + ... + StreamingTableExec: partition_sizes=1, projection=[key]", + ) + .await + .unwrap(); let job = MergeInsertBuilder::try_new(ds.clone(), keys) .unwrap() .when_matched(WhenMatched::Delete) diff --git a/rust/lance/src/dataset/write/merge_insert/exec.rs b/rust/lance/src/dataset/write/merge_insert/exec.rs index 79648424e34..de39d0a1610 100644 --- a/rust/lance/src/dataset/write/merge_insert/exec.rs +++ b/rust/lance/src/dataset/write/merge_insert/exec.rs @@ -1,12 +1,22 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +mod delete; mod write; +use std::collections::BTreeMap; +use std::sync::Arc; + use datafusion::physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder}; +use futures::StreamExt; +use lance_table::format::Fragment; +use roaring::RoaringTreemap; + +pub use delete::DeleteOnlyMergeInsertExec; pub use write::FullSchemaMergeInsertExec; use super::MergeStats; +use crate::Dataset; pub(super) struct MergeInsertMetrics { pub num_inserted_rows: Count, @@ -45,3 +55,49 @@ impl MergeInsertMetrics { } } } + +pub(super) async fn apply_deletions( + dataset: &Dataset, + removed_row_addrs: &RoaringTreemap, +) -> crate::Result<(Vec, Vec)> { + let bitmaps = Arc::new(removed_row_addrs.bitmaps().collect::>()); + + enum FragmentChange { + Unchanged, + Modified(Box), + Removed(u64), + } + + let mut updated_fragments = Vec::new(); + let mut removed_fragments = Vec::new(); + + let mut stream = futures::stream::iter(dataset.get_fragments()) + .map(move |fragment| { + let bitmaps_ref = bitmaps.clone(); + async move { + let fragment_id = fragment.id(); + if let Some(bitmap) = bitmaps_ref.get(&(fragment_id as u32)) { + match fragment.extend_deletions(*bitmap).await { + Ok(Some(new_fragment)) => { + Ok(FragmentChange::Modified(Box::new(new_fragment.metadata))) + 
} + Ok(None) => Ok(FragmentChange::Removed(fragment_id as u64)), + Err(e) => Err(e), + } + } else { + Ok(FragmentChange::Unchanged) + } + } + }) + .buffer_unordered(dataset.object_store.io_parallelism()); + + while let Some(res) = stream.next().await.transpose()? { + match res { + FragmentChange::Unchanged => {} + FragmentChange::Modified(fragment) => updated_fragments.push(*fragment), + FragmentChange::Removed(fragment_id) => removed_fragments.push(fragment_id), + } + } + + Ok((updated_fragments, removed_fragments)) +} diff --git a/rust/lance/src/dataset/write/merge_insert/exec/delete.rs b/rust/lance/src/dataset/write/merge_insert/exec/delete.rs new file mode 100644 index 00000000000..2368643373c --- /dev/null +++ b/rust/lance/src/dataset/write/merge_insert/exec/delete.rs @@ -0,0 +1,315 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use std::sync::{Arc, Mutex}; + +use arrow_array::{Array, RecordBatch, UInt64Array, UInt8Array}; +use datafusion::common::Result as DFResult; +use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; +use datafusion::{ + execution::{SendableRecordBatchStream, TaskContext}, + physical_plan::{ + execution_plan::{Boundedness, EmissionType}, + stream::RecordBatchStreamAdapter, + DisplayAs, ExecutionPlan, PlanProperties, + }, +}; +use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; +use futures::StreamExt; +use lance_core::ROW_ADDR; +use roaring::RoaringTreemap; + +use crate::dataset::transaction::{Operation, Transaction}; +use crate::dataset::write::merge_insert::assign_action::Action; +use crate::dataset::write::merge_insert::{MergeInsertParams, MergeStats, MERGE_ACTION_COLUMN}; +use crate::Dataset; + +use super::{apply_deletions, MergeInsertMetrics}; + +/// Specialized physical execution node for delete-only merge insert operations. +/// +/// This is an optimized path for when `WhenMatched::Delete` is used without inserts. +/// Unlike `FullSchemaMergeInsertExec`, this node: +/// - Only reads `_rowaddr` and `__action` columns (no data columns needed) +/// - Skips the write step entirely (no new fragments created) +/// - Only applies deletions to existing fragments +/// +/// This is significantly more efficient for bulk delete operations where +/// we only need to identify matching rows and mark them as deleted. +#[derive(Debug)] +pub struct DeleteOnlyMergeInsertExec { + input: Arc, + dataset: Arc, + params: MergeInsertParams, + properties: PlanProperties, + metrics: ExecutionPlanMetricsSet, + merge_stats: Arc>>, + transaction: Arc>>, + affected_rows: Arc>>, +} + +impl DeleteOnlyMergeInsertExec { + pub fn try_new( + input: Arc, + dataset: Arc, + params: MergeInsertParams, + ) -> DFResult { + let empty_schema = Arc::new(arrow_schema::Schema::empty()); + let properties = PlanProperties::new( + EquivalenceProperties::new(empty_schema), + Partitioning::UnknownPartitioning(1), + EmissionType::Final, + Boundedness::Bounded, + ); + + Ok(Self { + input, + dataset, + params, + properties, + metrics: ExecutionPlanMetricsSet::new(), + merge_stats: Arc::new(Mutex::new(None)), + transaction: Arc::new(Mutex::new(None)), + affected_rows: Arc::new(Mutex::new(None)), + }) + } + + /// Returns the merge statistics if the execution has completed. + pub fn merge_stats(&self) -> Option { + self.merge_stats.lock().ok().and_then(|guard| guard.clone()) + } + + /// Returns the transaction if the execution has completed. 
+ pub fn transaction(&self) -> Option { + self.transaction.lock().ok().and_then(|guard| guard.clone()) + } + + /// Returns the affected rows (deleted row addresses) if the execution has completed. + pub fn affected_rows(&self) -> Option { + self.affected_rows + .lock() + .ok() + .and_then(|guard| guard.clone()) + } + + async fn collect_deletions( + mut input_stream: SendableRecordBatchStream, + metrics: MergeInsertMetrics, + ) -> DFResult { + let schema = input_stream.schema(); + + let (rowaddr_idx, _) = schema.column_with_name(ROW_ADDR).ok_or_else(|| { + datafusion::error::DataFusionError::Internal( + "Expected _rowaddr column in delete-only merge insert input".to_string(), + ) + })?; + + let (action_idx, _) = schema + .column_with_name(MERGE_ACTION_COLUMN) + .ok_or_else(|| { + datafusion::error::DataFusionError::Internal(format!( + "Expected {} column in delete-only merge insert input", + MERGE_ACTION_COLUMN + )) + })?; + + let mut delete_row_addrs = RoaringTreemap::new(); + + while let Some(batch_result) = input_stream.next().await { + let batch = batch_result?; + + let row_addr_array = batch + .column(rowaddr_idx) + .as_any() + .downcast_ref::() + .ok_or_else(|| { + datafusion::error::DataFusionError::Internal( + "Expected UInt64Array for _rowaddr column".to_string(), + ) + })?; + + let action_array = batch + .column(action_idx) + .as_any() + .downcast_ref::() + .ok_or_else(|| { + datafusion::error::DataFusionError::Internal(format!( + "Expected UInt8Array for {} column", + MERGE_ACTION_COLUMN + )) + })?; + + for row_idx in 0..batch.num_rows() { + let action_code = action_array.value(row_idx); + let action = Action::try_from(action_code).map_err(|e| { + datafusion::error::DataFusionError::Internal(format!( + "Invalid action code {}: {}", + action_code, e + )) + })?; + + if action == Action::Delete && !row_addr_array.is_null(row_idx) { + let row_addr = row_addr_array.value(row_idx); + delete_row_addrs.insert(row_addr); + metrics.num_deleted_rows.add(1); + } + } + } + + Ok(delete_row_addrs) + } +} + +impl DisplayAs for DeleteOnlyMergeInsertExec { + fn fmt_as( + &self, + t: datafusion::physical_plan::DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + datafusion::physical_plan::DisplayFormatType::Default + | datafusion::physical_plan::DisplayFormatType::Verbose => { + let on_keys = self.params.on.join(", "); + write!( + f, + "DeleteOnlyMergeInsert: on=[{}], when_matched=Delete, when_not_matched=DoNothing", + on_keys + ) + } + datafusion::physical_plan::DisplayFormatType::TreeRender => { + write!(f, "DeleteOnlyMergeInsert[{}]", self.dataset.uri()) + } + } + } +} + +impl ExecutionPlan for DeleteOnlyMergeInsertExec { + fn name(&self) -> &str { + "DeleteOnlyMergeInsertExec" + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> arrow_schema::SchemaRef { + Arc::new(arrow_schema::Schema::empty()) + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.input] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> DFResult> { + if children.len() != 1 { + return Err(datafusion::error::DataFusionError::Internal( + "DeleteOnlyMergeInsertExec requires exactly one child".to_string(), + )); + } + Ok(Arc::new(Self { + input: children[0].clone(), + dataset: self.dataset.clone(), + params: self.params.clone(), + properties: self.properties.clone(), + metrics: self.metrics.clone(), + merge_stats: self.merge_stats.clone(), + transaction: self.transaction.clone(), + affected_rows: self.affected_rows.clone(), + })) + } + + fn 
metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + + fn properties(&self) -> &PlanProperties { + &self.properties + } + + fn supports_limit_pushdown(&self) -> bool { + false + } + + fn required_input_distribution(&self) -> Vec { + vec![datafusion_physical_expr::Distribution::SinglePartition] + } + + fn benefits_from_input_partitioning(&self) -> Vec { + vec![false] + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> DFResult { + let _baseline_metrics = BaselineMetrics::new(&self.metrics, partition); + let metrics = MergeInsertMetrics::new(&self.metrics, partition); + let input_stream = self.input.execute(partition, context)?; + + let dataset = self.dataset.clone(); + let merge_stats_holder = self.merge_stats.clone(); + let transaction_holder = self.transaction.clone(); + let affected_rows_holder = self.affected_rows.clone(); + let mem_wal_to_merge = self.params.mem_wal_to_merge.clone(); + + let result_stream = futures::stream::once(async move { + let delete_row_addrs = Self::collect_deletions(input_stream, metrics).await?; + + let (updated_fragments, removed_fragment_ids) = + apply_deletions(&dataset, &delete_row_addrs) + .await + .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?; + + let operation = Operation::Update { + removed_fragment_ids, + updated_fragments, + new_fragments: vec![], + fields_modified: vec![], + mem_wal_to_merge, + fields_for_preserving_frag_bitmap: dataset + .schema() + .fields + .iter() + .map(|f| f.id as u32) + .collect(), + update_mode: None, + }; + + let transaction = Transaction::new(dataset.manifest.version, operation, None); + + let num_deleted = delete_row_addrs.len(); + let stats = MergeStats { + num_deleted_rows: num_deleted, + num_inserted_rows: 0, + num_updated_rows: 0, + bytes_written: 0, + num_files_written: 0, + num_attempts: 1, + }; + + if let Ok(mut transaction_guard) = transaction_holder.lock() { + transaction_guard.replace(transaction); + } + if let Ok(mut merge_stats_guard) = merge_stats_holder.lock() { + merge_stats_guard.replace(stats); + } + if let Ok(mut affected_rows_guard) = affected_rows_holder.lock() { + affected_rows_guard.replace(delete_row_addrs); + } + + let empty_schema = Arc::new(arrow_schema::Schema::empty()); + let empty_batch = RecordBatch::new_empty(empty_schema); + Ok(empty_batch) + }); + + let empty_schema = Arc::new(arrow_schema::Schema::empty()); + Ok(Box::pin(RecordBatchStreamAdapter::new( + empty_schema, + result_stream, + ))) + } +} diff --git a/rust/lance/src/dataset/write/merge_insert/exec/write.rs b/rust/lance/src/dataset/write/merge_insert/exec/write.rs index b1338c5f144..2c65d814d13 100644 --- a/rust/lance/src/dataset/write/merge_insert/exec/write.rs +++ b/rust/lance/src/dataset/write/merge_insert/exec/write.rs @@ -19,7 +19,10 @@ use datafusion::{ }; use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; use futures::{stream, StreamExt}; +use lance_core::{Error, ROW_ADDR, ROW_ID}; +use lance_table::format::RowIdMeta; use roaring::RoaringTreemap; +use snafu::location; use crate::dataset::transaction::UpdateMode::RewriteRows; use crate::dataset::utils::CapturedRowIds; @@ -37,12 +40,10 @@ use crate::{ write_fragments_internal, WriteParams, }, }, - Dataset, Result, + Dataset, }; -use lance_core::{Error, ROW_ADDR, ROW_ID}; -use lance_table::format::{Fragment, RowIdMeta}; -use snafu::location; -use std::collections::BTreeMap; + +use super::apply_deletions; /// Shared state for merge insert operations to simplify lock management struct MergeState { @@ 
-484,53 +485,6 @@ impl FullSchemaMergeInsertExec { (total_bytes as usize, total_files) } - /// Delete a batch of rows by row address, returns the fragments modified and the fragments removed - async fn apply_deletions( - dataset: &Dataset, - removed_row_addrs: &RoaringTreemap, - ) -> Result<(Vec, Vec)> { - let bitmaps = Arc::new(removed_row_addrs.bitmaps().collect::>()); - - enum FragmentChange { - Unchanged, - Modified(Box), - Removed(u64), - } - - let mut updated_fragments = Vec::new(); - let mut removed_fragments = Vec::new(); - - let mut stream = futures::stream::iter(dataset.get_fragments()) - .map(move |fragment| { - let bitmaps_ref = bitmaps.clone(); - async move { - let fragment_id = fragment.id(); - if let Some(bitmap) = bitmaps_ref.get(&(fragment_id as u32)) { - match fragment.extend_deletions(*bitmap).await { - Ok(Some(new_fragment)) => { - Ok(FragmentChange::Modified(Box::new(new_fragment.metadata))) - } - Ok(None) => Ok(FragmentChange::Removed(fragment_id as u64)), - Err(e) => Err(e), - } - } else { - Ok(FragmentChange::Unchanged) - } - } - }) - .buffer_unordered(dataset.object_store.io_parallelism()); - - while let Some(res) = stream.next().await.transpose()? { - match res { - FragmentChange::Unchanged => {} - FragmentChange::Modified(fragment) => updated_fragments.push(*fragment), - FragmentChange::Removed(fragment_id) => removed_fragments.push(fragment_id), - } - } - - Ok((updated_fragments, removed_fragments)) - } - fn split_updates_and_inserts( &self, input_stream: SendableRecordBatchStream, @@ -874,7 +828,7 @@ impl ExecutionPlan for FullSchemaMergeInsertExec { let delete_row_addrs_clone = merge_state.delete_row_addrs; let (updated_fragments, removed_fragment_ids) = - Self::apply_deletions(&dataset, &delete_row_addrs_clone).await?; + apply_deletions(&dataset, &delete_row_addrs_clone).await?; // Step 4: Create the transaction operation let operation = Operation::Update { diff --git a/rust/lance/src/dataset/write/merge_insert/logical_plan.rs b/rust/lance/src/dataset/write/merge_insert/logical_plan.rs index aaf4d74d960..f25c7ef11c3 100644 --- a/rust/lance/src/dataset/write/merge_insert/logical_plan.rs +++ b/rust/lance/src/dataset/write/merge_insert/logical_plan.rs @@ -13,7 +13,11 @@ use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode, UserDefinedLogicalNod use lance_core::{ROW_ADDR, ROW_ID}; use std::{cmp::Ordering, sync::Arc}; -use crate::{dataset::write::merge_insert::exec::FullSchemaMergeInsertExec, Dataset}; +use crate::dataset::write::merge_insert::exec::{ + DeleteOnlyMergeInsertExec, FullSchemaMergeInsertExec, +}; +use crate::dataset::{WhenMatched, WhenNotMatchedBySource}; +use crate::Dataset; use super::{MergeInsertParams, MERGE_ACTION_COLUMN}; @@ -199,6 +203,23 @@ impl UserDefinedLogicalNodeCore for MergeInsertWriteNode { /// Physical planner for MergeInsertWriteNode. pub struct MergeInsertPlanner {} +impl MergeInsertPlanner { + /// Check if this is a delete-only operation that can use the optimized path. 
+ /// + /// Delete-only operations are when: + /// - `when_matched` is `Delete` + /// - `insert_not_matched` is `false` (no inserts) + /// - `delete_not_matched_by_source` is `Keep` (no additional deletes of unmatched target rows) + fn is_delete_only(params: &MergeInsertParams) -> bool { + matches!(params.when_matched, WhenMatched::Delete) + && !params.insert_not_matched + && matches!( + params.delete_not_matched_by_source, + WhenNotMatchedBySource::Keep + ) + } +} + #[async_trait] impl ExtensionPlanner for MergeInsertPlanner { async fn plan_extension( @@ -213,12 +234,21 @@ impl ExtensionPlanner for MergeInsertPlanner { if let Some(write_node) = node.as_any().downcast_ref::() { assert_eq!(logical_inputs.len(), 1, "Inconsistent number of inputs"); assert_eq!(physical_inputs.len(), 1, "Inconsistent number of inputs"); - let exec = FullSchemaMergeInsertExec::try_new( - physical_inputs[0].clone(), - write_node.dataset.clone(), - write_node.params.clone(), - )?; - Some(Arc::new(exec)) + + let exec: Arc = if Self::is_delete_only(&write_node.params) { + Arc::new(DeleteOnlyMergeInsertExec::try_new( + physical_inputs[0].clone(), + write_node.dataset.clone(), + write_node.params.clone(), + )?) + } else { + Arc::new(FullSchemaMergeInsertExec::try_new( + physical_inputs[0].clone(), + write_node.dataset.clone(), + write_node.params.clone(), + )?) + }; + Some(exec) } else { None }, From 04fb14a854104c3d9fd1df9ff15a19cf43620dd9 Mon Sep 17 00:00:00 2001 From: Jesse Tuglu Date: Mon, 5 Jan 2026 14:00:09 -0800 Subject: [PATCH 4/5] Add more tests --- rust/lance/src/dataset/write/merge_insert.rs | 139 +++++++++++++++++++ 1 file changed, 139 insertions(+) diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index a368575460c..b6ec4112171 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -5713,4 +5713,143 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n remaining_keys.sort(); assert_eq!(remaining_keys, vec![1, 2, 3, 4, 5, 6]); } + + /// Test that MergeInsertPlanner::is_delete_only correctly identifies delete-only operations. + /// + /// Delete-only is true only when: + /// - when_matched = Delete + /// - insert_not_matched = false (WhenNotMatched::DoNothing) + /// - delete_not_matched_by_source = Keep + /// + /// This test iterates through all valid combinations of WhenMatched, WhenNotMatched, + /// and WhenNotMatchedBySource to verify the is_delete_only logic. 
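+    ///
+    /// A compact view of the planning outcomes these rules imply (the first
+    /// row is asserted directly by the earlier plan tests):
+    ///
+    /// ```text
+    /// (Delete, DoNothing, Keep)   -> DeleteOnlyMergeInsert
+    /// (Delete, InsertAll, Keep)   -> MergeInsert (full write path)
+    /// (Delete, DoNothing, Delete) -> MergeInsert (full write path)
+    /// ```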
+ #[tokio::test] + async fn test_is_delete_only() { + use itertools::iproduct; + + // All variants to test (excluding UpdateIf and DeleteIf because they require expressions) + let when_matched_variants = [ + WhenMatched::UpdateAll, + WhenMatched::DoNothing, + WhenMatched::Fail, + WhenMatched::Delete, + ]; + let when_not_matched_variants = [WhenNotMatched::InsertAll, WhenNotMatched::DoNothing]; + let when_not_matched_by_source_variants = + [WhenNotMatchedBySource::Keep, WhenNotMatchedBySource::Delete]; + + let schema = create_test_schema(); + + for (idx, (when_matched, when_not_matched, when_not_matched_by_source)) in iproduct!( + when_matched_variants.iter().cloned(), + when_not_matched_variants.iter().cloned(), + when_not_matched_by_source_variants.iter().cloned() + ) + .enumerate() + { + // Check if this is a valid (non-no-op) combination, since this would fail try_build() + let is_no_op = matches!(when_matched, WhenMatched::DoNothing | WhenMatched::Fail) + && matches!(when_not_matched, WhenNotMatched::DoNothing) + && matches!(when_not_matched_by_source, WhenNotMatchedBySource::Keep); + if is_no_op { + continue; + } + + let test_uri = format!("memory://test_is_delete_only_{}.lance", idx); + let ds = create_test_dataset(&test_uri, LanceFileVersion::V2_0, false).await; + + let new_batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![4, 5, 6])), + Arc::new(UInt32Array::from(vec![2, 2, 2])), + Arc::new(StringArray::from(vec!["A", "B", "C"])), + ], + ) + .unwrap(); + + let keys = vec!["key".to_string()]; + + let mut builder = MergeInsertBuilder::try_new(ds.clone(), keys).unwrap(); + builder + .when_matched(when_matched.clone()) + .when_not_matched(when_not_matched.clone()) + .when_not_matched_by_source(when_not_matched_by_source.clone()); + + let job = builder.try_build().unwrap(); + + let plan_stream = reader_to_stream(Box::new(RecordBatchIterator::new( + [Ok(new_batch)], + schema.clone(), + ))); + let plan = job.create_plan(plan_stream).await.unwrap(); + + let plan_str = datafusion::physical_plan::displayable(plan.as_ref()) + .indent(true) + .to_string(); + + let expected_delete_only = matches!(when_matched, WhenMatched::Delete) + && matches!(when_not_matched, WhenNotMatched::DoNothing) + && matches!(when_not_matched_by_source, WhenNotMatchedBySource::Keep); + + if expected_delete_only { + assert!( + plan_str.contains("DeleteOnlyMergeInsert"), + "Expected DeleteOnlyMergeInsert for ({:?}, {:?}, {:?}), but got:\n{}", + when_matched, + when_not_matched, + when_not_matched_by_source, + plan_str + ); + } else { + assert!( + plan_str.contains("MergeInsert:") + && !plan_str.contains("DeleteOnlyMergeInsert"), + "Expected MergeInsert (not DeleteOnlyMergeInsert) for ({:?}, {:?}, {:?}), but got:\n{}", + when_matched, + when_not_matched, + when_not_matched_by_source, + plan_str + ); + } + } + } + + /// Tests that apply_deletions correctly handles an error when applying the row deletions. 
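+    ///
+    /// Row addresses here are packed the same way the test body computes
+    /// them: fragment id in the high 32 bits, row offset in the low 32 bits.
+    ///
+    /// ```ignore
+    /// let row_addr: u64 = ((fragment_id as u64) << 32) | row_offset;
+    /// ```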
+ #[tokio::test]
+ async fn test_apply_deletions_invalid_row_address() {
+ use super::exec::apply_deletions;
+ use roaring::RoaringTreemap;
+
+ let test_uri = "memory://test_apply_deletions_error.lance";
+
+ // Create a dataset with 2 fragments, each with 3 rows
+ let ds = create_test_dataset(test_uri, LanceFileVersion::V2_0, false).await;
+ let fragment_id = ds.get_fragments()[0].id() as u32;
+
+ // Create row addresses with invalid row offsets for this fragment
+ // Row address format: high 32 bits = fragment_id, low 32 bits = row_offset
+ // Each fragment has only 3 rows (offsets 0, 1, 2).
+ //
+ // The error in extend_deletions is triggered when deletion_vector.len() >= physical_rows
+ // AND at least one row ID is >= physical_rows.
+ // So we need to add enough deletions (at least 3) with some being invalid (>= 3).
+ let mut invalid_row_addrs = RoaringTreemap::new();
+ let base = (fragment_id as u64) << 32;
+ // Add 4 deletions: rows 10, 11, 12, 13 (all invalid since only rows 0-2 exist)
+ for row_offset in 10..14u64 {
+ invalid_row_addrs.insert(base | row_offset);
+ }
+
+ let result = apply_deletions(&ds, &invalid_row_addrs).await;
+
+ assert!(result.is_err(), "Expected error for invalid row addresses");
+ let err = result.unwrap_err();
+ assert!(
+ err.to_string()
+ .contains("Deletion vector includes rows that aren't in the fragment"),
+ "Expected 'rows that aren't in the fragment' error, got: {}",
+ err
+ );
+ }
 }

From cc2f3e5d9c7c74abe53cef397136ca0360eae813 Mon Sep 17 00:00:00 2001
From: Jack Ye
Date: Mon, 5 Jan 2026 14:27:21 -0800
Subject: [PATCH 5/5] use take to avoid clone

---
 .../dataset/write/merge_insert/exec/delete.rs | 18 ++++++++++++------
 .../dataset/write/merge_insert/exec/write.rs | 18 ++++++++++++------
 2 files changed, 24 insertions(+), 12 deletions(-)

diff --git a/rust/lance/src/dataset/write/merge_insert/exec/delete.rs b/rust/lance/src/dataset/write/merge_insert/exec/delete.rs
index 2368643373c..12ae2359bab 100644
--- a/rust/lance/src/dataset/write/merge_insert/exec/delete.rs
+++ b/rust/lance/src/dataset/write/merge_insert/exec/delete.rs
@@ -74,22 +74,28 @@ impl DeleteOnlyMergeInsertExec {
 })
 }

- /// Returns the merge statistics if the execution has completed.
+ /// Takes the merge statistics if the execution has completed.
 pub fn merge_stats(&self) -> Option<MergeStats> {
- self.merge_stats.lock().ok().and_then(|guard| guard.clone())
+ self.merge_stats
+ .lock()
+ .ok()
+ .and_then(|mut guard| guard.take())
 }

- /// Returns the transaction if the execution has completed.
+ /// Takes the transaction if the execution has completed.
 pub fn transaction(&self) -> Option<Transaction> {
- self.transaction.lock().ok().and_then(|guard| guard.clone())
+ self.transaction
+ .lock()
+ .ok()
+ .and_then(|mut guard| guard.take())
 }

- /// Returns the affected rows (deleted row addresses) if the execution has completed.
+ /// Takes the affected rows (deleted row addresses) if the execution has completed.
 pub fn affected_rows(&self) -> Option<RowIdTreeMap> {
 self.affected_rows
 .lock()
 .ok()
- .and_then(|guard| guard.clone())
+ .and_then(|mut guard| guard.take())
 }

 async fn collect_deletions(
diff --git a/rust/lance/src/dataset/write/merge_insert/exec/write.rs b/rust/lance/src/dataset/write/merge_insert/exec/write.rs
index 2c65d814d13..b2cef2dc601 100644
--- a/rust/lance/src/dataset/write/merge_insert/exec/write.rs
+++ b/rust/lance/src/dataset/write/merge_insert/exec/write.rs
@@ -179,25 +179,31 @@ impl FullSchemaMergeInsertExec {
 })
 }

- /// Returns the merge statistics if the execution has completed.
+ /// Takes the merge statistics if the execution has completed.
 /// Returns `None` if the execution is still in progress or hasn't started.
 pub fn merge_stats(&self) -> Option<MergeStats> {
- self.merge_stats.lock().ok().and_then(|guard| guard.clone())
+ self.merge_stats
+ .lock()
+ .ok()
+ .and_then(|mut guard| guard.take())
 }

- /// Returns the transaction if the execution has completed.
+ /// Takes the transaction if the execution has completed.
 /// Returns `None` if the execution is still in progress or hasn't started.
 pub fn transaction(&self) -> Option<Transaction> {
- self.transaction.lock().ok().and_then(|guard| guard.clone())
+ self.transaction
+ .lock()
+ .ok()
+ .and_then(|mut guard| guard.take())
 }

- /// Returns the affected rows (deleted/updated row addresses) if the execution has completed.
+ /// Takes the affected rows (deleted/updated row addresses) if the execution has completed.
 /// Returns `None` if the execution is still in progress or hasn't started.
 pub fn affected_rows(&self) -> Option<RowIdTreeMap> {
 self.affected_rows
 .lock()
 .ok()
- .and_then(|guard| guard.clone())
+ .and_then(|mut guard| guard.take())
 }

 /// Creates a filtered stream that captures row addresses for deletion and returns
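For context on the "use take to avoid clone" refactor in this patch, a standalone sketch (not Lance code) contrasting the two accessor styles over a Mutex<Option<T>>; clone_out mirrors the removed lines, take_out the added ones:

    use std::sync::Mutex;

    // Requires T: Clone and copies the value, leaving the original in place,
    // so repeated calls keep returning Some.
    fn clone_out<T: Clone>(slot: &Mutex<Option<T>>) -> Option<T> {
        slot.lock().ok().and_then(|guard| guard.clone())
    }

    // Moves the value out, leaving None behind: no Clone bound and no copy,
    // but a second call returns None, so callers may consume the result once.
    fn take_out<T>(slot: &Mutex<Option<T>>) -> Option<T> {
        slot.lock().ok().and_then(|mut guard| guard.take())
    }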