Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,14 @@ set(RUST_FFI_DEPENDS
${CMAKE_CURRENT_LIST_DIR}/rust/ffi/mod.rs
${CMAKE_CURRENT_LIST_DIR}/rust/ffi/projection.rs
${CMAKE_CURRENT_LIST_DIR}/rust/ffi/scan.rs
${CMAKE_CURRENT_LIST_DIR}/rust/ffi/search.rs
${CMAKE_CURRENT_LIST_DIR}/rust/ffi/stream.rs
${CMAKE_CURRENT_LIST_DIR}/rust/ffi/types.rs
${CMAKE_CURRENT_LIST_DIR}/rust/ffi/update.rs
${CMAKE_CURRENT_LIST_DIR}/rust/ffi/util.rs
${CMAKE_CURRENT_LIST_DIR}/rust/ffi/write.rs
${CMAKE_CURRENT_LIST_DIR}/rust/lib.rs
${CMAKE_CURRENT_LIST_DIR}/rust/ffi/search.rs
${CMAKE_CURRENT_LIST_DIR}/rust/ffi/stream.rs
${CMAKE_CURRENT_LIST_DIR}/rust/ffi/take.rs
${CMAKE_CURRENT_LIST_DIR}/rust/ffi/types.rs
${CMAKE_CURRENT_LIST_DIR}/rust/ffi/update.rs
${CMAKE_CURRENT_LIST_DIR}/rust/ffi/util.rs
${CMAKE_CURRENT_LIST_DIR}/rust/ffi/write.rs
${CMAKE_CURRENT_LIST_DIR}/rust/lib.rs
${CMAKE_CURRENT_LIST_DIR}/rust/runtime.rs
${CMAKE_CURRENT_LIST_DIR}/rust/scanner.rs)

Expand Down
1 change: 1 addition & 0 deletions rust/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub enum ErrorCode {
DatasetListIndices = 45,
DatasetCreateScalarIndex = 46,
DatasetCalculateDataStats = 47,
DatasetTake = 48,
}

struct LastError {
Expand Down
34 changes: 34 additions & 0 deletions rust/ffi/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ use std::ffi::{c_char, c_void, CStr};
use std::ptr;
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_sql::unparser::expr_to_sql;
use lance::dataset::statistics::DatasetStatisticsExt;
use lance::dataset::builder::DatasetBuilder;
use lance::Dataset;

use crate::constants::ROW_ID_COLUMN;
use crate::error::{clear_last_error, set_last_error, ErrorCode};
use crate::runtime;

Expand Down Expand Up @@ -201,6 +203,38 @@ fn get_schema_inner(dataset: *mut c_void) -> FfiResult<super::types::SchemaHandl
Ok(handle.arrow_schema.clone())
}

#[no_mangle]
pub unsafe extern "C" fn lance_get_schema_for_scan(dataset: *mut c_void) -> *mut c_void {
match get_schema_for_scan_inner(dataset) {
Ok(schema) => {
clear_last_error();
Box::into_raw(Box::new(schema)) as *mut c_void
}
Err(err) => {
set_last_error(err.code, err.message);
ptr::null_mut()
}
}
}

/// Builds the schema a scan stream will produce: the dataset's Arrow schema,
/// with the row-id column (`ROW_ID_COLUMN`, non-nullable `UInt64`) appended
/// when it is not already present.
///
/// Returns a shared schema handle; fails with the dataset-handle error if
/// `dataset` is not a valid handle pointer.
fn get_schema_for_scan_inner(dataset: *mut c_void) -> FfiResult<super::types::SchemaHandle> {
    let handle = unsafe { super::util::dataset_handle(dataset)? };

    // Fast path: the stored schema already contains the row-id column, so we
    // can hand back the existing shared handle instead of deep-cloning the
    // whole Schema only to leave it unchanged.
    let base = &handle.arrow_schema;
    if base.fields.iter().any(|f| f.name() == ROW_ID_COLUMN) {
        return Ok(base.clone());
    }

    // Clone the schema (preserving its metadata) and append the row-id field.
    let mut fields = base.fields.iter().cloned().collect::<Vec<_>>();
    fields.push(Arc::new(Field::new(ROW_ID_COLUMN, DataType::UInt64, false)));
    let mut schema: Schema = (**base).clone();
    schema.fields = fields.into();
    Ok(Arc::new(schema))
}

#[no_mangle]
pub unsafe extern "C" fn lance_dataset_list_fragments(
dataset: *mut c_void,
Expand Down
62 changes: 33 additions & 29 deletions rust/ffi/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,12 @@ fn create_index_list_stream_inner(dataset: *mut c_void) -> FfiResult<StreamHandl
Arc::new(StringArray::from(details)),
],
)
.map_err(|err| FfiError::new(ErrorCode::IndexStreamCreate, format!("index list batch: {err}")))?;
.map_err(|err| {
FfiError::new(
ErrorCode::IndexStreamCreate,
format!("index list batch: {err}"),
)
})?;

Ok(StreamHandle::Batches(vec![batch].into_iter()))
}
Expand Down Expand Up @@ -240,7 +245,11 @@ fn dataset_create_index_inner(
None
} else {
let s = unsafe { cstr_to_str(params_json, "params_json")? };
if s.is_empty() { None } else { Some(s.to_string()) }
if s.is_empty() {
None
} else {
Some(s.to_string())
}
};

let columns = unsafe { super::util::optional_cstr_array(columns, columns_len, "columns")? };
Expand All @@ -255,11 +264,14 @@ fn dataset_create_index_inner(
None
} else {
let s = unsafe { cstr_to_str(index_name, "index_name")? };
if s.is_empty() { None } else { Some(s.to_string()) }
if s.is_empty() {
None
} else {
Some(s.to_string())
}
};

let (lance_index_type, params) =
build_index_params(&index_type_norm, params_json.as_deref())?;
let (lance_index_type, params) = build_index_params(&index_type_norm, params_json.as_deref())?;

let mut ds: Dataset = handle.dataset.as_ref().clone();
let replace = replace != 0;
Expand Down Expand Up @@ -408,16 +420,18 @@ fn build_index_params(
};

let mut params = ScalarIndexParams::for_builtin(scalar_type.try_into().map_err(|err| {
FfiError::new(ErrorCode::DatasetCreateIndex, format!("scalar index type: {err}"))
FfiError::new(
ErrorCode::DatasetCreateIndex,
format!("scalar index type: {err}"),
)
})?);
if let Some(json) = params_json {
params.params = Some(json.to_string());
} else if scalar_type == IndexType::Inverted {
// lance-index's InvertedIndexParams requires at least `base_tokenizer` and `language`.
// Provide a stable default that is also eligible for query acceleration.
params.params = Some(
r#"{"base_tokenizer":"simple","language":"English","stem":false}"#.to_string(),
);
params.params =
Some(r#"{"base_tokenizer":"simple","language":"English","stem":false}"#.to_string());
}
Ok((scalar_type, Box::new(params)))
}
Expand All @@ -435,7 +449,10 @@ fn is_vector_index_type(index_type: &str) -> bool {
)
}

fn build_vector_params(index_type: &str, params_json: Option<&str>) -> FfiResult<VectorIndexParams> {
fn build_vector_params(
index_type: &str,
params_json: Option<&str>,
) -> FfiResult<VectorIndexParams> {
let mut params = serde_json::Map::<String, serde_json::Value>::new();
if let Some(json) = params_json {
let v: serde_json::Value = serde_json::from_str(json).map_err(|err| {
Expand All @@ -459,19 +476,15 @@ fn build_vector_params(index_type: &str, params_json: Option<&str>) -> FfiResult
.and_then(|v| v.as_str())
.unwrap_or("l2");
let metric_type = DistanceType::try_from(metric).map_err(|err| {
FfiError::new(
ErrorCode::DatasetCreateIndex,
format!("metric_type: {err}"),
)
FfiError::new(ErrorCode::DatasetCreateIndex, format!("metric_type: {err}"))
})?;

let version = params
.get("version")
.and_then(|v| v.as_str())
.unwrap_or("v3");
let version = IndexFileVersion::try_from(version).map_err(|err| {
FfiError::new(ErrorCode::DatasetCreateIndex, format!("version: {err}"))
})?;
let version = IndexFileVersion::try_from(version)
.map_err(|err| FfiError::new(ErrorCode::DatasetCreateIndex, format!("version: {err}")))?;

let num_partitions = params
.get("num_partitions")
Expand All @@ -481,10 +494,7 @@ fn build_vector_params(index_type: &str, params_json: Option<&str>) -> FfiResult
let out = match index_type {
"IVF_FLAT" => VectorIndexParams::ivf_flat(num_partitions, metric_type),
"IVF_PQ" => {
let num_bits = params
.get("num_bits")
.and_then(|v| v.as_u64())
.unwrap_or(8) as u8;
let num_bits = params.get("num_bits").and_then(|v| v.as_u64()).unwrap_or(8) as u8;
let num_sub_vectors = params
.get("num_sub_vectors")
.and_then(|v| v.as_u64())
Expand All @@ -502,17 +512,11 @@ fn build_vector_params(index_type: &str, params_json: Option<&str>) -> FfiResult
)
}
"IVF_RQ" => {
let num_bits = params
.get("num_bits")
.and_then(|v| v.as_u64())
.unwrap_or(8) as u8;
let num_bits = params.get("num_bits").and_then(|v| v.as_u64()).unwrap_or(8) as u8;
VectorIndexParams::ivf_rq(num_partitions, num_bits, metric_type)
}
"IVF_SQ" => {
let num_bits = params
.get("num_bits")
.and_then(|v| v.as_u64())
.unwrap_or(8) as u8;
let num_bits = params.get("num_bits").and_then(|v| v.as_u64()).unwrap_or(8) as u8;
let ivf = lance_index::vector::ivf::IvfBuildParams::new(num_partitions);
let sq = lance_index::vector::sq::builder::SQBuildParams {
num_bits: num_bits as u16,
Expand Down
3 changes: 2 additions & 1 deletion rust/ffi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ mod knn;
mod namespace;
mod projection;
mod scan;
mod search;
mod schema_evolution;
mod search;
mod stream;
mod take;
mod types;
mod update;
mod util;
Expand Down
12 changes: 8 additions & 4 deletions rust/ffi/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::ptr;
use crate::error::{clear_last_error, set_last_error, ErrorCode};
use crate::runtime;
use crate::scanner::LanceStream;
use crate::constants::ROW_ID_COLUMN;

use super::types::StreamHandle;
use super::util::{
Expand Down Expand Up @@ -63,6 +64,9 @@ fn create_fragment_stream_ir_inner(

let projection = unsafe { optional_cstr_array(columns, columns_len, "columns")? };
if !projection.is_empty() {
if projection.iter().any(|c| c == ROW_ID_COLUMN) {
scan.with_row_id();
}
scan.project(&projection).map_err(|err| {
FfiError::new(
ErrorCode::FragmentScan,
Expand Down Expand Up @@ -147,6 +151,9 @@ fn create_dataset_stream_ir_inner(

let projection = unsafe { optional_cstr_array(columns, columns_len, "columns")? };
if !projection.is_empty() {
if projection.iter().any(|c| c == ROW_ID_COLUMN) {
scan.with_row_id();
}
scan.project(&projection).map_err(|err| {
FfiError::new(
ErrorCode::DatasetScan,
Expand All @@ -171,10 +178,7 @@ fn create_dataset_stream_ir_inner(
let limit_opt = if limit == -1 { None } else { Some(limit) };
let offset_opt = if offset == 0 { None } else { Some(offset) };
scan.limit(limit_opt, offset_opt).map_err(|err| {
FfiError::new(
ErrorCode::DatasetScan,
format!("dataset scan limit: {err}"),
)
FfiError::new(ErrorCode::DatasetScan, format!("dataset scan limit: {err}"))
})?;
}

Expand Down
60 changes: 34 additions & 26 deletions rust/ffi/schema_evolution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,13 @@ pub unsafe extern "C" fn lance_dataset_add_columns(
expressions_len: usize,
batch_size: u32,
) -> i32 {
match dataset_add_columns_inner(dataset, new_columns_schema, expressions, expressions_len, batch_size) {
match dataset_add_columns_inner(
dataset,
new_columns_schema,
expressions,
expressions_len,
batch_size,
) {
Ok(()) => {
clear_last_error();
0
Expand Down Expand Up @@ -185,13 +191,12 @@ fn dataset_add_columns_inner(
)
})?;
let arr = if arr.data_type() != field.data_type() {
compute::cast(&arr, field.data_type())
.map_err(|err| {
lance::Error::invalid_input(
format!("expression[{idx}] cast: {err}"),
location!(),
)
})?
compute::cast(&arr, field.data_type()).map_err(|err| {
lance::Error::invalid_input(
format!("expression[{idx}] cast: {err}"),
location!(),
)
})?
} else {
arr
};
Expand Down Expand Up @@ -422,9 +427,11 @@ fn dataset_update_table_metadata_inner(
let value = if value.is_null() {
None
} else {
Some(unsafe { CStr::from_ptr(value) }.to_str().map_err(|err| {
FfiError::new(ErrorCode::Utf8, format!("value utf8: {err}"))
})?)
Some(
unsafe { CStr::from_ptr(value) }
.to_str()
.map_err(|err| FfiError::new(ErrorCode::Utf8, format!("value utf8: {err}")))?,
)
};

let mut ds = (*handle.dataset).clone();
Expand Down Expand Up @@ -467,9 +474,11 @@ fn dataset_update_config_inner(
let value = if value.is_null() {
None
} else {
Some(unsafe { CStr::from_ptr(value) }.to_str().map_err(|err| {
FfiError::new(ErrorCode::Utf8, format!("value utf8: {err}"))
})?)
Some(
unsafe { CStr::from_ptr(value) }
.to_str()
.map_err(|err| FfiError::new(ErrorCode::Utf8, format!("value utf8: {err}")))?,
)
};

let mut ds = (*handle.dataset).clone();
Expand Down Expand Up @@ -512,9 +521,11 @@ fn dataset_update_schema_metadata_inner(
let value = if value.is_null() {
None
} else {
Some(unsafe { CStr::from_ptr(value) }.to_str().map_err(|err| {
FfiError::new(ErrorCode::Utf8, format!("value utf8: {err}"))
})?)
Some(
unsafe { CStr::from_ptr(value) }
.to_str()
.map_err(|err| FfiError::new(ErrorCode::Utf8, format!("value utf8: {err}")))?,
)
};

let mut ds = (*handle.dataset).clone();
Expand Down Expand Up @@ -560,9 +571,11 @@ fn dataset_update_field_metadata_inner(
let value = if value.is_null() {
None
} else {
Some(unsafe { CStr::from_ptr(value) }.to_str().map_err(|err| {
FfiError::new(ErrorCode::Utf8, format!("value utf8: {err}"))
})?)
Some(
unsafe { CStr::from_ptr(value) }
.to_str()
.map_err(|err| FfiError::new(ErrorCode::Utf8, format!("value utf8: {err}")))?,
)
};

let mut ds = (*handle.dataset).clone();
Expand Down Expand Up @@ -770,12 +783,7 @@ fn dataset_list_kv_inner(dataset: *mut c_void, which: &'static str) -> FfiResult
out.push('\n');
}
}
_ => {
return Err(FfiError::new(
ErrorCode::InvalidArgument,
"unknown kv type",
))
}
_ => return Err(FfiError::new(ErrorCode::InvalidArgument, "unknown kv type")),
}

Ok(out)
Expand Down
Loading
Loading