Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 34 additions & 4 deletions java/lance-jni/src/merge_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use jni::sys::jlong;
use jni::JNIEnv;
use lance::dataset::scanner::ExprFilter;
use lance::dataset::{
MergeInsertBuilder, MergeStats, WhenMatched, WhenNotMatched, WhenNotMatchedBySource,
MergeInsertBuilder, MergeStats, SortOptions, WhenMatched, WhenNotMatched,
WhenNotMatchedBySource,
};
use lance_core::datatypes::Schema;
use std::sync::Arc;
Expand Down Expand Up @@ -48,6 +49,8 @@ fn inner_merge_insert<'local>(
let conflict_retries = extract_conflict_retries(env, &jparam)?;
let retry_timeout_ms = extract_retry_timeout_ms(env, &jparam)?;
let skip_auto_cleanup = extract_skip_auto_cleanup(env, &jparam)?;
let dedupe_by = extract_dedupe_by(env, &jparam)?;
let dedupe_sort_options = extract_dedupe_sort_options(env, &jparam)?;

let (new_ds, merge_stats) = unsafe {
let dataset = env.get_rust_field::<_, _, BlockingDataset>(jdataset, NATIVE_DATASET)?;
Expand All @@ -58,14 +61,21 @@ fn inner_merge_insert<'local>(
when_not_matched_by_source_delete_expr,
)?;

let merge_insert_job = MergeInsertBuilder::try_new(Arc::new(dataset.clone().inner), on)?
let mut builder = MergeInsertBuilder::try_new(Arc::new(dataset.clone().inner), on)?;
builder
.when_matched(when_matched)
.when_not_matched(when_not_matched)
.when_not_matched_by_source(when_not_matched_by_source)
.conflict_retries(conflict_retries)
.retry_timeout(Duration::from_millis(retry_timeout_ms as u64))
.retry_timeout(Duration::from_millis(retry_timeout_ms))
.skip_auto_cleanup(skip_auto_cleanup)
.try_build()?;
.dedupe_sort_options(dedupe_sort_options);

if let Some(column) = dedupe_by {
builder.dedupe_by(column);
}

let merge_insert_job = builder.try_build()?;

let stream_ptr = batch_address as *mut FFI_ArrowArrayStream;
let source_stream = ArrowArrayStreamReader::from_raw(stream_ptr)?;
Expand Down Expand Up @@ -228,6 +238,26 @@ fn extract_skip_auto_cleanup<'local>(env: &mut JNIEnv<'local>, jparam: &JObject)
Ok(skip_auto_cleanup)
}

/// Reads the optional dedupe column name from the Java merge-insert params object.
///
/// Invokes the no-argument Java method `dedupeBy` (JNI signature
/// `()Ljava/util/Optional;`) on `jparam`, unwraps the result as an object
/// reference, and hands that reference to `get_string_opt` for conversion.
///
/// NOTE(review): presumably an empty `Optional` maps to `None`; confirm against
/// the `get_string_opt` implementation.
///
/// # Errors
///
/// Propagates any error raised by the JNI method call, by interpreting the
/// returned value as an object, or by `get_string_opt`.
fn extract_dedupe_by<'local>(env: &mut JNIEnv<'local>, jparam: &JObject) -> Result<Option<String>> {
    let optional_column = env.call_method(jparam, "dedupeBy", "()Ljava/util/Optional;", &[])?;
    let optional_column = optional_column.l()?;
    env.get_string_opt(&optional_column)
}

/// Builds the dedupe [`SortOptions`] from the Java merge-insert params object.
///
/// Calls the no-argument boolean Java methods `dedupeDescending` and
/// `dedupeNullsFirst` on `jparam`, in that order, and passes the two flags to
/// `SortOptions::new(descending, nulls_first)`.
///
/// # Errors
///
/// Propagates any error raised by either JNI method call or by interpreting a
/// returned value as a boolean.
fn extract_dedupe_sort_options<'local>(
    env: &mut JNIEnv<'local>,
    jparam: &JObject,
) -> Result<SortOptions> {
    // Both flags are read the same way: a no-argument method with JNI signature `()Z`.
    let mut read_flag = |method: &str| -> Result<bool> {
        Ok(env.call_method(jparam, method, "()Z", &[])?.z()?)
    };

    let descending = read_flag("dedupeDescending")?;
    let nulls_first = read_flag("dedupeNullsFirst")?;
    Ok(SortOptions::new(descending, nulls_first))
}

const MERGE_STATS_CLASS: &str = "org/lance/merge/MergeInsertStats";
const MERGE_STATS_CONSTRUCTOR_SIG: &str = "(JJJIJJ)V";
const MERGE_RESULT_CLASS: &str = "org/lance/merge/MergeInsertResult";
Expand Down
108 changes: 108 additions & 0 deletions java/src/main/java/org/lance/merge/MergeInsertParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public class MergeInsertParams {
private long retryTimeoutMs = 30 * 1000;
private boolean skipAutoCleanup = false;

private Optional<String> dedupeBy = Optional.empty();
private SortOptions dedupeSortOptions = SortOptions.defaultOptions();

public MergeInsertParams(List<String> on) {
this.on = on;
}
Expand Down Expand Up @@ -262,6 +265,56 @@ public boolean skipAutoCleanup() {
return skipAutoCleanup;
}

/**
 * Select the column used to pick a single row when the source data repeats a key.
 *
 * <p>If several source rows share the same key, the value of this column decides which of them
 * is kept. Combine with {@link #withDedupeSortOptions} to control the ordering that is applied.
 *
 * @param column The name of the column compared across duplicate rows. Must not be null; the
 *     argument is validated with {@code Preconditions.checkNotNull}.
 * @return This MergeInsertParams instance
 */
public MergeInsertParams withDedupeBy(String column) {
  this.dedupeBy = Optional.of(Preconditions.checkNotNull(column));
  return this;
}

/**
 * Choose how duplicate source rows are ordered during deduplication.
 *
 * <p>Only relevant when {@link #withDedupeBy} has been set: for source rows that share a key,
 * these options control which row is kept.
 *
 * <p>If values are equal (including both NULL), the operation fails with an error.
 *
 * @param options The sort options to use for deduplication. Must not be null; the argument is
 *     validated with {@code Preconditions.checkNotNull}.
 * @return This MergeInsertParams instance
 * @see SortOptions
 */
public MergeInsertParams withDedupeSortOptions(SortOptions options) {
  this.dedupeSortOptions = Preconditions.checkNotNull(options);
  return this;
}

/**
 * Returns the column configured through {@link #withDedupeBy}, or an empty {@code Optional} when
 * none has been set.
 *
 * <p>The native merge-insert code looks this method up by name with the JNI signature {@code
 * ()Ljava/util/Optional;}, so the name and return type must stay in sync with that lookup.
 */
public Optional<String> dedupeBy() {
  return this.dedupeBy;
}

/**
 * Returns the sort options used for deduplication: the value passed to {@link
 * #withDedupeSortOptions}, or {@link SortOptions#defaultOptions()} if none was supplied.
 */
public SortOptions dedupeSortOptions() {
  return this.dedupeSortOptions;
}

/**
 * Returns the {@code descending} flag of the current dedupe sort options.
 *
 * <p>The native merge-insert code looks this method up by name with the JNI signature {@code
 * ()Z}, so the name and primitive return type must stay in sync with that lookup.
 */
public boolean dedupeDescending() {
  return this.dedupeSortOptions.isDescending();
}

/**
 * Returns the {@code nullsFirst} flag of the current dedupe sort options.
 *
 * <p>The native merge-insert code looks this method up by name with the JNI signature {@code
 * ()Z}, so the name and primitive return type must stay in sync with that lookup.
 */
public boolean dedupeNullsFirst() {
  return this.dedupeSortOptions.isNullsFirst();
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
Expand All @@ -279,9 +332,64 @@ public String toString() {
.add("conflictRetries", conflictRetries)
.add("retryTimeoutMs", retryTimeoutMs)
.add("skipAutoCleanup", skipAutoCleanup)
.add("dedupeBy", dedupeBy.orElse(null))
.add("dedupeSortOptions", dedupeSortOptions)
.toString();
}

/**
* Sort options for deduplication when source data has duplicate keys.
*
* <p>Controls sort direction and NULL handling for determining which row to keep.
*/
public static class SortOptions {
private final boolean descending;
private final boolean nullsFirst;

/**
* Create sort options with the given settings.
*
* @param descending If true, keep the row with the largest value. Default false (smallest
* wins).
* @param nullsFirst If true, NULL values win over non-NULL. Default false (NULL loses).
*/
public SortOptions(boolean descending, boolean nullsFirst) {
this.descending = descending;
this.nullsFirst = nullsFirst;
}

/** Default options: ascending order, NULL loses to non-NULL. */
public static SortOptions defaultOptions() {
return new SortOptions(false, false);
}

/** Ascending order: keep the row with the smallest value. NULL loses. */
public static SortOptions ascending() {
return new SortOptions(false, false);
}

/** Descending order: keep the row with the largest value. NULL loses. */
public static SortOptions descending() {
return new SortOptions(true, false);
}

public boolean isDescending() {
return descending;
}

public boolean isNullsFirst() {
return nullsFirst;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("descending", descending)
.add("nullsFirst", nullsFirst)
.toString();
}
}

public enum WhenMatched {
/**
* The row is deleted from the target table and a new row is inserted based on the source table.
Expand Down
Loading
Loading