Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
296 changes: 278 additions & 18 deletions rust/lance-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ pub enum Error {
source: BoxedError,
location: Location,
},
/// External error passed through from user code.
///
/// This variant preserves errors that users pass into Lance APIs (e.g., via streams
/// with custom error types). The original error can be recovered using [`Error::into_external`]
/// or inspected using [`Error::external_source`].
#[snafu(transparent)]
External { source: BoxedError },
}

impl Error {
Expand Down Expand Up @@ -164,6 +171,31 @@ impl Error {
location,
}
}

/// Create an External error from a boxed error source.
pub fn external(source: BoxedError) -> Self {
Self::External { source }
}

/// Borrow the wrapped error when `self` is the `External` variant.
///
/// Returns `Some(&source)` for `External` and `None` for every other variant.
/// The returned reference can be downcast to inspect the concrete error type.
pub fn external_source(&self) -> Option<&BoxedError> {
    if let Self::External { source } = self {
        Some(source)
    } else {
        None
    }
}

/// Take ownership of the wrapped error when `self` is the `External` variant.
///
/// Returns `Ok(source)` for `External`. Any other variant is handed back
/// unchanged as `Err(self)` so the caller can keep processing it.
pub fn into_external(self) -> std::result::Result<BoxedError, Self> {
    if let Self::External { source } = self {
        Ok(source)
    } else {
        // The pattern did not match, so `self` is still intact here.
        Err(self)
    }
}
}

pub trait LanceOptionExt<T> {
Expand Down Expand Up @@ -202,9 +234,18 @@ pub type DataFusionResult<T> = std::result::Result<T, datafusion_common::DataFus
impl From<ArrowError> for Error {
#[track_caller]
fn from(e: ArrowError) -> Self {
Self::Arrow {
message: e.to_string(),
location: std::panic::Location::caller().to_snafu_location(),
match e {
ArrowError::ExternalError(source) => {
// Try to downcast to lance_core::Error first to recover the original
match source.downcast::<Self>() {
Ok(lance_err) => *lance_err,
Err(source) => Self::External { source },
}
}
other => Self::Arrow {
message: other.to_string(),
location: std::panic::Location::caller().to_snafu_location(),
},
}
}
}
Expand Down Expand Up @@ -309,20 +350,15 @@ impl From<serde_json::Error> for Error {
}
}

impl From<Error> for ArrowError {
    /// Converts a Lance [`Error`] into an [`ArrowError`] without flattening it to a string.
    ///
    /// * `Error::External` hands its boxed source straight to `ArrowError::ExternalError`.
    /// * `Error::Schema` maps to `ArrowError::SchemaError`, keeping its message.
    /// * Every other variant is boxed whole into `ArrowError::ExternalError`; the
    ///   `From<ArrowError> for Error` impl downcasts that payload to return the
    ///   original Lance error.
    fn from(value: Error) -> Self {
        match value {
            // Pass through external errors directly
            Error::External { source } => Self::ExternalError(source),
            // Preserve schema errors with their specific type
            Error::Schema { message, .. } => Self::SchemaError(message),
            // Wrap all other lance errors so they can be recovered
            e => Self::ExternalError(Box::new(e)),
        }
    }
}
Expand Down Expand Up @@ -353,7 +389,7 @@ impl From<datafusion_sql::sqlparser::tokenizer::TokenizerError> for Error {
impl From<Error> for datafusion_common::DataFusionError {
    /// Wraps a Lance [`Error`] in `DataFusionError::External`.
    ///
    /// The error is boxed whole rather than rendered to a string, so the concrete
    /// Lance error stays available for downcasting from the boxed payload.
    #[track_caller]
    fn from(e: Error) -> Self {
        Self::External(Box::new(e))
    }
}

Expand All @@ -373,10 +409,22 @@ impl From<datafusion_common::DataFusionError> for Error {
message: e.to_string(),
location,
},
datafusion_common::DataFusionError::ArrowError(..) => Self::Arrow {
message: e.to_string(),
location,
},
datafusion_common::DataFusionError::ArrowError(arrow_err, _) => {
// Check if the ArrowError wraps an external error and extract it
match *arrow_err {
ArrowError::ExternalError(source) => {
// Try to downcast to lance_core::Error first
match source.downcast::<Self>() {
Ok(lance_err) => *lance_err,
Err(source) => Self::External { source },
}
}
other => Self::Arrow {
message: other.to_string(),
location,
},
}
}
datafusion_common::DataFusionError::NotImplemented(..) => Self::NotSupported {
source: box_error(e),
location,
Expand All @@ -385,6 +433,13 @@ impl From<datafusion_common::DataFusionError> for Error {
message: e.to_string(),
location,
},
datafusion_common::DataFusionError::External(source) => {
// Try to downcast to lance_core::Error first
match source.downcast::<Self>() {
Ok(lance_err) => *lance_err,
Err(source) => Self::External { source },
}
}
_ => Self::IO {
source: box_error(e),
location,
Expand Down Expand Up @@ -439,6 +494,7 @@ impl<T: Clone> From<Result<T>> for CloneableResult<T> {
#[cfg(test)]
mod test {
use super::*;
use std::fmt;

#[test]
fn test_caller_location_capture() {
Expand All @@ -461,4 +517,208 @@ mod test {
_ => panic!("expected ObjectStore error"),
}
}

/// Minimal user-defined error used by the tests as a stand-in for an error
/// type that Lance knows nothing about.
#[derive(Debug)]
struct MyCustomError {
    // Numeric code the tests assert on after a downcast.
    code: i32,
    // Human-readable text included in the `Display` output.
    message: String,
}

impl fmt::Display for MyCustomError {
    /// Renders as `MyCustomError(<code>): <message>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { code, message } = self;
        write!(f, "MyCustomError({}): {}", code, message)
    }
}

// No source or extra behaviour: the default `Error` methods are enough.
impl std::error::Error for MyCustomError {}

// `Error::external` must yield the `External` variant holding the original value.
#[test]
fn test_external_error_creation() {
    let err = Error::external(Box::new(MyCustomError {
        code: 42,
        message: "test error".to_string(),
    }));

    let Error::External { source } = &err else {
        panic!("Expected External variant");
    };
    let recovered = source.downcast_ref::<MyCustomError>().unwrap();
    assert_eq!(recovered.code, 42);
    assert_eq!(recovered.message, "test error");
}

// `external_source` exposes the payload for `External` and nothing otherwise.
#[test]
fn test_external_source_method() {
    let err = Error::external(Box::new(MyCustomError {
        code: 123,
        message: "source test".to_string(),
    }));

    let recovered = err
        .external_source()
        .expect("should have external source")
        .downcast_ref::<MyCustomError>()
        .unwrap();
    assert_eq!(recovered.code, 123);

    // A variant other than `External` has no external source to hand out.
    let io_err = Error::io("test", snafu::Location::new("test", 1, 1));
    assert!(io_err.external_source().is_none());
}

// `into_external` returns the payload for `External` and gives back `self` otherwise.
#[test]
fn test_into_external_method() {
    let err = Error::external(Box::new(MyCustomError {
        code: 456,
        message: "into test".to_string(),
    }));

    let Ok(source) = err.into_external() else {
        panic!("Expected Ok");
    };
    let recovered = source.downcast::<MyCustomError>().unwrap();
    assert_eq!(recovered.code, 456);

    // A non-External error must come back unchanged inside `Err`.
    let io_err = Error::io("test", snafu::Location::new("test", 1, 1));
    if !matches!(io_err.into_external(), Err(Error::IO { .. })) {
        panic!("Expected Err with IO variant");
    }
}

// A foreign error inside `ArrowError::ExternalError` must land in `Error::External`.
#[test]
fn test_arrow_external_error_conversion() {
    let arrow_err = ArrowError::ExternalError(Box::new(MyCustomError {
        code: 789,
        message: "arrow test".to_string(),
    }));
    let lance_err = Error::from(arrow_err);

    let Error::External { source } = &lance_err else {
        panic!("Expected External variant, got {:?}", lance_err);
    };
    let recovered = source.downcast_ref::<MyCustomError>().unwrap();
    assert_eq!(recovered.code, 789);
}

// `Error::External` must convert to `ArrowError::ExternalError` with the payload intact.
#[test]
fn test_external_to_arrow_roundtrip() {
    let lance_err = Error::external(Box::new(MyCustomError {
        code: 999,
        message: "roundtrip".to_string(),
    }));
    let arrow_err = ArrowError::from(lance_err);

    let ArrowError::ExternalError(source) = arrow_err else {
        panic!("Expected ExternalError variant");
    };
    let recovered = source.downcast_ref::<MyCustomError>().unwrap();
    assert_eq!(recovered.code, 999);
}

// A foreign error inside `DataFusionError::External` must land in `Error::External`.
#[cfg(feature = "datafusion")]
#[test]
fn test_datafusion_external_error_conversion() {
    let df_err = datafusion_common::DataFusionError::External(Box::new(MyCustomError {
        code: 111,
        message: "datafusion test".to_string(),
    }));
    let lance_err = Error::from(df_err);

    let Error::External { source } = lance_err else {
        panic!("Expected External variant");
    };
    let recovered = source.downcast_ref::<MyCustomError>().unwrap();
    assert_eq!(recovered.code, 111);
}

// Nested case: a foreign error wrapped in `ArrowError::ExternalError`, which is in
// turn wrapped in `DataFusionError::ArrowError`, must still reach `Error::External`.
#[cfg(feature = "datafusion")]
#[test]
fn test_datafusion_arrow_external_error_conversion() {
    let inner = ArrowError::ExternalError(Box::new(MyCustomError {
        code: 222,
        message: "nested test".to_string(),
    }));
    let df_err = datafusion_common::DataFusionError::ArrowError(Box::new(inner), None);
    let lance_err = Error::from(df_err);

    let Error::External { source } = &lance_err else {
        panic!("Expected External variant, got {:?}", lance_err);
    };
    let recovered = source.downcast_ref::<MyCustomError>().unwrap();
    assert_eq!(recovered.code, 222);
}

/// A Lance error converted to `ArrowError` and back must come out as the same variant.
///
/// This mirrors an iterator written in terms of `lance_core::Error` whose errors
/// travel through Arrow's error type (e.g. via `RecordBatchIterator`) before Lance
/// converts them back.
#[test]
fn test_lance_error_roundtrip_through_arrow() {
    let original = Error::invalid_input(
        "test validation error",
        snafu::Location::new("test.rs", 10, 1),
    );

    // Lance -> Arrow, as `?` would do in an Arrow context ...
    let arrow_err = ArrowError::from(original);
    // ... and Arrow -> Lance, as happens when Lance consumes the stream.
    let recovered = Error::from(arrow_err);

    // The original variant must come back directly, not wrapped in `External`.
    if !matches!(recovered, Error::InvalidInput { .. }) {
        panic!("Expected InvalidInput variant, got {:?}", recovered);
    }
    assert!(recovered.to_string().contains("test validation error"));
}

/// A Lance error converted to `DataFusionError` and back must come out as the same variant.
///
/// This mirrors a stream written in terms of `lance_core::Error` whose errors travel
/// through DataFusion's error type (e.g. via `SendableRecordBatchStream`) before
/// Lance converts them back.
#[cfg(feature = "datafusion")]
#[test]
fn test_lance_error_roundtrip_through_datafusion() {
    let original = Error::invalid_input(
        "test validation error",
        snafu::Location::new("test.rs", 10, 1),
    );

    // Lance -> DataFusion, as `?` would do in a DataFusion context ...
    let df_err = datafusion_common::DataFusionError::from(original);
    // ... and DataFusion -> Lance, as happens when Lance consumes the stream.
    let recovered = Error::from(df_err);

    // The original variant must come back directly, not wrapped in `External`.
    if !matches!(recovered, Error::InvalidInput { .. }) {
        panic!("Expected InvalidInput variant, got {:?}", recovered);
    }
    assert!(recovered.to_string().contains("test validation error"));
}
}
Loading
Loading