Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/src/rest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,13 @@ paths:
required: true
schema:
type: string
- name: "when_matched_delete"
in: query
description: Delete all rows in target table where a match exists in source table
required: false
schema:
type: boolean
default: false
- name: "when_matched_update_all"
in: query
description: Update all columns when rows match
Expand Down Expand Up @@ -786,6 +793,7 @@ paths:
It passes in the `MergeInsertIntoTableRequest` information in the following way:
- `id`: pass through path parameter of the same name
- `on`: pass through query parameter of the same name
- `when_matched_delete`: pass through query parameter of the same name
- `when_matched_update_all`: pass through query parameter of the same name
- `when_matched_update_all_filt`: pass through query parameter of the same name
- `when_not_matched_insert_all`: pass through query parameter of the same name
Expand Down Expand Up @@ -1938,6 +1946,10 @@ components:
"on":
description: Column name to use for matching rows (required)
type: string
when_matched_delete:
description: Delete all rows in target table where a match exists in source table
type: boolean
default: false
when_matched_update_all:
description: Update all columns when rows match
type: boolean
Expand Down
1 change: 1 addition & 0 deletions java/lance-jni/src/merge_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ fn extract_when_matched<'local>(env: &mut JNIEnv<'local>, jparam: &JObject) -> R
None => Err(Error::input_error("No matched updated expr".to_string())),
},
"Fail" => Ok(WhenMatched::Fail),
"Delete" => Ok(WhenMatched::Delete),
_ => Err(Error::input_error(format!(
"Illegal when_matched: {when_matched}",
))),
Expand Down
19 changes: 19 additions & 0 deletions java/src/main/java/org/lance/merge/MergeInsertParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,19 @@ public MergeInsertParams withMatchedDoNothing() {
return this;
}

/**
* Specify that when a row in the source table matches a row in the target table, the row in the
* target table is deleted.
*
* <p>This can be used to achieve "when matched delete" behavior.
*
* @return This MergeInsertParams instance
*/
public MergeInsertParams withMatchedDelete() {
this.whenMatched = WhenMatched.Delete;
return this;
}

/**
* Specify that when a row in the source table matches a row in the target table and the
* expression evaluates to true, the row in the target table is updated by the matched row from
Expand Down Expand Up @@ -303,6 +316,12 @@ public enum WhenMatched {
* used to ensure that no existing rows are overwritten or modified after inserted.
*/
Fail,

/**
* The row is deleted from the target table when a row in the source table matches a row in the
* target table.
*/
Delete
}

public enum WhenNotMatched {
Expand Down
18 changes: 18 additions & 0 deletions java/src/test/java/org/lance/MergeInsertTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,24 @@ public void testWhenMatchedFailWithoutMatches() throws Exception {
}
}

@Test
public void testWhenMatchedDelete() throws Exception {
// Test delete matched target rows if expression is true

try (VectorSchemaRoot source = buildSource(testDataset.getSchema(), allocator)) {
try (ArrowArrayStream sourceStream = convertToStream(source, allocator)) {
MergeInsertResult result =
dataset.mergeInsert(
new MergeInsertParams(Collections.singletonList("id"))
.withMatchedDelete()
.withNotMatched(MergeInsertParams.WhenNotMatched.DoNothing),
sourceStream);

Assertions.assertEquals("{3=Person 3, 4=Person 4}", readAll(result.dataset()).toString());
}
}
}

private VectorSchemaRoot buildSource(Schema schema, RootAllocator allocator) {
List<Integer> sourceIds = Arrays.asList(0, 1, 2, 7, 8, 9);

Expand Down
10 changes: 10 additions & 0 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,16 @@ def when_matched_update_all(
"""
return super(MergeInsertBuilder, self).when_matched_update_all(condition)

def when_matched_delete(self) -> "MergeInsertBuilder":
"""
Configure the operation to delete matched rows in the target table.

After this method is called, when the merge insert operation executes,
any rows that match both the source table and the target table will be
deleted.
"""
return super(MergeInsertBuilder, self).when_matched_delete()

def when_matched_fail(self) -> "MergeInsertBuilder":
"""
Configure the operation to fail if any rows match
Expand Down
81 changes: 81 additions & 0 deletions python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2311,6 +2311,87 @@ def test_merge_insert_when_matched_fail(tmp_path: Path):
assert unchanged_data == expected


def test_merge_insert_when_matched_delete(tmp_path: Path):
"""Test when_matched_delete functionality for merge insert."""
# Create initial dataset with ids 1-6
data = pa.table({"id": [1, 2, 3, 4, 5, 6], "val": [10, 20, 30, 40, 50, 60]})
ds = lance.write_dataset(data, tmp_path / "dataset")
version = ds.version

# Test 1: Basic when_matched_delete - delete matched rows only
# Source has ids 4, 5, 6 (match) and 7, 8, 9 (no match)
# Only matched rows should be deleted, unmatched rows are ignored
delete_keys = pa.table({"id": [4, 5, 6, 7, 8, 9], "val": [0, 0, 0, 0, 0, 0]})
result = ds.merge_insert("id").when_matched_delete().execute(delete_keys)

assert result["num_deleted_rows"] == 3
assert result["num_inserted_rows"] == 0
assert result["num_updated_rows"] == 0

# Verify only ids 1, 2, 3 remain
remaining = ds.to_table().sort_by("id")
expected = pa.table({"id": [1, 2, 3], "val": [10, 20, 30]})
assert remaining == expected

# Test 2: when_matched_delete with ID-only source
# Source contains only the key column
ds = lance.dataset(tmp_path / "dataset", version=version)
ds.restore()

id_only_source = pa.table({"id": [2, 4, 6]}) # Delete even ids
result = ds.merge_insert("id").when_matched_delete().execute(id_only_source)

assert result["num_deleted_rows"] == 3
assert result["num_inserted_rows"] == 0
assert result["num_updated_rows"] == 0

# Verify only odd ids remain
remaining = ds.to_table().sort_by("id")
expected = pa.table({"id": [1, 3, 5], "val": [10, 30, 50]})
assert remaining == expected

# Test 3: when_matched_delete combined with when_not_matched_insert_all
# Delete existing rows that match, insert new rows that don't match
ds = lance.dataset(tmp_path / "dataset", version=version)
ds.restore()

new_data = pa.table(
{"id": [4, 5, 6, 7, 8, 9], "val": [400, 500, 600, 700, 800, 900]}
)
result = (
ds.merge_insert("id")
.when_matched_delete()
.when_not_matched_insert_all()
.execute(new_data)
)

# Should delete 3 (ids 4, 5, 6) and insert 3 (ids 7, 8, 9)
assert result["num_deleted_rows"] == 3
assert result["num_inserted_rows"] == 3
assert result["num_updated_rows"] == 0

# Verify: ids 1, 2, 3 (original), 7, 8, 9 (new inserts)
remaining = ds.to_table().sort_by("id")
expected = pa.table({"id": [1, 2, 3, 7, 8, 9], "val": [10, 20, 30, 700, 800, 900]})
assert remaining == expected

# Test 4: when_matched_delete with no matches (should be a no-op delete)
ds = lance.dataset(tmp_path / "dataset", version=version)
ds.restore()

non_matching = pa.table({"id": [100, 200, 300], "val": [0, 0, 0]})
result = ds.merge_insert("id").when_matched_delete().execute(non_matching)

assert result["num_deleted_rows"] == 0
assert result["num_inserted_rows"] == 0
assert result["num_updated_rows"] == 0

# Data should be unchanged
remaining = ds.to_table().sort_by("id")
expected = pa.table({"id": [1, 2, 3, 4, 5, 6], "val": [10, 20, 30, 40, 50, 60]})
assert remaining == expected


def test_merge_insert_large():
# Doing subcolumns update with merge insert triggers this error.
# Data needs to be large enough to make DataFusion create multiple batches
Expand Down
5 changes: 5 additions & 0 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@ impl MergeInsertBuilder {
Ok(slf)
}

pub fn when_matched_delete(mut slf: PyRefMut<Self>) -> PyResult<PyRefMut<Self>> {
slf.builder.when_matched(WhenMatched::Delete);
Ok(slf)
}

pub fn when_not_matched_insert_all(mut slf: PyRefMut<Self>) -> PyResult<PyRefMut<Self>> {
slf.builder.when_not_matched(WhenNotMatched::InsertAll);
Ok(slf)
Expand Down
Loading
Loading