diff --git a/docs/src/rest.yaml b/docs/src/rest.yaml
index faf6eaeaed8..b3af38ba7ef 100644
--- a/docs/src/rest.yaml
+++ b/docs/src/rest.yaml
@@ -737,6 +737,13 @@ paths:
required: true
schema:
type: string
+ - name: "when_matched_delete"
+ in: query
+ description: Delete all rows in target table where a match exists in source table
+ required: false
+ schema:
+ type: boolean
+ default: false
- name: "when_matched_update_all"
in: query
description: Update all columns when rows match
@@ -786,6 +793,7 @@ paths:
It passes in the `MergeInsertIntoTableRequest` information in the following way:
- `id`: pass through path parameter of the same name
- `on`: pass through query parameter of the same name
+ - `when_matched_delete`: pass through query parameter of the same name
- `when_matched_update_all`: pass through query parameter of the same name
- `when_matched_update_all_filt`: pass through query parameter of the same name
- `when_not_matched_insert_all`: pass through query parameter of the same name
@@ -1938,6 +1946,10 @@ components:
"on":
description: Column name to use for matching rows (required)
type: string
+ when_matched_delete:
+ description: Delete all rows in target table where a match exists in source table
+ type: boolean
+ default: false
when_matched_update_all:
description: Update all columns when rows match
type: boolean
diff --git a/java/lance-jni/src/merge_insert.rs b/java/lance-jni/src/merge_insert.rs
index 4e74393dc54..dca9e163ff8 100644
--- a/java/lance-jni/src/merge_insert.rs
+++ b/java/lance-jni/src/merge_insert.rs
@@ -114,6 +114,7 @@ fn extract_when_matched<'local>(env: &mut JNIEnv<'local>, jparam: &JObject) -> R
None => Err(Error::input_error("No matched updated expr".to_string())),
},
"Fail" => Ok(WhenMatched::Fail),
+ "Delete" => Ok(WhenMatched::Delete),
_ => Err(Error::input_error(format!(
"Illegal when_matched: {when_matched}",
))),
diff --git a/java/src/main/java/org/lance/merge/MergeInsertParams.java b/java/src/main/java/org/lance/merge/MergeInsertParams.java
index e27b0f7f235..a1759455248 100644
--- a/java/src/main/java/org/lance/merge/MergeInsertParams.java
+++ b/java/src/main/java/org/lance/merge/MergeInsertParams.java
@@ -66,6 +66,19 @@ public MergeInsertParams withMatchedDoNothing() {
return this;
}
+ /**
+ * Specify that when a row in the source table matches a row in the target table, the row in the
+ * target table is deleted.
+ *
+   * <p>This can be used to achieve "when matched delete" behavior.
+ *
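+   * <p>A sketch of a bulk delete, assuming a key column {@code id} and an Arrow
+   * stream {@code sourceStream} carrying the keys to remove:
+   *
+   * <pre>{@code
+   * MergeInsertParams params =
+   *     new MergeInsertParams(Collections.singletonList("id"))
+   *         .withMatchedDelete()
+   *         .withNotMatched(MergeInsertParams.WhenNotMatched.DoNothing);
+   * dataset.mergeInsert(params, sourceStream);
+   * }</pre>
+   *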
+ * @return This MergeInsertParams instance
+ */
+ public MergeInsertParams withMatchedDelete() {
+ this.whenMatched = WhenMatched.Delete;
+ return this;
+ }
+
/**
* Specify that when a row in the source table matches a row in the target table and the
* expression evaluates to true, the row in the target table is updated by the matched row from
@@ -303,6 +316,12 @@ public enum WhenMatched {
* used to ensure that no existing rows are overwritten or modified after inserted.
*/
Fail,
+
+ /**
+ * The row is deleted from the target table when a row in the source table matches a row in the
+ * target table.
+ */
+ Delete
}
public enum WhenNotMatched {
diff --git a/java/src/test/java/org/lance/MergeInsertTest.java b/java/src/test/java/org/lance/MergeInsertTest.java
index 825fd73e814..c36ec26b4fa 100644
--- a/java/src/test/java/org/lance/MergeInsertTest.java
+++ b/java/src/test/java/org/lance/MergeInsertTest.java
@@ -219,6 +219,24 @@ public void testWhenMatchedFailWithoutMatches() throws Exception {
}
}
+ @Test
+ public void testWhenMatchedDelete() throws Exception {
+    // Test that target rows matching a source row are deleted
+
+ try (VectorSchemaRoot source = buildSource(testDataset.getSchema(), allocator)) {
+ try (ArrowArrayStream sourceStream = convertToStream(source, allocator)) {
+ MergeInsertResult result =
+ dataset.mergeInsert(
+ new MergeInsertParams(Collections.singletonList("id"))
+ .withMatchedDelete()
+ .withNotMatched(MergeInsertParams.WhenNotMatched.DoNothing),
+ sourceStream);
+
+ Assertions.assertEquals("{3=Person 3, 4=Person 4}", readAll(result.dataset()).toString());
+ }
+ }
+ }
+
private VectorSchemaRoot buildSource(Schema schema, RootAllocator allocator) {
    List<Integer> sourceIds = Arrays.asList(0, 1, 2, 7, 8, 9);
diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py
index 610c390aa31..1a88cf8071b 100644
--- a/python/python/lance/dataset.py
+++ b/python/python/lance/dataset.py
@@ -164,6 +164,16 @@ def when_matched_update_all(
"""
return super(MergeInsertBuilder, self).when_matched_update_all(condition)
+ def when_matched_delete(self) -> "MergeInsertBuilder":
+ """
+ Configure the operation to delete matched rows in the target table.
+
+        After this method is called, when the merge insert operation executes,
+        any rows in the target table that have a match in the source table will
+        be deleted.
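+
+        Example (a sketch, assuming a dataset ``ds`` keyed on ``id`` and a
+        pyarrow table ``keys_to_delete`` holding the keys to remove)::
+
+            ds.merge_insert("id").when_matched_delete().execute(keys_to_delete)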
+ """
+ return super(MergeInsertBuilder, self).when_matched_delete()
+
def when_matched_fail(self) -> "MergeInsertBuilder":
"""
Configure the operation to fail if any rows match
diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py
index 02d9d814bae..f319dec8796 100644
--- a/python/python/tests/test_dataset.py
+++ b/python/python/tests/test_dataset.py
@@ -2311,6 +2311,87 @@ def test_merge_insert_when_matched_fail(tmp_path: Path):
assert unchanged_data == expected
+def test_merge_insert_when_matched_delete(tmp_path: Path):
+ """Test when_matched_delete functionality for merge insert."""
+ # Create initial dataset with ids 1-6
+ data = pa.table({"id": [1, 2, 3, 4, 5, 6], "val": [10, 20, 30, 40, 50, 60]})
+ ds = lance.write_dataset(data, tmp_path / "dataset")
+ version = ds.version
+
+ # Test 1: Basic when_matched_delete - delete matched rows only
+ # Source has ids 4, 5, 6 (match) and 7, 8, 9 (no match)
+ # Only matched rows should be deleted, unmatched rows are ignored
+ delete_keys = pa.table({"id": [4, 5, 6, 7, 8, 9], "val": [0, 0, 0, 0, 0, 0]})
+ result = ds.merge_insert("id").when_matched_delete().execute(delete_keys)
+
+ assert result["num_deleted_rows"] == 3
+ assert result["num_inserted_rows"] == 0
+ assert result["num_updated_rows"] == 0
+
+ # Verify only ids 1, 2, 3 remain
+ remaining = ds.to_table().sort_by("id")
+ expected = pa.table({"id": [1, 2, 3], "val": [10, 20, 30]})
+ assert remaining == expected
+
+ # Test 2: when_matched_delete with ID-only source
+ # Source contains only the key column
+ ds = lance.dataset(tmp_path / "dataset", version=version)
+ ds.restore()
+
+ id_only_source = pa.table({"id": [2, 4, 6]}) # Delete even ids
+ result = ds.merge_insert("id").when_matched_delete().execute(id_only_source)
+
+ assert result["num_deleted_rows"] == 3
+ assert result["num_inserted_rows"] == 0
+ assert result["num_updated_rows"] == 0
+
+ # Verify only odd ids remain
+ remaining = ds.to_table().sort_by("id")
+ expected = pa.table({"id": [1, 3, 5], "val": [10, 30, 50]})
+ assert remaining == expected
+
+ # Test 3: when_matched_delete combined with when_not_matched_insert_all
+ # Delete existing rows that match, insert new rows that don't match
+ ds = lance.dataset(tmp_path / "dataset", version=version)
+ ds.restore()
+
+ new_data = pa.table(
+ {"id": [4, 5, 6, 7, 8, 9], "val": [400, 500, 600, 700, 800, 900]}
+ )
+ result = (
+ ds.merge_insert("id")
+ .when_matched_delete()
+ .when_not_matched_insert_all()
+ .execute(new_data)
+ )
+
+ # Should delete 3 (ids 4, 5, 6) and insert 3 (ids 7, 8, 9)
+ assert result["num_deleted_rows"] == 3
+ assert result["num_inserted_rows"] == 3
+ assert result["num_updated_rows"] == 0
+
+ # Verify: ids 1, 2, 3 (original), 7, 8, 9 (new inserts)
+ remaining = ds.to_table().sort_by("id")
+ expected = pa.table({"id": [1, 2, 3, 7, 8, 9], "val": [10, 20, 30, 700, 800, 900]})
+ assert remaining == expected
+
+ # Test 4: when_matched_delete with no matches (should be a no-op delete)
+ ds = lance.dataset(tmp_path / "dataset", version=version)
+ ds.restore()
+
+ non_matching = pa.table({"id": [100, 200, 300], "val": [0, 0, 0]})
+ result = ds.merge_insert("id").when_matched_delete().execute(non_matching)
+
+ assert result["num_deleted_rows"] == 0
+ assert result["num_inserted_rows"] == 0
+ assert result["num_updated_rows"] == 0
+
+ # Data should be unchanged
+ remaining = ds.to_table().sort_by("id")
+ expected = pa.table({"id": [1, 2, 3, 4, 5, 6], "val": [10, 20, 30, 40, 50, 60]})
+ assert remaining == expected
+
+
def test_merge_insert_large():
# Doing subcolumns update with merge insert triggers this error.
# Data needs to be large enough to make DataFusion create multiple batches
diff --git a/python/src/dataset.rs b/python/src/dataset.rs
index 99f7bc83d2c..49a6397e818 100644
--- a/python/src/dataset.rs
+++ b/python/src/dataset.rs
@@ -186,6 +186,11 @@ impl MergeInsertBuilder {
Ok(slf)
}
+    pub fn when_matched_delete(mut slf: PyRefMut<Self>) -> PyResult<PyRefMut<Self>> {
+ slf.builder.when_matched(WhenMatched::Delete);
+ Ok(slf)
+ }
+
    pub fn when_not_matched_insert_all(mut slf: PyRefMut<Self>) -> PyResult<PyRefMut<Self>> {
slf.builder.when_not_matched(WhenNotMatched::InsertAll);
Ok(slf)
diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs
index c4709624bc7..b6ec4112171 100644
--- a/rust/lance/src/dataset/write/merge_insert.rs
+++ b/rust/lance/src/dataset/write/merge_insert.rs
@@ -2,7 +2,7 @@
// SPDX-FileCopyrightText: Copyright The Lance Authors
//! The merge insert operation merges a batch of new data into an existing batch of old data. This can be
-//! used to implement a bulk update-or-insert (upsert) or find-or-create operation. It can also be used to
+//! used to implement a bulk update-or-insert (upsert), bulk delete, or find-or-create operation. It can also be used to
//! replace a specified region of data with new data (e.g. replace the data for the month of January)
//!
//! The terminology for this operation can be slightly confusing. We try and stick with the terminology from
@@ -10,9 +10,9 @@
//! being inserted into the dataset.
//!
//! In order for this operation to work we need to be able to match rows from the source table with rows in the
-//! target table. For example, given a row we need to know if this is a brand new row or matches an existing row.
+//! target table. For example, given a row we need to know if this is a brand-new row or matches an existing row.
//!
-//! This match condition is currently limited to an key-match. This means we consider a row to be a match if the
+//! This match condition is currently limited to a key-match. This means we consider a row to be a match if the
//! key columns are identical in both the source and the target. This means that you will need some kind of
//! meaningful key column to be able to perform a merge insert.
@@ -257,6 +257,11 @@ pub enum WhenMatched {
///
/// This can be used to ensure that no existing rows are overwritten or modified after inserted.
Fail,
+ /// The matching row is deleted from the target table
+ ///
+ /// This can be used for bulk deletion by matching on key columns.
+ /// Unlike UpdateAll, no new row is inserted - the matched row is simply removed.
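+    ///
+    /// A sketch of a bulk delete, assuming a `dataset` keyed on `key` and a
+    /// `source_stream` carrying the keys to remove:
+    ///
+    /// ```ignore
+    /// let job = MergeInsertBuilder::try_new(dataset, vec!["key".to_string()])?
+    ///     .when_matched(WhenMatched::Delete)
+    ///     .when_not_matched(WhenNotMatched::DoNothing)
+    ///     .try_build()?;
+    /// let (dataset, stats) = job.execute(source_stream).await?;
+    /// ```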
+ Delete,
}
impl WhenMatched {
@@ -1385,29 +1390,40 @@ impl MergeInsertJob {
}
// Extract merge stats from the execution plan
- let merge_insert_exec = plan
- .as_any()
-            .downcast_ref::<FullSchemaMergeInsertExec>()
- .ok_or_else(|| Error::Internal {
- message: "Expected FullSchemaMergeInsertExec".into(),
+ let (stats, transaction, affected_rows) = if let Some(full_exec) =
+ plan.as_any()
+                .downcast_ref::<FullSchemaMergeInsertExec>()
+ {
+ let stats = full_exec.merge_stats().ok_or_else(|| Error::Internal {
+ message: "Merge stats not available - execution may not have completed".into(),
location: location!(),
})?;
-
- let stats = merge_insert_exec
- .merge_stats()
- .ok_or_else(|| Error::Internal {
+ let transaction = full_exec.transaction().ok_or_else(|| Error::Internal {
+ message: "Transaction not available - execution may not have completed".into(),
+ location: location!(),
+ })?;
+ let affected_rows = full_exec.affected_rows().map(RowAddrTreeMap::from);
+ (stats, transaction, affected_rows)
+ } else if let Some(delete_exec) = plan
+ .as_any()
+            .downcast_ref::<DeleteOnlyMergeInsertExec>()
+ {
+ let stats = delete_exec.merge_stats().ok_or_else(|| Error::Internal {
message: "Merge stats not available - execution may not have completed".into(),
location: location!(),
})?;
-
- let transaction = merge_insert_exec
- .transaction()
- .ok_or_else(|| Error::Internal {
+ let transaction = delete_exec.transaction().ok_or_else(|| Error::Internal {
message: "Transaction not available - execution may not have completed".into(),
location: location!(),
})?;
-
- let affected_rows = merge_insert_exec.affected_rows().map(RowAddrTreeMap::from);
+ let affected_rows = delete_exec.affected_rows().map(RowAddrTreeMap::from);
+ (stats, transaction, affected_rows)
+ } else {
+ return Err(Error::Internal {
+ message: "Expected FullSchemaMergeInsertExec or DeleteOnlyMergeInsertExec".into(),
+ location: location!(),
+ });
+ };
Ok((transaction, stats, affected_rows))
}
@@ -1437,11 +1453,30 @@ impl MergeInsertJob {
let has_scalar_index = self.join_key_as_scalar_index().await?.is_some();
+ // Check if this is a delete-only operation (no update/insert writes needed from source)
+ // For delete-only, we don't need the full source schema, just key columns for matching
+ let no_upsert = matches!(
+ self.params.when_matched,
+ WhenMatched::Delete | WhenMatched::DoNothing
+ ) && !self.params.insert_not_matched;
+
+ // For delete-only, verify source has all key columns
+ let source_has_key_columns = self.params.on.iter().all(|key| {
+ source_schema
+ .fields()
+ .iter()
+ .any(|f| f.name() == key.as_str())
+ });
+ let schema_ok = is_full_schema || (no_upsert && source_has_key_columns);
+
Ok(matches!(
self.params.when_matched,
- WhenMatched::UpdateAll | WhenMatched::UpdateIf(_) | WhenMatched::Fail
+ WhenMatched::UpdateAll
+ | WhenMatched::UpdateIf(_)
+ | WhenMatched::Fail
+ | WhenMatched::Delete
) && (!self.params.use_index || !has_scalar_index)
- && is_full_schema
+ && schema_ok
&& matches!(
self.params.delete_not_matched_by_source,
WhenNotMatchedBySource::Keep
@@ -5314,4 +5349,507 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n
assert_eq!(result, expected);
}
+
+ /// Test WhenMatched::Delete with full schema source data.
+ /// Source contains all columns (key, value, filterme) but we only use it to identify
+ /// rows to delete - no data is written back.
+ #[rstest::rstest]
+ #[tokio::test]
+ async fn test_when_matched_delete_full_schema(
+ #[values(LanceFileVersion::Legacy, LanceFileVersion::V2_0)] version: LanceFileVersion,
+ #[values(true, false)] enable_stable_row_ids: bool,
+ ) {
+ let schema = create_test_schema();
+ let test_uri = "memory://test_delete_full.lance";
+
+ // Create dataset with keys 1-6 (value=1)
+ let ds = create_test_dataset(test_uri, version, enable_stable_row_ids).await;
+
+ // Source data has keys 4, 5, 6, 7, 8, 9 with full schema
+ // Keys 4, 5, 6 match existing rows and should be deleted
+ // Keys 7, 8, 9 don't match (and we're not inserting)
+ let new_batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(UInt32Array::from(vec![4, 5, 6, 7, 8, 9])),
+ Arc::new(UInt32Array::from(vec![2, 2, 2, 2, 2, 2])),
+ Arc::new(StringArray::from(vec!["A", "B", "C", "A", "B", "C"])),
+ ],
+ )
+ .unwrap();
+
+ let keys = vec!["key".to_string()];
+
+ // First, verify the execution plan structure
+ // Delete-only should use Inner join and only include key columns (optimization)
+ // Action 3 = Delete
+ let plan_job = MergeInsertBuilder::try_new(ds.clone(), keys.clone())
+ .unwrap()
+ .when_matched(WhenMatched::Delete)
+ .when_not_matched(WhenNotMatched::DoNothing)
+ .try_build()
+ .unwrap();
+ let plan_stream = reader_to_stream(Box::new(RecordBatchIterator::new(
+ [Ok(new_batch.clone())],
+ schema.clone(),
+ )));
+ let plan = plan_job.create_plan(plan_stream).await.unwrap();
+ assert_plan_node_equals(
+ plan,
+ "DeleteOnlyMergeInsert: on=[key], when_matched=Delete, when_not_matched=DoNothing
+ ...
+ HashJoinExec: ...join_type=Inner...
+ ...
+ ...
+ StreamingTableExec: partition_sizes=1, projection=[key]",
+ )
+ .await
+ .unwrap();
+ let job = MergeInsertBuilder::try_new(ds.clone(), keys)
+ .unwrap()
+ .when_matched(WhenMatched::Delete)
+ .when_not_matched(WhenNotMatched::DoNothing)
+ .try_build()
+ .unwrap();
+
+ let new_reader = Box::new(RecordBatchIterator::new([Ok(new_batch)], schema.clone()));
+ let new_stream = reader_to_stream(new_reader);
+
+ let (merged_dataset, merge_stats) = job.execute(new_stream).await.unwrap();
+
+ // Should have deleted 3 rows (keys 4, 5, 6)
+ assert_eq!(merge_stats.num_deleted_rows, 3);
+ assert_eq!(merge_stats.num_inserted_rows, 0);
+ assert_eq!(merge_stats.num_updated_rows, 0);
+
+ // Verify remaining data - only keys 1, 2, 3 should remain
+ let batches = merged_dataset
+ .scan()
+ .try_into_stream()
+ .await
+ .unwrap()
+            .try_collect::<Vec<_>>()
+ .await
+ .unwrap();
+
+ let merged = concat_batches(&schema, &batches).unwrap();
+        let mut remaining_keys: Vec<u32> = merged
+            .column(0)
+            .as_primitive::<UInt32Type>()
+ .values()
+ .to_vec();
+ remaining_keys.sort();
+ assert_eq!(remaining_keys, vec![1, 2, 3]);
+ }
+
+ /// Test WhenMatched::Delete with ID-only source data (just key column).
+ /// This is the optimized bulk delete case where we only need key columns for matching.
+ #[rstest::rstest]
+ #[tokio::test]
+ async fn test_when_matched_delete_id_only(
+ #[values(LanceFileVersion::Legacy, LanceFileVersion::V2_0)] version: LanceFileVersion,
+ #[values(true, false)] enable_stable_row_ids: bool,
+ ) {
+ let test_uri = "memory://test_delete_id_only.lance";
+
+ // Create dataset with keys 1-6 (full schema: key, value, filterme)
+ let ds = create_test_dataset(test_uri, version, enable_stable_row_ids).await;
+ let id_only_schema = Arc::new(Schema::new(vec![Field::new("key", DataType::UInt32, true)]));
+ let new_batch = RecordBatch::try_new(
+ id_only_schema.clone(),
+ vec![Arc::new(UInt32Array::from(vec![2, 4, 6]))], // Delete keys 2, 4, 6
+ )
+ .unwrap();
+
+ let keys = vec!["key".to_string()];
+
+ // ID-only delete should use Inner join with key-only projection
+ // on=[(key@0, key@0)] because key is at position 0 in both target and source
+ let plan_job = MergeInsertBuilder::try_new(ds.clone(), keys.clone())
+ .unwrap()
+ .when_matched(WhenMatched::Delete)
+ .when_not_matched(WhenNotMatched::DoNothing)
+ .try_build()
+ .unwrap();
+ let plan_stream = reader_to_stream(Box::new(RecordBatchIterator::new(
+ [Ok(new_batch.clone())],
+ id_only_schema.clone(),
+ )));
+ let plan = plan_job.create_plan(plan_stream).await.unwrap();
+ assert_plan_node_equals(
+ plan,
+ "DeleteOnlyMergeInsert: on=[key], when_matched=Delete, when_not_matched=DoNothing
+ ...
+ HashJoinExec: ...join_type=Inner...
+ ...
+ ...
+ StreamingTableExec: partition_sizes=1, projection=[key]",
+ )
+ .await
+ .unwrap();
+ let job = MergeInsertBuilder::try_new(ds.clone(), keys)
+ .unwrap()
+ .when_matched(WhenMatched::Delete)
+ .when_not_matched(WhenNotMatched::DoNothing)
+ .try_build()
+ .unwrap();
+
+ let new_reader = Box::new(RecordBatchIterator::new(
+ [Ok(new_batch)],
+ id_only_schema.clone(),
+ ));
+ let new_stream = reader_to_stream(new_reader);
+
+ let (merged_dataset, merge_stats) = job.execute(new_stream).await.unwrap();
+
+ // Should have deleted 3 rows (keys 2, 4, 6)
+ assert_eq!(merge_stats.num_deleted_rows, 3);
+ assert_eq!(merge_stats.num_inserted_rows, 0);
+ assert_eq!(merge_stats.num_updated_rows, 0);
+
+ // Verify remaining data - only keys 1, 3, 5 should remain
+ let full_schema = create_test_schema();
+ let batches = merged_dataset
+ .scan()
+ .try_into_stream()
+ .await
+ .unwrap()
+            .try_collect::<Vec<_>>()
+ .await
+ .unwrap();
+
+ let merged = concat_batches(&full_schema, &batches).unwrap();
+        let mut remaining_keys: Vec<u32> = merged
+            .column(0)
+            .as_primitive::<UInt32Type>()
+ .values()
+ .to_vec();
+ remaining_keys.sort();
+ assert_eq!(remaining_keys, vec![1, 3, 5]);
+ }
+
+ /// Test WhenMatched::Delete combined with WhenNotMatched::InsertAll.
+    /// Matched target rows are deleted while unmatched source rows are inserted.
+ #[rstest::rstest]
+ #[tokio::test]
+ async fn test_when_matched_delete_with_insert(
+ #[values(LanceFileVersion::Legacy, LanceFileVersion::V2_0)] version: LanceFileVersion,
+ ) {
+ let schema = create_test_schema();
+ let test_uri = "memory://test_delete_with_insert.lance";
+
+ // Create dataset with keys 1-6
+ let ds = create_test_dataset(test_uri, version, false).await;
+
+ // Source has keys 4, 5, 6 (match - will be deleted) and 7, 8, 9 (new - will be inserted)
+ let new_batch = create_new_batch(schema.clone());
+
+ let keys = vec!["key".to_string()];
+
+ // Delete + Insert should use Right join to see unmatched rows for insertion
+ let plan_job = MergeInsertBuilder::try_new(ds.clone(), keys.clone())
+ .unwrap()
+ .when_matched(WhenMatched::Delete)
+ .when_not_matched(WhenNotMatched::InsertAll)
+ .try_build()
+ .unwrap();
+ let plan_stream = reader_to_stream(Box::new(RecordBatchIterator::new(
+ [Ok(new_batch.clone())],
+ schema.clone(),
+ )));
+ let plan = plan_job.create_plan(plan_stream).await.unwrap();
+ assert_plan_node_equals(
+ plan,
+ "MergeInsert: on=[key], when_matched=Delete, when_not_matched=InsertAll, when_not_matched_by_source=Keep...THEN 2 WHEN...THEN 3 ELSE 0 END as __action]...projection=[key, value, filterme]"
+ ).await.unwrap();
+
+ // Delete matched rows, insert unmatched rows
+ let job = MergeInsertBuilder::try_new(ds.clone(), keys)
+ .unwrap()
+ .when_matched(WhenMatched::Delete)
+ .when_not_matched(WhenNotMatched::InsertAll)
+ .try_build()
+ .unwrap();
+
+ let new_reader = Box::new(RecordBatchIterator::new([Ok(new_batch)], schema.clone()));
+ let new_stream = reader_to_stream(new_reader);
+
+ let (merged_dataset, merge_stats) = job.execute(new_stream).await.unwrap();
+
+ // Deleted 3 (keys 4, 5, 6), inserted 3 (keys 7, 8, 9)
+ assert_eq!(merge_stats.num_deleted_rows, 3);
+ assert_eq!(merge_stats.num_inserted_rows, 3);
+ assert_eq!(merge_stats.num_updated_rows, 0);
+
+ // Verify: keys 1, 2, 3 (original, not matched), 7, 8, 9 (new inserts)
+ let batches = merged_dataset
+ .scan()
+ .try_into_stream()
+ .await
+ .unwrap()
+            .try_collect::<Vec<_>>()
+ .await
+ .unwrap();
+
+ let merged = concat_batches(&schema, &batches).unwrap();
+        let mut remaining_keys: Vec<u32> = merged
+            .column(0)
+            .as_primitive::<UInt32Type>()
+ .values()
+ .to_vec();
+ remaining_keys.sort();
+ assert_eq!(remaining_keys, vec![1, 2, 3, 7, 8, 9]);
+
+ // Verify values: keys 1, 2, 3 have value=1 (original), keys 7, 8, 9 have value=2 (new)
+ let keyvals: Vec<(u32, u32)> = merged
+ .column(0)
+            .as_primitive::<UInt32Type>()
+ .values()
+ .iter()
+ .zip(
+ merged
+ .column(1)
+                    .as_primitive::<UInt32Type>()
+ .values()
+ .iter(),
+ )
+ .map(|(&k, &v)| (k, v))
+ .collect();
+
+ for (key, value) in keyvals {
+ if key <= 3 {
+ assert_eq!(value, 1, "Original keys should have value=1");
+ } else {
+ assert_eq!(value, 2, "New keys should have value=2");
+ }
+ }
+ }
+
+ /// Test WhenMatched::Delete when source data has no matching keys.
+ /// This should result in zero deletes and the dataset remains unchanged.
+ #[rstest::rstest]
+ #[tokio::test]
+ async fn test_when_matched_delete_no_matches(
+ #[values(LanceFileVersion::Legacy, LanceFileVersion::V2_0)] version: LanceFileVersion,
+ ) {
+ let schema = create_test_schema();
+ let test_uri = "memory://test_delete_no_matches.lance";
+
+ // Create dataset with keys 1-6
+ let ds = create_test_dataset(test_uri, version, false).await;
+
+ // Source data has keys 100, 200, 300 - none match existing keys 1-6
+ let non_matching_batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(UInt32Array::from(vec![100, 200, 300])),
+ Arc::new(UInt32Array::from(vec![10, 20, 30])),
+ Arc::new(StringArray::from(vec!["X", "Y", "Z"])),
+ ],
+ )
+ .unwrap();
+
+ let keys = vec!["key".to_string()];
+
+ // Even with no matches, the plan structure should be the same
+ let plan_job = MergeInsertBuilder::try_new(ds.clone(), keys.clone())
+ .unwrap()
+ .when_matched(WhenMatched::Delete)
+ .when_not_matched(WhenNotMatched::DoNothing)
+ .try_build()
+ .unwrap();
+ let plan_stream = reader_to_stream(Box::new(RecordBatchIterator::new(
+ [Ok(non_matching_batch.clone())],
+ schema.clone(),
+ )));
+ let plan = plan_job.create_plan(plan_stream).await.unwrap();
+ assert_plan_node_equals(
+ plan,
+ "DeleteOnlyMergeInsert: on=[key], when_matched=Delete, when_not_matched=DoNothing
+ ...
+ HashJoinExec: ...join_type=Inner...
+ ...
+ ...
+ StreamingTableExec: partition_sizes=1, projection=[key]",
+ )
+ .await
+ .unwrap();
+ let job = MergeInsertBuilder::try_new(ds.clone(), keys)
+ .unwrap()
+ .when_matched(WhenMatched::Delete)
+ .when_not_matched(WhenNotMatched::DoNothing)
+ .try_build()
+ .unwrap();
+
+ let new_reader = Box::new(RecordBatchIterator::new(
+ [Ok(non_matching_batch)],
+ schema.clone(),
+ ));
+ let new_stream = reader_to_stream(new_reader);
+
+ let (merged_dataset, merge_stats) = job.execute(new_stream).await.unwrap();
+
+ // Should have deleted 0 rows since no keys matched
+ assert_eq!(merge_stats.num_deleted_rows, 0);
+ assert_eq!(merge_stats.num_inserted_rows, 0);
+ assert_eq!(merge_stats.num_updated_rows, 0);
+
+ // Verify all original data remains unchanged - keys 1-6 should all still be present
+ let batches = merged_dataset
+ .scan()
+ .try_into_stream()
+ .await
+ .unwrap()
+            .try_collect::<Vec<_>>()
+ .await
+ .unwrap();
+
+ let merged = concat_batches(&schema, &batches).unwrap();
+        let mut remaining_keys: Vec<u32> = merged
+            .column(0)
+            .as_primitive::<UInt32Type>()
+ .values()
+ .to_vec();
+ remaining_keys.sort();
+ assert_eq!(remaining_keys, vec![1, 2, 3, 4, 5, 6]);
+ }
+
+ /// Test that MergeInsertPlanner::is_delete_only correctly identifies delete-only operations.
+ ///
+ /// Delete-only is true only when:
+ /// - when_matched = Delete
+ /// - insert_not_matched = false (WhenNotMatched::DoNothing)
+ /// - delete_not_matched_by_source = Keep
+ ///
+ /// This test iterates through all valid combinations of WhenMatched, WhenNotMatched,
+ /// and WhenNotMatchedBySource to verify the is_delete_only logic.
+ #[tokio::test]
+ async fn test_is_delete_only() {
+ use itertools::iproduct;
+
+ // All variants to test (excluding UpdateIf and DeleteIf because they require expressions)
+ let when_matched_variants = [
+ WhenMatched::UpdateAll,
+ WhenMatched::DoNothing,
+ WhenMatched::Fail,
+ WhenMatched::Delete,
+ ];
+ let when_not_matched_variants = [WhenNotMatched::InsertAll, WhenNotMatched::DoNothing];
+ let when_not_matched_by_source_variants =
+ [WhenNotMatchedBySource::Keep, WhenNotMatchedBySource::Delete];
+
+ let schema = create_test_schema();
+
+ for (idx, (when_matched, when_not_matched, when_not_matched_by_source)) in iproduct!(
+ when_matched_variants.iter().cloned(),
+ when_not_matched_variants.iter().cloned(),
+ when_not_matched_by_source_variants.iter().cloned()
+ )
+ .enumerate()
+ {
+            // Skip no-op combinations, since those would fail try_build()
+ let is_no_op = matches!(when_matched, WhenMatched::DoNothing | WhenMatched::Fail)
+ && matches!(when_not_matched, WhenNotMatched::DoNothing)
+ && matches!(when_not_matched_by_source, WhenNotMatchedBySource::Keep);
+ if is_no_op {
+ continue;
+ }
+
+ let test_uri = format!("memory://test_is_delete_only_{}.lance", idx);
+ let ds = create_test_dataset(&test_uri, LanceFileVersion::V2_0, false).await;
+
+ let new_batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(UInt32Array::from(vec![4, 5, 6])),
+ Arc::new(UInt32Array::from(vec![2, 2, 2])),
+ Arc::new(StringArray::from(vec!["A", "B", "C"])),
+ ],
+ )
+ .unwrap();
+
+ let keys = vec!["key".to_string()];
+
+ let mut builder = MergeInsertBuilder::try_new(ds.clone(), keys).unwrap();
+ builder
+ .when_matched(when_matched.clone())
+ .when_not_matched(when_not_matched.clone())
+ .when_not_matched_by_source(when_not_matched_by_source.clone());
+
+ let job = builder.try_build().unwrap();
+
+ let plan_stream = reader_to_stream(Box::new(RecordBatchIterator::new(
+ [Ok(new_batch)],
+ schema.clone(),
+ )));
+ let plan = job.create_plan(plan_stream).await.unwrap();
+
+ let plan_str = datafusion::physical_plan::displayable(plan.as_ref())
+ .indent(true)
+ .to_string();
+
+ let expected_delete_only = matches!(when_matched, WhenMatched::Delete)
+ && matches!(when_not_matched, WhenNotMatched::DoNothing)
+ && matches!(when_not_matched_by_source, WhenNotMatchedBySource::Keep);
+
+ if expected_delete_only {
+ assert!(
+ plan_str.contains("DeleteOnlyMergeInsert"),
+ "Expected DeleteOnlyMergeInsert for ({:?}, {:?}, {:?}), but got:\n{}",
+ when_matched,
+ when_not_matched,
+ when_not_matched_by_source,
+ plan_str
+ );
+ } else {
+ assert!(
+ plan_str.contains("MergeInsert:")
+ && !plan_str.contains("DeleteOnlyMergeInsert"),
+ "Expected MergeInsert (not DeleteOnlyMergeInsert) for ({:?}, {:?}, {:?}), but got:\n{}",
+ when_matched,
+ when_not_matched,
+ when_not_matched_by_source,
+ plan_str
+ );
+ }
+ }
+ }
+
+    /// Tests that apply_deletions returns an error when given invalid row addresses.
+ #[tokio::test]
+ async fn test_apply_deletions_invalid_row_address() {
+ use super::exec::apply_deletions;
+ use roaring::RoaringTreemap;
+
+ let test_uri = "memory://test_apply_deletions_error.lance";
+
+ // Create a dataset with 2 fragments, each with 3 rows
+ let ds = create_test_dataset(test_uri, LanceFileVersion::V2_0, false).await;
+ let fragment_id = ds.get_fragments()[0].id() as u32;
+
+ // Create row addresses with invalid row offsets for this fragment
+ // Row address format: high 32 bits = fragment_id, low 32 bits = row_offset
+ // Each fragment has only 3 rows (offsets 0, 1, 2).
+ //
+ // The error in extend_deletions is triggered when deletion_vector.len() >= physical_rows
+ // AND at least one row ID is >= physical_rows.
+ // So we need to add enough deletions (at least 3) with some being invalid (>= 3).
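+        // For example, if fragment_id were 1, base would be 1 << 32 = 4294967296, and
+        // row offset 10 would map to row address 4294967306.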
+ let mut invalid_row_addrs = RoaringTreemap::new();
+ let base = (fragment_id as u64) << 32;
+ // Add 4 deletions: rows 10, 11, 12, 13 (all invalid since only rows 0-2 exist)
+ for row_offset in 10..14u64 {
+ invalid_row_addrs.insert(base | row_offset);
+ }
+
+ let result = apply_deletions(&ds, &invalid_row_addrs).await;
+
+ assert!(result.is_err(), "Expected error for invalid row addresses");
+ let err = result.unwrap_err();
+ assert!(
+ err.to_string()
+ .contains("Deletion vector includes rows that aren't in the fragment"),
+ "Expected 'rows that aren't in the fragment' error, got: {}",
+ err
+ );
+ }
}
diff --git a/rust/lance/src/dataset/write/merge_insert/assign_action.rs b/rust/lance/src/dataset/write/merge_insert/assign_action.rs
index 94ab73dd4bd..1b9edadad6b 100644
--- a/rust/lance/src/dataset/write/merge_insert/assign_action.rs
+++ b/rust/lance/src/dataset/write/merge_insert/assign_action.rs
@@ -125,6 +125,9 @@ pub fn merge_insert_action(
WhenMatched::Fail => {
cases.push((matched, Action::Fail.as_literal_expr()));
}
+ WhenMatched::Delete => {
+ cases.push((matched, Action::Delete.as_literal_expr()));
+ }
}
match ¶ms.delete_not_matched_by_source {
diff --git a/rust/lance/src/dataset/write/merge_insert/exec.rs b/rust/lance/src/dataset/write/merge_insert/exec.rs
index 79648424e34..de39d0a1610 100644
--- a/rust/lance/src/dataset/write/merge_insert/exec.rs
+++ b/rust/lance/src/dataset/write/merge_insert/exec.rs
@@ -1,12 +1,22 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors
+mod delete;
mod write;
+use std::collections::BTreeMap;
+use std::sync::Arc;
+
use datafusion::physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder};
+use futures::StreamExt;
+use lance_table::format::Fragment;
+use roaring::RoaringTreemap;
+
+pub use delete::DeleteOnlyMergeInsertExec;
pub use write::FullSchemaMergeInsertExec;
use super::MergeStats;
+use crate::Dataset;
pub(super) struct MergeInsertMetrics {
pub num_inserted_rows: Count,
@@ -45,3 +55,49 @@ impl MergeInsertMetrics {
}
}
}
+
+/// Delete a batch of rows by row address; returns the fragments modified and the fragments removed.
+pub(super) async fn apply_deletions(
+    dataset: &Dataset,
+    removed_row_addrs: &RoaringTreemap,
+) -> crate::Result<(Vec<Fragment>, Vec<u64>)> {
+    let bitmaps = Arc::new(removed_row_addrs.bitmaps().collect::<BTreeMap<_, _>>());
+
+ enum FragmentChange {
+ Unchanged,
+ Modified(Box),
+ Removed(u64),
+ }
+
+ let mut updated_fragments = Vec::new();
+ let mut removed_fragments = Vec::new();
+
+ let mut stream = futures::stream::iter(dataset.get_fragments())
+ .map(move |fragment| {
+ let bitmaps_ref = bitmaps.clone();
+ async move {
+ let fragment_id = fragment.id();
+ if let Some(bitmap) = bitmaps_ref.get(&(fragment_id as u32)) {
+ match fragment.extend_deletions(*bitmap).await {
+ Ok(Some(new_fragment)) => {
+ Ok(FragmentChange::Modified(Box::new(new_fragment.metadata)))
+ }
+ Ok(None) => Ok(FragmentChange::Removed(fragment_id as u64)),
+ Err(e) => Err(e),
+ }
+ } else {
+ Ok(FragmentChange::Unchanged)
+ }
+ }
+ })
+ .buffer_unordered(dataset.object_store.io_parallelism());
+
+ while let Some(res) = stream.next().await.transpose()? {
+ match res {
+ FragmentChange::Unchanged => {}
+ FragmentChange::Modified(fragment) => updated_fragments.push(*fragment),
+ FragmentChange::Removed(fragment_id) => removed_fragments.push(fragment_id),
+ }
+ }
+
+ Ok((updated_fragments, removed_fragments))
+}
diff --git a/rust/lance/src/dataset/write/merge_insert/exec/delete.rs b/rust/lance/src/dataset/write/merge_insert/exec/delete.rs
new file mode 100644
index 00000000000..12ae2359bab
--- /dev/null
+++ b/rust/lance/src/dataset/write/merge_insert/exec/delete.rs
@@ -0,0 +1,321 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright The Lance Authors
+
+use std::sync::{Arc, Mutex};
+
+use arrow_array::{Array, RecordBatch, UInt64Array, UInt8Array};
+use datafusion::common::Result as DFResult;
+use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
+use datafusion::{
+ execution::{SendableRecordBatchStream, TaskContext},
+ physical_plan::{
+ execution_plan::{Boundedness, EmissionType},
+ stream::RecordBatchStreamAdapter,
+ DisplayAs, ExecutionPlan, PlanProperties,
+ },
+};
+use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
+use futures::StreamExt;
+use lance_core::ROW_ADDR;
+use roaring::RoaringTreemap;
+
+use crate::dataset::transaction::{Operation, Transaction};
+use crate::dataset::write::merge_insert::assign_action::Action;
+use crate::dataset::write::merge_insert::{MergeInsertParams, MergeStats, MERGE_ACTION_COLUMN};
+use crate::Dataset;
+
+use super::{apply_deletions, MergeInsertMetrics};
+
+/// Specialized physical execution node for delete-only merge insert operations.
+///
+/// This is an optimized path for when `WhenMatched::Delete` is used without inserts.
+/// Unlike `FullSchemaMergeInsertExec`, this node:
+/// - Only reads `_rowaddr` and `__action` columns (no data columns needed)
+/// - Skips the write step entirely (no new fragments created)
+/// - Only applies deletions to existing fragments
+///
+/// This is significantly more efficient for bulk delete operations where
+/// we only need to identify matching rows and mark them as deleted.
+#[derive(Debug)]
+pub struct DeleteOnlyMergeInsertExec {
+    input: Arc<dyn ExecutionPlan>,
+    dataset: Arc<Dataset>,
+    params: MergeInsertParams,
+    properties: PlanProperties,
+    metrics: ExecutionPlanMetricsSet,
+    merge_stats: Arc<Mutex<Option<MergeStats>>>,
+    transaction: Arc<Mutex<Option<Transaction>>>,
+    affected_rows: Arc<Mutex<Option<RoaringTreemap>>>,
+}
+
+impl DeleteOnlyMergeInsertExec {
+ pub fn try_new(
+        input: Arc<dyn ExecutionPlan>,
+        dataset: Arc<Dataset>,
+        params: MergeInsertParams,
+    ) -> DFResult<Self> {
+ let empty_schema = Arc::new(arrow_schema::Schema::empty());
+ let properties = PlanProperties::new(
+ EquivalenceProperties::new(empty_schema),
+ Partitioning::UnknownPartitioning(1),
+ EmissionType::Final,
+ Boundedness::Bounded,
+ );
+
+ Ok(Self {
+ input,
+ dataset,
+ params,
+ properties,
+ metrics: ExecutionPlanMetricsSet::new(),
+ merge_stats: Arc::new(Mutex::new(None)),
+ transaction: Arc::new(Mutex::new(None)),
+ affected_rows: Arc::new(Mutex::new(None)),
+ })
+ }
+
+ /// Takes the merge statistics if the execution has completed.
+    pub fn merge_stats(&self) -> Option<MergeStats> {
+ self.merge_stats
+ .lock()
+ .ok()
+ .and_then(|mut guard| guard.take())
+ }
+
+ /// Takes the transaction if the execution has completed.
+    pub fn transaction(&self) -> Option<Transaction> {
+ self.transaction
+ .lock()
+ .ok()
+ .and_then(|mut guard| guard.take())
+ }
+
+ /// Takes the affected rows (deleted row addresses) if the execution has completed.
+    pub fn affected_rows(&self) -> Option<RoaringTreemap> {
+ self.affected_rows
+ .lock()
+ .ok()
+ .and_then(|mut guard| guard.take())
+ }
+
+ async fn collect_deletions(
+ mut input_stream: SendableRecordBatchStream,
+ metrics: MergeInsertMetrics,
+    ) -> DFResult<RoaringTreemap> {
+ let schema = input_stream.schema();
+
+ let (rowaddr_idx, _) = schema.column_with_name(ROW_ADDR).ok_or_else(|| {
+ datafusion::error::DataFusionError::Internal(
+ "Expected _rowaddr column in delete-only merge insert input".to_string(),
+ )
+ })?;
+
+ let (action_idx, _) = schema
+ .column_with_name(MERGE_ACTION_COLUMN)
+ .ok_or_else(|| {
+ datafusion::error::DataFusionError::Internal(format!(
+ "Expected {} column in delete-only merge insert input",
+ MERGE_ACTION_COLUMN
+ ))
+ })?;
+
+ let mut delete_row_addrs = RoaringTreemap::new();
+
+ while let Some(batch_result) = input_stream.next().await {
+ let batch = batch_result?;
+
+ let row_addr_array = batch
+ .column(rowaddr_idx)
+ .as_any()
+                .downcast_ref::<UInt64Array>()
+ .ok_or_else(|| {
+ datafusion::error::DataFusionError::Internal(
+ "Expected UInt64Array for _rowaddr column".to_string(),
+ )
+ })?;
+
+ let action_array = batch
+ .column(action_idx)
+ .as_any()
+                .downcast_ref::<UInt8Array>()
+ .ok_or_else(|| {
+ datafusion::error::DataFusionError::Internal(format!(
+ "Expected UInt8Array for {} column",
+ MERGE_ACTION_COLUMN
+ ))
+ })?;
+
+ for row_idx in 0..batch.num_rows() {
+ let action_code = action_array.value(row_idx);
+ let action = Action::try_from(action_code).map_err(|e| {
+ datafusion::error::DataFusionError::Internal(format!(
+ "Invalid action code {}: {}",
+ action_code, e
+ ))
+ })?;
+
+ if action == Action::Delete && !row_addr_array.is_null(row_idx) {
+ let row_addr = row_addr_array.value(row_idx);
+ delete_row_addrs.insert(row_addr);
+ metrics.num_deleted_rows.add(1);
+ }
+ }
+ }
+
+ Ok(delete_row_addrs)
+ }
+}
+
+impl DisplayAs for DeleteOnlyMergeInsertExec {
+ fn fmt_as(
+ &self,
+ t: datafusion::physical_plan::DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ datafusion::physical_plan::DisplayFormatType::Default
+ | datafusion::physical_plan::DisplayFormatType::Verbose => {
+ let on_keys = self.params.on.join(", ");
+ write!(
+ f,
+ "DeleteOnlyMergeInsert: on=[{}], when_matched=Delete, when_not_matched=DoNothing",
+ on_keys
+ )
+ }
+ datafusion::physical_plan::DisplayFormatType::TreeRender => {
+ write!(f, "DeleteOnlyMergeInsert[{}]", self.dataset.uri())
+ }
+ }
+ }
+}
+
+impl ExecutionPlan for DeleteOnlyMergeInsertExec {
+ fn name(&self) -> &str {
+ "DeleteOnlyMergeInsertExec"
+ }
+
+ fn as_any(&self) -> &dyn std::any::Any {
+ self
+ }
+
+ fn schema(&self) -> arrow_schema::SchemaRef {
+ Arc::new(arrow_schema::Schema::empty())
+ }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![&self.input]
+ }
+
+ fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+ if children.len() != 1 {
+ return Err(datafusion::error::DataFusionError::Internal(
+ "DeleteOnlyMergeInsertExec requires exactly one child".to_string(),
+ ));
+ }
+ Ok(Arc::new(Self {
+ input: children[0].clone(),
+ dataset: self.dataset.clone(),
+ params: self.params.clone(),
+ properties: self.properties.clone(),
+ metrics: self.metrics.clone(),
+ merge_stats: self.merge_stats.clone(),
+ transaction: self.transaction.clone(),
+ affected_rows: self.affected_rows.clone(),
+ }))
+ }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics.clone_inner())
+ }
+
+ fn properties(&self) -> &PlanProperties {
+ &self.properties
+ }
+
+ fn supports_limit_pushdown(&self) -> bool {
+ false
+ }
+
+    fn required_input_distribution(&self) -> Vec<datafusion_physical_expr::Distribution> {
+ vec![datafusion_physical_expr::Distribution::SinglePartition]
+ }
+
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+ vec![false]
+ }
+
+ fn execute(
+ &self,
+ partition: usize,
+        context: Arc<TaskContext>,
+    ) -> DFResult<SendableRecordBatchStream> {
+ let _baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
+ let metrics = MergeInsertMetrics::new(&self.metrics, partition);
+ let input_stream = self.input.execute(partition, context)?;
+
+ let dataset = self.dataset.clone();
+ let merge_stats_holder = self.merge_stats.clone();
+ let transaction_holder = self.transaction.clone();
+ let affected_rows_holder = self.affected_rows.clone();
+ let mem_wal_to_merge = self.params.mem_wal_to_merge.clone();
+
+ let result_stream = futures::stream::once(async move {
+ let delete_row_addrs = Self::collect_deletions(input_stream, metrics).await?;
+
+ let (updated_fragments, removed_fragment_ids) =
+ apply_deletions(&dataset, &delete_row_addrs)
+ .await
+ .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
+
+ let operation = Operation::Update {
+ removed_fragment_ids,
+ updated_fragments,
+ new_fragments: vec![],
+ fields_modified: vec![],
+ mem_wal_to_merge,
+ fields_for_preserving_frag_bitmap: dataset
+ .schema()
+ .fields
+ .iter()
+ .map(|f| f.id as u32)
+ .collect(),
+ update_mode: None,
+ };
+
+ let transaction = Transaction::new(dataset.manifest.version, operation, None);
+
+ let num_deleted = delete_row_addrs.len();
+ let stats = MergeStats {
+ num_deleted_rows: num_deleted,
+ num_inserted_rows: 0,
+ num_updated_rows: 0,
+ bytes_written: 0,
+ num_files_written: 0,
+ num_attempts: 1,
+ };
+
+ if let Ok(mut transaction_guard) = transaction_holder.lock() {
+ transaction_guard.replace(transaction);
+ }
+ if let Ok(mut merge_stats_guard) = merge_stats_holder.lock() {
+ merge_stats_guard.replace(stats);
+ }
+ if let Ok(mut affected_rows_guard) = affected_rows_holder.lock() {
+ affected_rows_guard.replace(delete_row_addrs);
+ }
+
+ let empty_schema = Arc::new(arrow_schema::Schema::empty());
+ let empty_batch = RecordBatch::new_empty(empty_schema);
+ Ok(empty_batch)
+ });
+
+ let empty_schema = Arc::new(arrow_schema::Schema::empty());
+ Ok(Box::pin(RecordBatchStreamAdapter::new(
+ empty_schema,
+ result_stream,
+ )))
+ }
+}
diff --git a/rust/lance/src/dataset/write/merge_insert/exec/write.rs b/rust/lance/src/dataset/write/merge_insert/exec/write.rs
index 6644fefaf68..b2cef2dc601 100644
--- a/rust/lance/src/dataset/write/merge_insert/exec/write.rs
+++ b/rust/lance/src/dataset/write/merge_insert/exec/write.rs
@@ -19,7 +19,10 @@ use datafusion::{
};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
use futures::{stream, StreamExt};
+use lance_core::{Error, ROW_ADDR, ROW_ID};
+use lance_table::format::RowIdMeta;
use roaring::RoaringTreemap;
+use snafu::location;
use crate::dataset::transaction::UpdateMode::RewriteRows;
use crate::dataset::utils::CapturedRowIds;
@@ -37,12 +40,10 @@ use crate::{
write_fragments_internal, WriteParams,
},
},
- Dataset, Result,
+ Dataset,
};
-use lance_core::{Error, ROW_ADDR, ROW_ID};
-use lance_table::format::{Fragment, RowIdMeta};
-use snafu::location;
-use std::collections::BTreeMap;
+
+use super::apply_deletions;
/// Shared state for merge insert operations to simplify lock management
struct MergeState {
@@ -178,25 +179,31 @@ impl FullSchemaMergeInsertExec {
})
}
- /// Returns the merge statistics if the execution has completed.
+ /// Takes the merge statistics if the execution has completed.
/// Returns `None` if the execution is still in progress or hasn't started.
pub fn merge_stats(&self) -> Option {
- self.merge_stats.lock().ok().and_then(|guard| guard.clone())
+ self.merge_stats
+ .lock()
+ .ok()
+ .and_then(|mut guard| guard.take())
}
- /// Returns the transaction if the execution has completed.
+ /// Takes the transaction if the execution has completed.
/// Returns `None` if the execution is still in progress or hasn't started.
pub fn transaction(&self) -> Option {
- self.transaction.lock().ok().and_then(|guard| guard.clone())
+ self.transaction
+ .lock()
+ .ok()
+ .and_then(|mut guard| guard.take())
}
- /// Returns the affected rows (deleted/updated row addresses) if the execution has completed.
+ /// Takes the affected rows (deleted/updated row addresses) if the execution has completed.
/// Returns `None` if the execution is still in progress or hasn't started.
pub fn affected_rows(&self) -> Option {
self.affected_rows
.lock()
.ok()
- .and_then(|guard| guard.clone())
+ .and_then(|mut guard| guard.take())
}
/// Creates a filtered stream that captures row addresses for deletion and returns
@@ -484,53 +491,6 @@ impl FullSchemaMergeInsertExec {
(total_bytes as usize, total_files)
}
- /// Delete a batch of rows by row address, returns the fragments modified and the fragments removed
- async fn apply_deletions(
- dataset: &Dataset,
- removed_row_addrs: &RoaringTreemap,
-    ) -> Result<(Vec<Fragment>, Vec<u64>)> {
-        let bitmaps = Arc::new(removed_row_addrs.bitmaps().collect::<BTreeMap<_, _>>());
-
- enum FragmentChange {
- Unchanged,
- Modified(Box),
- Removed(u64),
- }
-
- let mut updated_fragments = Vec::new();
- let mut removed_fragments = Vec::new();
-
- let mut stream = futures::stream::iter(dataset.get_fragments())
- .map(move |fragment| {
- let bitmaps_ref = bitmaps.clone();
- async move {
- let fragment_id = fragment.id();
- if let Some(bitmap) = bitmaps_ref.get(&(fragment_id as u32)) {
- match fragment.extend_deletions(*bitmap).await {
- Ok(Some(new_fragment)) => {
- Ok(FragmentChange::Modified(Box::new(new_fragment.metadata)))
- }
- Ok(None) => Ok(FragmentChange::Removed(fragment_id as u64)),
- Err(e) => Err(e),
- }
- } else {
- Ok(FragmentChange::Unchanged)
- }
- }
- })
- .buffer_unordered(dataset.object_store.io_parallelism());
-
- while let Some(res) = stream.next().await.transpose()? {
- match res {
- FragmentChange::Unchanged => {}
- FragmentChange::Modified(fragment) => updated_fragments.push(*fragment),
- FragmentChange::Removed(fragment_id) => removed_fragments.push(fragment_id),
- }
- }
-
- Ok((updated_fragments, removed_fragments))
- }
-
fn split_updates_and_inserts(
&self,
input_stream: SendableRecordBatchStream,
@@ -701,6 +661,7 @@ impl DisplayAs for FullSchemaMergeInsertExec {
format!("UpdateIf({})", condition)
}
crate::dataset::WhenMatched::Fail => "Fail".to_string(),
+ crate::dataset::WhenMatched::Delete => "Delete".to_string(),
};
let when_not_matched = if self.params.insert_not_matched {
"InsertAll"
@@ -873,7 +834,7 @@ impl ExecutionPlan for FullSchemaMergeInsertExec {
let delete_row_addrs_clone = merge_state.delete_row_addrs;
let (updated_fragments, removed_fragment_ids) =
- Self::apply_deletions(&dataset, &delete_row_addrs_clone).await?;
+ apply_deletions(&dataset, &delete_row_addrs_clone).await?;
// Step 4: Create the transaction operation
let operation = Operation::Update {
diff --git a/rust/lance/src/dataset/write/merge_insert/logical_plan.rs b/rust/lance/src/dataset/write/merge_insert/logical_plan.rs
index 40ce12d3b42..f25c7ef11c3 100644
--- a/rust/lance/src/dataset/write/merge_insert/logical_plan.rs
+++ b/rust/lance/src/dataset/write/merge_insert/logical_plan.rs
@@ -13,7 +13,11 @@ use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode, UserDefinedLogicalNod
use lance_core::{ROW_ADDR, ROW_ID};
use std::{cmp::Ordering, sync::Arc};
-use crate::{dataset::write::merge_insert::exec::FullSchemaMergeInsertExec, Dataset};
+use crate::dataset::write::merge_insert::exec::{
+ DeleteOnlyMergeInsertExec, FullSchemaMergeInsertExec,
+};
+use crate::dataset::{WhenMatched, WhenNotMatchedBySource};
+use crate::Dataset;
use super::{MergeInsertParams, MERGE_ACTION_COLUMN};
@@ -99,6 +103,7 @@ impl UserDefinedLogicalNodeCore for MergeInsertWriteNode {
crate::dataset::WhenMatched::UpdateAll => "UpdateAll",
crate::dataset::WhenMatched::UpdateIf(_) => "UpdateIf",
crate::dataset::WhenMatched::Fail => "Fail",
+ crate::dataset::WhenMatched::Delete => "Delete",
};
let when_not_matched = if self.params.insert_not_matched {
"InsertAll"
@@ -145,19 +150,33 @@ impl UserDefinedLogicalNodeCore for MergeInsertWriteNode {
fn necessary_children_exprs(&self, _output_columns: &[usize]) -> Option>> {
// Going to need:
- // * all columns from the `source` relation
+ // * all columns from the `source` relation (or just key columns for delete-only)
// * `__action` column (unqualified)
// * `target._rowaddr` column specifically
let input_schema = self.input.schema();
let mut necessary_columns = Vec::new();
+ // Check if this is a delete-only operation (no writes needed)
+ // In delete-only mode, we only need the key columns from source for matching
+ let no_upsert = matches!(
+ self.params.when_matched,
+ crate::dataset::WhenMatched::Delete
+ ) && !self.params.insert_not_matched;
+
for (i, (qualifier, field)) in input_schema.iter().enumerate() {
let should_include = match qualifier {
- // Include all source columns - they contain the new data to write
- Some(qualifier) if qualifier.table() == "source" => true,
+ // For delete-only: only include source KEY columns (for matching)
+ // For other ops: include all source columns - they contain the new data to write
+ Some(qualifier) if qualifier.table() == "source" => {
+ if no_upsert {
+ self.params.on.iter().any(|k| k == field.name())
+ } else {
+ true
+ }
+ }
- // Include target._rowaddr specifically - needed to locate existing rows for updates
+ // Include target._rowaddr specifically - needed to locate existing rows for updates/deletes
Some(qualifier) if qualifier.table() == "target" && field.name() == ROW_ADDR => {
true
}
@@ -184,6 +203,23 @@ impl UserDefinedLogicalNodeCore for MergeInsertWriteNode {
/// Physical planner for MergeInsertWriteNode.
pub struct MergeInsertPlanner {}
+impl MergeInsertPlanner {
+ /// Check if this is a delete-only operation that can use the optimized path.
+ ///
+ /// Delete-only operations are when:
+ /// - `when_matched` is `Delete`
+ /// - `insert_not_matched` is `false` (no inserts)
+ /// - `delete_not_matched_by_source` is `Keep` (no additional deletes of unmatched target rows)
+ fn is_delete_only(params: &MergeInsertParams) -> bool {
+ matches!(params.when_matched, WhenMatched::Delete)
+ && !params.insert_not_matched
+ && matches!(
+ params.delete_not_matched_by_source,
+ WhenNotMatchedBySource::Keep
+ )
+ }
+}
+
#[async_trait]
impl ExtensionPlanner for MergeInsertPlanner {
async fn plan_extension(
@@ -198,12 +234,21 @@ impl ExtensionPlanner for MergeInsertPlanner {
if let Some(write_node) = node.as_any().downcast_ref::() {
assert_eq!(logical_inputs.len(), 1, "Inconsistent number of inputs");
assert_eq!(physical_inputs.len(), 1, "Inconsistent number of inputs");
- let exec = FullSchemaMergeInsertExec::try_new(
- physical_inputs[0].clone(),
- write_node.dataset.clone(),
- write_node.params.clone(),
- )?;
- Some(Arc::new(exec))
+
+                let exec: Arc<dyn ExecutionPlan> = if Self::is_delete_only(&write_node.params) {
+ Arc::new(DeleteOnlyMergeInsertExec::try_new(
+ physical_inputs[0].clone(),
+ write_node.dataset.clone(),
+ write_node.params.clone(),
+ )?)
+ } else {
+ Arc::new(FullSchemaMergeInsertExec::try_new(
+ physical_inputs[0].clone(),
+ write_node.dataset.clone(),
+ write_node.params.clone(),
+ )?)
+ };
+ Some(exec)
} else {
None
},