diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index ab5a96f751..0a8aa08064 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -54,8 +54,9 @@ use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator; use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator; use crate::expr::{BoundPredicate, BoundReference}; use crate::io::{FileIO, FileMetadata, FileRead}; +use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field}; use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream}; -use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type}; +use crate::spec::{Datum, NameMapping, NestedField, PrimitiveLiteral, PrimitiveType, Schema, Type}; use crate::utils::available_parallelism; use crate::{Error, ErrorKind}; @@ -250,12 +251,20 @@ impl ArrowReader { initial_stream_builder }; + // Filter out metadata fields for Parquet projection (they don't exist in files) + let project_field_ids_without_metadata: Vec = task + .project_field_ids + .iter() + .filter(|&&id| !is_metadata_field(id)) + .copied() + .collect(); + // Create projection mask based on field IDs // - If file has embedded IDs: field-ID-based projection (missing_field_ids=false) // - If name mapping applied: field-ID-based projection (missing_field_ids=true but IDs now match) // - If fallback IDs: position-based projection (missing_field_ids=true) let projection_mask = Self::get_arrow_projection_mask( - &task.project_field_ids, + &project_field_ids_without_metadata, &task.schema, record_batch_stream_builder.parquet_schema(), record_batch_stream_builder.schema(), @@ -266,16 +275,20 @@ impl ArrowReader { record_batch_stream_builder.with_projection(projection_mask.clone()); // RecordBatchTransformer performs any transformations required on the RecordBatches - // that come back from the file, such as type promotion, default column insertion - // and column re-ordering. + // that come back from the file, such as type promotion, default column insertion, + // column re-ordering, partition constants, and virtual field addition (like _file) let mut record_batch_transformer_builder = - RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids()); + RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids()) + .with_constant( + RESERVED_FIELD_ID_FILE, + PrimitiveLiteral::String(task.data_file_path.clone()), + )?; if let (Some(partition_spec), Some(partition_data)) = (task.partition_spec.clone(), task.partition.clone()) { record_batch_transformer_builder = - record_batch_transformer_builder.with_partition(partition_spec, partition_data); + record_batch_transformer_builder.with_partition(partition_spec, partition_data)?; } let mut record_batch_transformer = record_batch_transformer_builder.build(); @@ -416,7 +429,10 @@ impl ArrowReader { record_batch_stream_builder .build()? .map(move |batch| match batch { - Ok(batch) => record_batch_transformer.process_record_batch(batch), + Ok(batch) => { + // Process the record batch (type promotion, column reordering, virtual fields, etc.) + record_batch_transformer.process_record_batch(batch) + } Err(err) => Err(err.into()), }); @@ -1737,7 +1753,7 @@ mod tests { use std::sync::Arc; use arrow_array::cast::AsArray; - use arrow_array::{ArrayRef, LargeStringArray, RecordBatch, StringArray}; + use arrow_array::{ArrayRef, LargeStringArray, RecordBatch, RunArray, StringArray}; use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit}; use futures::TryStreamExt; use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; @@ -2553,14 +2569,24 @@ message schema { .as_primitive::(); assert_eq!(col_a.values(), &[1, 2, 3]); - // Column 'b' should be all NULLs (it didn't exist in the old file) - let col_b = batch - .column(1) - .as_primitive::(); - assert_eq!(col_b.null_count(), 3); - assert!(col_b.is_null(0)); - assert!(col_b.is_null(1)); - assert!(col_b.is_null(2)); + // Column 'b' should be all NULLs (it didn't exist in the old file, added with REE) + let col_b = batch.column(1); + // For REE array with null, check the values array + if let Some(run_array) = col_b + .as_any() + .downcast_ref::>() + { + let values = run_array.values(); + assert!( + values.is_null(0), + "REE values should contain null for added column" + ); + } else { + // Fallback to direct null check for simple arrays + assert!(col_b.is_null(0)); + assert!(col_b.is_null(1)); + assert!(col_b.is_null(2)); + } } /// Test for bug where position deletes in later row groups are not applied correctly. @@ -3440,11 +3466,23 @@ message schema { assert_eq!(age_array.value(0), 30); assert_eq!(age_array.value(1), 25); - // Verify missing column filled with NULLs - let city_array = batch.column(2).as_string::(); - assert_eq!(city_array.null_count(), 2); - assert!(city_array.is_null(0)); - assert!(city_array.is_null(1)); + // Verify missing column filled with NULLs (will be REE with null) + let city_col = batch.column(2); + if let Some(run_array) = city_col + .as_any() + .downcast_ref::>() + { + let values = run_array.values(); + assert!( + values.is_null(0), + "REE values should contain null for added column" + ); + } else { + let city_array = city_col.as_string::(); + assert_eq!(city_array.null_count(), 2); + assert!(city_array.is_null(0)); + assert!(city_array.is_null(1)); + } } /// Test reading Parquet files without field IDs that have multiple row groups. @@ -3761,13 +3799,23 @@ message schema { assert_eq!(result_col0.value(0), 1); assert_eq!(result_col0.value(1), 2); - // New column should be NULL (doesn't exist in old file) - let result_newcol = batch - .column(1) - .as_primitive::(); - assert_eq!(result_newcol.null_count(), 2); - assert!(result_newcol.is_null(0)); - assert!(result_newcol.is_null(1)); + // New column should be NULL (doesn't exist in old file, added with REE) + let newcol = batch.column(1); + if let Some(run_array) = newcol + .as_any() + .downcast_ref::>() + { + let values = run_array.values(); + assert!( + values.is_null(0), + "REE values should contain null for added column" + ); + } else { + let result_newcol = newcol.as_primitive::(); + assert_eq!(result_newcol.null_count(), 2); + assert!(result_newcol.is_null(0)); + assert!(result_newcol.is_null(1)); + } let result_col1 = batch .column(2) diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs index 07ec43918f..d0cd4c25d4 100644 --- a/crates/iceberg/src/arrow/record_batch_transformer.rs +++ b/crates/iceberg/src/arrow/record_batch_transformer.rs @@ -20,17 +20,16 @@ use std::sync::Arc; use arrow_array::{ Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Date32Array, Float32Array, - Float64Array, Int32Array, Int64Array, NullArray, RecordBatch, RecordBatchOptions, StringArray, - StructArray, + Float64Array, Int32Array, Int64Array, RecordBatch, RecordBatchOptions, RunArray, StringArray, }; -use arrow_buffer::NullBuffer; use arrow_cast::cast; use arrow_schema::{ - DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SchemaRef, + DataType, Field, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SchemaRef, }; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use crate::arrow::schema_to_arrow_schema; +use crate::metadata_columns::get_metadata_field; use crate::spec::{ Literal, PartitionSpec, PrimitiveLiteral, Schema as IcebergSchema, Struct, Transform, }; @@ -146,13 +145,13 @@ enum SchemaComparison { /// Builder for RecordBatchTransformer to improve ergonomics when constructing with optional parameters. /// -/// See [`RecordBatchTransformer`] for details on partition spec and partition data. +/// Constant fields are pre-computed for both virtual/metadata fields (like _file) and +/// identity-partitioned fields to avoid duplicate work during batch processing. #[derive(Debug)] pub(crate) struct RecordBatchTransformerBuilder { snapshot_schema: Arc, projected_iceberg_field_ids: Vec, - partition_spec: Option>, - partition_data: Option, + constant_fields: HashMap, } impl RecordBatchTransformerBuilder { @@ -163,32 +162,49 @@ impl RecordBatchTransformerBuilder { Self { snapshot_schema, projected_iceberg_field_ids: projected_iceberg_field_ids.to_vec(), - partition_spec: None, - partition_data: None, + constant_fields: HashMap::new(), } } + /// Add a constant value for a specific field ID. + /// This is used for virtual/metadata fields like _file that have constant values per batch. + /// + /// # Arguments + /// * `field_id` - The field ID to associate with the constant + /// * `value` - The constant value for this field + pub(crate) fn with_constant(mut self, field_id: i32, value: PrimitiveLiteral) -> Result { + let arrow_type = RecordBatchTransformer::primitive_literal_to_arrow_type(&value)?; + self.constant_fields.insert(field_id, (arrow_type, value)); + Ok(self) + } + /// Set partition spec and data together for identifying identity-transformed partition columns. /// /// Both partition_spec and partition_data must be provided together since the spec defines /// which fields are identity-partitioned, and the data provides their constant values. - /// One without the other cannot produce a valid constants map. + /// This method computes the partition constants and merges them into constant_fields. pub(crate) fn with_partition( mut self, partition_spec: Arc, partition_data: Struct, - ) -> Self { - self.partition_spec = Some(partition_spec); - self.partition_data = Some(partition_data); - self + ) -> Result { + // Compute partition constants for identity-transformed fields + let partition_constants = constants_map(&partition_spec, &partition_data); + + // Add partition constants to constant_fields (compute REE types from literals) + for (field_id, value) in partition_constants { + let arrow_type = RecordBatchTransformer::primitive_literal_to_arrow_type(&value)?; + self.constant_fields.insert(field_id, (arrow_type, value)); + } + + Ok(self) } pub(crate) fn build(self) -> RecordBatchTransformer { RecordBatchTransformer { snapshot_schema: self.snapshot_schema, projected_iceberg_field_ids: self.projected_iceberg_field_ids, - partition_spec: self.partition_spec, - partition_data: self.partition_data, + constant_fields: self.constant_fields, batch_transform: None, } } @@ -228,16 +244,10 @@ impl RecordBatchTransformerBuilder { pub(crate) struct RecordBatchTransformer { snapshot_schema: Arc, projected_iceberg_field_ids: Vec, - - /// Partition spec for identifying identity-transformed partition columns (spec rule #1). - /// Only fields with identity transforms use partition data constants; non-identity transforms - /// (bucket, truncate, etc.) must read source columns from data files. - partition_spec: Option>, - - /// Partition data providing constant values for identity-transformed partition columns (spec rule #1). - /// For example, in a file at path `dept=engineering/file.parquet`, this would contain - /// the value "engineering" for the dept field. - partition_data: Option, + // Pre-computed constant field information: field_id -> (arrow_type, value) + // Includes both virtual/metadata fields (like _file) and identity-partitioned fields + // Avoids type conversions during batch processing + constant_fields: HashMap, // BatchTransform gets lazily constructed based on the schema of // the first RecordBatch we receive from the file @@ -279,8 +289,7 @@ impl RecordBatchTransformer { record_batch.schema_ref(), self.snapshot_schema.as_ref(), &self.projected_iceberg_field_ids, - self.partition_spec.as_ref().map(|s| s.as_ref()), - self.partition_data.as_ref(), + &self.constant_fields, )?); self.process_record_batch(record_batch)? @@ -299,34 +308,69 @@ impl RecordBatchTransformer { source_schema: &ArrowSchemaRef, snapshot_schema: &IcebergSchema, projected_iceberg_field_ids: &[i32], - partition_spec: Option<&PartitionSpec>, - partition_data: Option<&Struct>, + constant_fields: &HashMap, ) -> Result { let mapped_unprojected_arrow_schema = Arc::new(schema_to_arrow_schema(snapshot_schema)?); let field_id_to_mapped_schema_map = Self::build_field_id_to_arrow_schema_map(&mapped_unprojected_arrow_schema)?; + let field_id_to_source_schema_map = + Self::build_field_id_to_arrow_schema_map(source_schema)?; // Create a new arrow schema by selecting fields from mapped_unprojected, // in the order of the field ids in projected_iceberg_field_ids let fields: Result> = projected_iceberg_field_ids .iter() .map(|field_id| { - Ok(field_id_to_mapped_schema_map - .get(field_id) - .ok_or(Error::new(ErrorKind::Unexpected, "field not found"))? - .0 - .clone()) + // Check if this is a constant field (virtual or partition) + if constant_fields.contains_key(field_id) { + // For metadata/virtual fields (like _file), get name from metadata_columns + // For partition fields, get name from schema (they exist in schema) + if let Ok(field) = get_metadata_field(*field_id) { + // This is a metadata/virtual field - use the predefined field + Ok(field) + } else { + // This is a partition constant field (exists in schema but uses constant value) + let field = &field_id_to_mapped_schema_map + .get(field_id) + .ok_or(Error::new(ErrorKind::Unexpected, "field not found"))? + .0; + let (arrow_type, _) = constant_fields.get(field_id).unwrap(); + // Use the REE type from constant_fields + let ree_field = + Field::new(field.name(), arrow_type.clone(), field.is_nullable()) + .with_metadata(field.metadata().clone()); + Ok(Arc::new(ree_field)) + } + } else { + // Get field from schema + let field = &field_id_to_mapped_schema_map + .get(field_id) + .ok_or(Error::new(ErrorKind::Unexpected, "field not found"))? + .0; + + // Check if this field exists in the source - if not, it will be added with REE + if !field_id_to_source_schema_map.contains_key(field_id) { + // Field will be added - use REE type for the schema + let run_ends_field = + Arc::new(Field::new("run_ends", DataType::Int32, false)); + let values_field = + Arc::new(Field::new("values", field.data_type().clone(), true)); + let ree_field = Field::new( + field.name(), + DataType::RunEndEncoded(run_ends_field, values_field), + field.is_nullable(), + ) + .with_metadata(field.metadata().clone()); + Ok(Arc::new(ree_field)) + } else { + Ok(Arc::clone(field)) + } + } }) .collect(); let target_schema = Arc::new(ArrowSchema::new(fields?)); - let constants_map = if let (Some(spec), Some(data)) = (partition_spec, partition_data) { - constants_map(spec, data) - } else { - HashMap::new() - }; - match Self::compare_schemas(source_schema, &target_schema) { SchemaComparison::Equivalent => Ok(BatchTransform::PassThrough), SchemaComparison::NameChangesOnly => Ok(BatchTransform::ModifySchema { target_schema }), @@ -336,8 +380,7 @@ impl RecordBatchTransformer { snapshot_schema, projected_iceberg_field_ids, field_id_to_mapped_schema_map, - constants_map, - partition_spec, + constant_fields, )?, target_schema, }), @@ -394,8 +437,7 @@ impl RecordBatchTransformer { snapshot_schema: &IcebergSchema, projected_iceberg_field_ids: &[i32], field_id_to_mapped_schema_map: HashMap, - constants_map: HashMap, - _partition_spec: Option<&PartitionSpec>, + constant_fields: &HashMap, ) -> Result> { let field_id_to_source_schema_map = Self::build_field_id_to_arrow_schema_map(source_schema)?; @@ -403,6 +445,17 @@ impl RecordBatchTransformer { projected_iceberg_field_ids .iter() .map(|field_id| { + // Check if this is a constant field (metadata/virtual or identity-partitioned) + // Constant fields always use their pre-computed constant values, regardless of whether + // they exist in the Parquet file. This is per Iceberg spec rule #1: partition metadata + // is authoritative and should be preferred over file data. + if let Some((arrow_type, value)) = constant_fields.get(field_id) { + return Ok(ColumnSource::Add { + value: Some(value.clone()), + target_type: arrow_type.clone(), + }); + } + let (target_field, _) = field_id_to_mapped_schema_map .get(field_id) @@ -451,13 +504,8 @@ impl RecordBatchTransformer { ); // Apply spec's fallback steps for "not present" fields. - let column_source = if let Some(constant_value) = constants_map.get(field_id) { - // Rule #1: Identity partition constant - ColumnSource::Add { - value: Some(constant_value.clone()), - target_type: target_type.clone(), - } - } else if let Some(source) = field_by_id { + // Rule #1 (constants) is handled at the beginning of this function + let column_source = if let Some(source) = field_by_id { source } else { // Rules #2, #3 and #4: @@ -471,9 +519,15 @@ impl RecordBatchTransformer { None } }); + + // Always use REE for added columns (memory efficient) + let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false)); + let values_field = Arc::new(Field::new("values", target_type.clone(), true)); + let column_type = DataType::RunEndEncoded(run_ends_field, values_field); + ColumnSource::Add { value: default_value, - target_type: target_type.clone(), + target_type: column_type, } }; @@ -539,83 +593,164 @@ impl RecordBatchTransformer { prim_lit: &Option, num_rows: usize, ) -> Result { - Ok(match (target_type, prim_lit) { - (DataType::Boolean, Some(PrimitiveLiteral::Boolean(value))) => { - Arc::new(BooleanArray::from(vec![*value; num_rows])) - } - (DataType::Boolean, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(BooleanArray::from(vals)) - } - (DataType::Int32, Some(PrimitiveLiteral::Int(value))) => { - Arc::new(Int32Array::from(vec![*value; num_rows])) - } - (DataType::Int32, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(Int32Array::from(vals)) - } - (DataType::Date32, Some(PrimitiveLiteral::Int(value))) => { - Arc::new(Date32Array::from(vec![*value; num_rows])) + // All added columns use Run-End Encoding for memory efficiency + let DataType::RunEndEncoded(_, values_field) = target_type else { + return Err(Error::new( + ErrorKind::Unexpected, + format!( + "Expected RunEndEncoded type for added column, got: {}", + target_type + ), + )); + }; + + // Helper to create a Run-End Encoded array + let create_ree_array = |values_array: ArrayRef| -> Result { + let run_ends = if num_rows == 0 { + Int32Array::from(Vec::::new()) + } else { + Int32Array::from(vec![num_rows as i32]) + }; + Ok(Arc::new( + RunArray::try_new(&run_ends, &values_array).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to create RunArray for constant value", + ) + .with_source(e) + })?, + )) + }; + + // Create the values array based on the literal value + let values_array: ArrayRef = match (values_field.data_type(), prim_lit) { + (DataType::Boolean, Some(PrimitiveLiteral::Boolean(v))) => { + Arc::new(BooleanArray::from(vec![*v])) } - (DataType::Date32, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(Date32Array::from(vals)) + (DataType::Boolean, None) => Arc::new(BooleanArray::from(vec![Option::::None])), + (DataType::Int32, Some(PrimitiveLiteral::Int(v))) => { + Arc::new(Int32Array::from(vec![*v])) } - (DataType::Int64, Some(PrimitiveLiteral::Long(value))) => { - Arc::new(Int64Array::from(vec![*value; num_rows])) + (DataType::Int32, None) => Arc::new(Int32Array::from(vec![Option::::None])), + (DataType::Date32, Some(PrimitiveLiteral::Int(v))) => { + Arc::new(Date32Array::from(vec![*v])) } - (DataType::Int64, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(Int64Array::from(vals)) + (DataType::Date32, None) => Arc::new(Date32Array::from(vec![Option::::None])), + (DataType::Int64, Some(PrimitiveLiteral::Long(v))) => { + Arc::new(Int64Array::from(vec![*v])) } - (DataType::Float32, Some(PrimitiveLiteral::Float(value))) => { - Arc::new(Float32Array::from(vec![value.0; num_rows])) + (DataType::Int64, None) => Arc::new(Int64Array::from(vec![Option::::None])), + (DataType::Float32, Some(PrimitiveLiteral::Float(v))) => { + Arc::new(Float32Array::from(vec![v.0])) } - (DataType::Float32, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(Float32Array::from(vals)) + (DataType::Float32, None) => Arc::new(Float32Array::from(vec![Option::::None])), + (DataType::Float64, Some(PrimitiveLiteral::Double(v))) => { + Arc::new(Float64Array::from(vec![v.0])) } - (DataType::Float64, Some(PrimitiveLiteral::Double(value))) => { - Arc::new(Float64Array::from(vec![value.0; num_rows])) + (DataType::Float64, None) => Arc::new(Float64Array::from(vec![Option::::None])), + (DataType::Utf8, Some(PrimitiveLiteral::String(v))) => { + Arc::new(StringArray::from(vec![v.as_str()])) } - (DataType::Float64, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(Float64Array::from(vals)) + (DataType::Utf8, None) => Arc::new(StringArray::from(vec![Option::<&str>::None])), + (DataType::Binary, Some(PrimitiveLiteral::Binary(v))) => { + Arc::new(BinaryArray::from_vec(vec![v.as_slice()])) } - (DataType::Utf8, Some(PrimitiveLiteral::String(value))) => { - Arc::new(StringArray::from(vec![value.clone(); num_rows])) + (DataType::Binary, None) => { + Arc::new(BinaryArray::from_opt_vec(vec![Option::<&[u8]>::None])) } - (DataType::Utf8, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(StringArray::from(vals)) + (DataType::Decimal128(_, _), Some(PrimitiveLiteral::Int128(v))) => { + Arc::new(arrow_array::Decimal128Array::from(vec![{ *v }])) } - (DataType::Binary, Some(PrimitiveLiteral::Binary(value))) => { - Arc::new(BinaryArray::from_vec(vec![value; num_rows])) + (DataType::Decimal128(_, _), Some(PrimitiveLiteral::UInt128(v))) => { + Arc::new(arrow_array::Decimal128Array::from(vec![*v as i128])) } - (DataType::Binary, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(BinaryArray::from_opt_vec(vals)) + (DataType::Decimal128(_, _), None) => { + Arc::new(arrow_array::Decimal128Array::from(vec![ + Option::::None, + ])) } (DataType::Struct(fields), None) => { - // Create a StructArray filled with nulls. Per Iceberg spec, optional struct fields - // default to null when added to the schema. We defer non-null default struct values - // and leave them as not implemented yet. + // Create a single-element StructArray with nulls let null_arrays: Vec = fields .iter() - .map(|field| Self::create_column(field.data_type(), &None, num_rows)) - .collect::>>()?; - - Arc::new(StructArray::new( + .map(|f| { + // Recursively create null arrays for struct fields + // For primitive fields in structs, use simple null arrays (not REE within struct) + match f.data_type() { + DataType::Boolean => { + Arc::new(BooleanArray::from(vec![Option::::None])) as ArrayRef + } + DataType::Int32 | DataType::Date32 => { + Arc::new(Int32Array::from(vec![Option::::None])) + } + DataType::Int64 => { + Arc::new(Int64Array::from(vec![Option::::None])) + } + DataType::Float32 => { + Arc::new(Float32Array::from(vec![Option::::None])) + } + DataType::Float64 => { + Arc::new(Float64Array::from(vec![Option::::None])) + } + DataType::Utf8 => { + Arc::new(StringArray::from(vec![Option::<&str>::None])) + } + DataType::Binary => { + Arc::new(BinaryArray::from_opt_vec(vec![Option::<&[u8]>::None])) + } + _ => panic!("Unsupported struct field type: {:?}", f.data_type()), + } + }) + .collect(); + Arc::new(arrow_array::StructArray::new( fields.clone(), null_arrays, - Some(NullBuffer::new_null(num_rows)), + Some(arrow_buffer::NullBuffer::new_null(1)), )) } - (DataType::Null, _) => Arc::new(NullArray::new(num_rows)), - (dt, _) => { + _ => { return Err(Error::new( ErrorKind::Unexpected, - format!("unexpected target column type {}", dt), + format!( + "Unsupported constant type combination: {:?} with {:?}", + values_field.data_type(), + prim_lit + ), + )); + } + }; + + // Wrap in Run-End Encoding + create_ree_array(values_array) + } + + /// Converts a PrimitiveLiteral to its corresponding Arrow DataType. + /// This is used for constant fields to determine the Arrow type. + /// For constant values, we use Run-End Encoding for all types to save memory. + fn primitive_literal_to_arrow_type(literal: &PrimitiveLiteral) -> Result { + // Helper to create REE type with the given values type + // Note: values field is nullable as Arrow expects this when building the + // final Arrow schema with `RunArray::try_new`. + let make_ree = |values_type: DataType| -> DataType { + let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false)); + let values_field = Arc::new(Field::new("values", values_type, true)); + DataType::RunEndEncoded(run_ends_field, values_field) + }; + + Ok(match literal { + PrimitiveLiteral::Boolean(_) => make_ree(DataType::Boolean), + PrimitiveLiteral::Int(_) => make_ree(DataType::Int32), + PrimitiveLiteral::Long(_) => make_ree(DataType::Int64), + PrimitiveLiteral::Float(_) => make_ree(DataType::Float32), + PrimitiveLiteral::Double(_) => make_ree(DataType::Float64), + PrimitiveLiteral::String(_) => make_ree(DataType::Utf8), + PrimitiveLiteral::Binary(_) => make_ree(DataType::Binary), + PrimitiveLiteral::Int128(_) => make_ree(DataType::Decimal128(38, 0)), + PrimitiveLiteral::UInt128(_) => make_ree(DataType::Decimal128(38, 0)), + PrimitiveLiteral::AboveMax | PrimitiveLiteral::BelowMin => { + return Err(Error::new( + ErrorKind::Unexpected, + "Cannot create arrow type for AboveMax/BelowMin literal", )); } }) @@ -628,7 +763,7 @@ mod test { use std::sync::Arc; use arrow_array::{ - Array, Date32Array, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, + Array, ArrayRef, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, }; use arrow_schema::{DataType, Field, Schema as ArrowSchema}; @@ -639,6 +774,82 @@ mod test { }; use crate::spec::{Literal, NestedField, PrimitiveType, Schema, Struct, Type}; + /// Helper to extract string values from either StringArray or RunEndEncoded + /// Returns empty string for null values + fn get_string_value(array: &dyn Array, index: usize) -> String { + if let Some(string_array) = array.as_any().downcast_ref::() { + if string_array.is_null(index) { + String::new() + } else { + string_array.value(index).to_string() + } + } else if let Some(run_array) = array + .as_any() + .downcast_ref::>() + { + let values = run_array.values(); + let string_values = values + .as_any() + .downcast_ref::() + .expect("REE values should be StringArray"); + // For REE, all rows have the same value (index 0 in the values array) + if string_values.is_null(0) { + String::new() + } else { + string_values.value(0).to_string() + } + } else { + panic!("Expected StringArray or RunEndEncoded"); + } + } + + /// Helper to extract int values from either Int32Array or RunEndEncoded + fn get_int_value(array: &dyn Array, index: usize) -> i32 { + if let Some(int_array) = array.as_any().downcast_ref::() { + int_array.value(index) + } else if let Some(run_array) = array + .as_any() + .downcast_ref::>() + { + let values = run_array.values(); + let int_values = values + .as_any() + .downcast_ref::() + .expect("REE values should be Int32Array"); + int_values.value(0) + } else { + panic!("Expected Int32Array or RunEndEncoded"); + } + } + + /// Helper to check if value is null in either simple or REE array + fn is_null_value(array: &dyn Array, index: usize) -> bool { + if let Some(run_array) = array + .as_any() + .downcast_ref::>() + { + let values = run_array.values(); + values.is_null(0) // For REE, check the single value + } else { + array.is_null(index) + } + } + + /// Helper to create a RunEndEncoded StringArray for constant values + fn create_ree_string_array(value: Option<&str>, num_rows: usize) -> ArrayRef { + let run_ends = if num_rows == 0 { + Int32Array::from(Vec::::new()) + } else { + Int32Array::from(vec![num_rows as i32]) + }; + let values = if num_rows == 0 { + StringArray::from(Vec::>::new()) + } else { + StringArray::from(vec![value]) + }; + Arc::new(arrow_array::RunArray::try_new(&run_ends, &values).unwrap()) + } + #[test] fn build_field_id_to_source_schema_map_works() { let arrow_schema = arrow_schema_already_same_as_target(); @@ -734,30 +945,19 @@ mod test { assert_eq!(result.num_columns(), 3); assert_eq!(result.num_rows(), 3); - let id_column = result - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(id_column.values(), &[1, 2, 3]); + // Use helpers to handle both simple and REE arrays + assert_eq!(get_int_value(result.column(0).as_ref(), 0), 1); + assert_eq!(get_int_value(result.column(0).as_ref(), 1), 2); + assert_eq!(get_int_value(result.column(0).as_ref(), 2), 3); - let name_column = result - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(name_column.value(0), "Alice"); - assert_eq!(name_column.value(1), "Bob"); - assert_eq!(name_column.value(2), "Charlie"); + assert_eq!(get_string_value(result.column(1).as_ref(), 0), "Alice"); + assert_eq!(get_string_value(result.column(1).as_ref(), 1), "Bob"); + assert_eq!(get_string_value(result.column(1).as_ref(), 2), "Charlie"); - let date_column = result - .column(2) - .as_any() - .downcast_ref::() - .unwrap(); - assert!(date_column.is_null(0)); - assert!(date_column.is_null(1)); - assert!(date_column.is_null(2)); + // date_col added with null default - will be REE with null + assert!(is_null_value(result.column(2).as_ref(), 0)); + assert!(is_null_value(result.column(2).as_ref(), 1)); + assert!(is_null_value(result.column(2).as_ref(), 2)); } #[test] @@ -792,7 +992,8 @@ mod test { let projected_iceberg_field_ids = [1, 2, 3]; let mut transformer = - RecordBatchTransformer::build(snapshot_schema, &projected_iceberg_field_ids); + RecordBatchTransformerBuilder::new(snapshot_schema, &projected_iceberg_field_ids) + .build(); let file_schema = Arc::new(ArrowSchema::new(vec![ simple_field("id", DataType::Int32, false, "1"), @@ -810,30 +1011,19 @@ mod test { assert_eq!(result.num_columns(), 3); assert_eq!(result.num_rows(), 3); - let id_column = result - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(id_column.values(), &[1, 2, 3]); + // Use helpers to handle both simple and REE arrays + assert_eq!(get_int_value(result.column(0).as_ref(), 0), 1); + assert_eq!(get_int_value(result.column(0).as_ref(), 1), 2); + assert_eq!(get_int_value(result.column(0).as_ref(), 2), 3); - let data_column = result - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(data_column.value(0), "a"); - assert_eq!(data_column.value(1), "b"); - assert_eq!(data_column.value(2), "c"); + assert_eq!(get_string_value(result.column(1).as_ref(), 0), "a"); + assert_eq!(get_string_value(result.column(1).as_ref(), 1), "b"); + assert_eq!(get_string_value(result.column(1).as_ref(), 2), "c"); - let struct_column = result - .column(2) - .as_any() - .downcast_ref::() - .unwrap(); - assert!(struct_column.is_null(0)); - assert!(struct_column.is_null(1)); - assert!(struct_column.is_null(2)); + // Struct column added with null - will be REE + assert!(is_null_value(result.column(2).as_ref(), 0)); + assert!(is_null_value(result.column(2).as_ref(), 1)); + assert!(is_null_value(result.column(2).as_ref(), 2)); } pub fn source_record_batch() -> RecordBatch { @@ -873,10 +1063,17 @@ mod test { } pub fn expected_record_batch_migration_required() -> RecordBatch { - RecordBatch::try_new(arrow_schema_already_same_as_target(), vec![ - Arc::new(StringArray::from(Vec::>::from([ - None, None, None, - ]))), // a + // Build schema with REE type for added fields (a and f) + let schema = Arc::new(ArrowSchema::new(vec![ + ree_field("a", DataType::Utf8, true, "10"), // Added field - REE with null + simple_field("b", DataType::Int64, false, "11"), + simple_field("c", DataType::Float64, false, "12"), + simple_field("e", DataType::Utf8, true, "14"), + ree_field("f", DataType::Utf8, false, "15"), // Added field - REE with default + ])); + + RecordBatch::try_new(schema, vec![ + create_ree_string_array(None, 3), // a - REE with null (not in source) Arc::new(Int64Array::from(vec![Some(1001), Some(1002), Some(1003)])), // b Arc::new(Float64Array::from(vec![ Some(12.125), @@ -888,11 +1085,7 @@ mod test { Some("Iceberg"), Some("Rocks"), ])), // e (d skipped by projection) - Arc::new(StringArray::from(vec![ - Some("(╯°□°)╯"), - Some("(╯°□°)╯"), - Some("(╯°□°)╯"), - ])), // f + create_ree_string_array(Some("(╯°□°)╯"), 3), // f - REE for constant default ]) .unwrap() } @@ -948,6 +1141,21 @@ mod test { )])) } + /// Helper to create a Field with RunEndEncoded type for constant columns + fn ree_field(name: &str, values_type: DataType, nullable: bool, field_id: &str) -> Field { + let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false)); + let values_field = Arc::new(Field::new("values", values_type, true)); + Field::new( + name, + DataType::RunEndEncoded(run_ends_field, values_field), + nullable, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + field_id.to_string(), + )])) + } + /// Test for add_files with Parquet files that have NO field IDs (Hive tables). /// /// This reproduces the scenario from Iceberg spec where: @@ -1027,33 +1235,14 @@ mod test { assert_eq!(result.num_columns(), 4); assert_eq!(result.num_rows(), 1); - let id_column = result - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(id_column.value(0), 1); - - let name_column = result - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(name_column.value(0), "John Doe"); - - let dept_column = result - .column(2) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(dept_column.value(0), "hr"); - - let subdept_column = result - .column(3) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(subdept_column.value(0), "communications"); + // Use helpers to handle both simple and REE arrays + assert_eq!(get_int_value(result.column(0).as_ref(), 0), 1); // id from initial_default - REE + assert_eq!(get_string_value(result.column(1).as_ref(), 0), "John Doe"); // name from Parquet + assert_eq!(get_string_value(result.column(2).as_ref(), 0), "hr"); // dept from initial_default - REE + assert_eq!( + get_string_value(result.column(3).as_ref(), 0), + "communications" + ); // subdept from Parquet } /// Test for bucket partitioning where source columns must be read from data files. @@ -1136,6 +1325,7 @@ mod test { let mut transformer = RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids) .with_partition(partition_spec, partition_data) + .expect("Failed to add partition constants") .build(); // Create a Parquet RecordBatch with actual data @@ -1256,6 +1446,7 @@ mod test { let mut transformer = RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids) .with_partition(partition_spec, partition_data) + .expect("Failed to add partition constants") .build(); let parquet_batch = RecordBatch::try_new(parquet_schema, vec![ @@ -1270,30 +1461,23 @@ mod test { assert_eq!(result.num_columns(), 3); assert_eq!(result.num_rows(), 2); - let id_column = result - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(id_column.value(0), 100); - assert_eq!(id_column.value(1), 200); + // Use helpers to handle both simple and REE arrays + assert_eq!(get_int_value(result.column(0).as_ref(), 0), 100); + assert_eq!(get_int_value(result.column(0).as_ref(), 1), 200); - let dept_column = result - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - // This value MUST come from partition metadata (constant) - assert_eq!(dept_column.value(0), "engineering"); - assert_eq!(dept_column.value(1), "engineering"); + // dept column comes from partition metadata (constant) - will be REE + assert_eq!( + get_string_value(result.column(1).as_ref(), 0), + "engineering" + ); + assert_eq!( + get_string_value(result.column(1).as_ref(), 1), + "engineering" + ); - let name_column = result - .column(2) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(name_column.value(0), "Alice"); - assert_eq!(name_column.value(1), "Bob"); + // name column comes from file + assert_eq!(get_string_value(result.column(2).as_ref(), 0), "Alice"); + assert_eq!(get_string_value(result.column(2).as_ref(), 1), "Bob"); } /// Test bucket partitioning with renamed source column. @@ -1371,6 +1555,7 @@ mod test { let mut transformer = RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids) .with_partition(partition_spec, partition_data) + .expect("Failed to add partition constants") .build(); // Create a Parquet RecordBatch with actual data @@ -1475,6 +1660,7 @@ mod test { let mut transformer = RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids) .with_partition(partition_spec, partition_data) + .expect("Failed to add partition constants") .build(); let parquet_batch = RecordBatch::try_new(parquet_schema, vec![ @@ -1491,48 +1677,37 @@ mod test { // Verify each column demonstrates the correct spec rule: // Normal case: id from Parquet by field ID - let id_column = result - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(id_column.value(0), 100); - assert_eq!(id_column.value(1), 200); - - // Rule #1: dept from partition metadata (identity transform) - let dept_column = result - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(dept_column.value(0), "engineering"); - assert_eq!(dept_column.value(1), "engineering"); + // Use helpers to handle both simple and REE arrays + assert_eq!(get_int_value(result.column(0).as_ref(), 0), 100); + assert_eq!(get_int_value(result.column(0).as_ref(), 1), 200); + + // Rule #1: dept from partition metadata (identity transform) - will be REE + assert_eq!( + get_string_value(result.column(1).as_ref(), 0), + "engineering" + ); + assert_eq!( + get_string_value(result.column(1).as_ref(), 1), + "engineering" + ); - // Rule #2: data from Parquet via name mapping - let data_column = result - .column(2) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(data_column.value(0), "value1"); - assert_eq!(data_column.value(1), "value2"); + // Rule #2: data from Parquet via name mapping - will be regular array + assert_eq!(get_string_value(result.column(2).as_ref(), 0), "value1"); + assert_eq!(get_string_value(result.column(2).as_ref(), 1), "value2"); - // Rule #3: category from initial_default - let category_column = result - .column(3) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(category_column.value(0), "default_category"); - assert_eq!(category_column.value(1), "default_category"); + // Rule #3: category from initial_default - will be REE + assert_eq!( + get_string_value(result.column(3).as_ref(), 0), + "default_category" + ); + assert_eq!( + get_string_value(result.column(3).as_ref(), 1), + "default_category" + ); - // Rule #4: notes is null (no default, not in Parquet, not in partition) - let notes_column = result - .column(4) - .as_any() - .downcast_ref::() - .unwrap(); - assert!(notes_column.is_null(0)); - assert!(notes_column.is_null(1)); + // Rule #4: notes is null (no default, not in Parquet, not in partition) - will be REE with null + // For null REE arrays, we still use the helper which handles extraction + assert_eq!(get_string_value(result.column(4).as_ref(), 0), ""); + assert_eq!(get_string_value(result.column(4).as_ref(), 1), ""); } } diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index aae8efed74..8d8f40f72d 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -96,4 +96,5 @@ mod utils; pub mod writer; mod delete_vector; +pub mod metadata_columns; pub mod puffin; diff --git a/crates/iceberg/src/metadata_columns.rs b/crates/iceberg/src/metadata_columns.rs new file mode 100644 index 0000000000..ce48651c17 --- /dev/null +++ b/crates/iceberg/src/metadata_columns.rs @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Metadata columns (virtual/reserved fields) for Iceberg tables. +//! +//! This module defines metadata columns that can be requested in projections +//! but are not stored in data files. Instead, they are computed on-the-fly +//! during reading. Examples include the _file column (file path) and future +//! columns like partition values or row numbers. + +use std::collections::HashMap; +use std::sync::Arc; + +use arrow_schema::{DataType, Field}; +use once_cell::sync::Lazy; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + +use crate::{Error, ErrorKind, Result}; + +/// Reserved field ID for the file path (_file) column per Iceberg spec +pub const RESERVED_FIELD_ID_FILE: i32 = i32::MAX - 1; + +/// Reserved column name for the file path metadata column +pub const RESERVED_COL_NAME_FILE: &str = "_file"; + +/// Lazy-initialized Arrow Field definition for the _file metadata column. +/// Uses Run-End Encoding for memory efficiency. +static FILE_PATH_FIELD: Lazy> = Lazy::new(|| { + let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false)); + let values_field = Arc::new(Field::new("values", DataType::Utf8, true)); + Arc::new( + Field::new( + RESERVED_COL_NAME_FILE, + DataType::RunEndEncoded(run_ends_field, values_field), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + RESERVED_FIELD_ID_FILE.to_string(), + )])), + ) +}); + +/// Returns the Arrow Field definition for the _file metadata column. +/// +/// # Returns +/// A reference to the _file field definition (RunEndEncoded type) +pub fn file_path_field() -> &'static Arc { + &FILE_PATH_FIELD +} + +/// Returns the Arrow Field definition for a metadata field ID. +/// +/// # Arguments +/// * `field_id` - The metadata field ID +/// +/// # Returns +/// The Arrow Field definition for the metadata column, or an error if not a metadata field +pub fn get_metadata_field(field_id: i32) -> Result> { + match field_id { + RESERVED_FIELD_ID_FILE => Ok(Arc::clone(file_path_field())), + _ if is_metadata_field(field_id) => { + // Future metadata fields can be added here + Err(Error::new( + ErrorKind::Unexpected, + format!( + "Metadata field ID {} recognized but field definition not implemented", + field_id + ), + )) + } + _ => Err(Error::new( + ErrorKind::Unexpected, + format!("Field ID {} is not a metadata field", field_id), + )), + } +} + +/// Returns the field ID for a metadata column name. +/// +/// # Arguments +/// * `column_name` - The metadata column name +/// +/// # Returns +/// The field ID of the metadata column, or an error if the column name is not recognized +pub fn get_metadata_field_id(column_name: &str) -> Result { + match column_name { + RESERVED_COL_NAME_FILE => Ok(RESERVED_FIELD_ID_FILE), + _ => Err(Error::new( + ErrorKind::Unexpected, + format!("Unknown/unsupported metadata column name: {column_name}"), + )), + } +} + +/// Checks if a field ID is a metadata field. +/// +/// # Arguments +/// * `field_id` - The field ID to check +/// +/// # Returns +/// `true` if the field ID is a (currently supported) metadata field, `false` otherwise +pub fn is_metadata_field(field_id: i32) -> bool { + field_id == RESERVED_FIELD_ID_FILE + // Additional metadata fields can be checked here in the future +} + +/// Checks if a column name is a metadata column. +/// +/// # Arguments +/// * `column_name` - The column name to check +/// +/// # Returns +/// `true` if the column name is a metadata column, `false` otherwise +pub fn is_metadata_column_name(column_name: &str) -> bool { + get_metadata_field_id(column_name).is_ok() +} diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 3e319ca062..df332a8dd2 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -36,6 +36,7 @@ use crate::delete_file_index::DeleteFileIndex; use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator; use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::FileIO; +use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name}; use crate::runtime::spawn; use crate::spec::{DataContentType, SnapshotRef}; use crate::table::Table; @@ -124,6 +125,47 @@ impl<'a> TableScanBuilder<'a> { self } + /// Include the _file metadata column in the scan. + /// + /// This is a convenience method that adds the _file column to the current selection. + /// If no columns are currently selected (select_all), this will select all columns plus _file. + /// If specific columns are selected, this adds _file to that selection. + /// + /// # Example + /// ```no_run + /// # use iceberg::table::Table; + /// # async fn example(table: Table) -> iceberg::Result<()> { + /// // Select id, name, and _file + /// let scan = table + /// .scan() + /// .select(["id", "name"]) + /// .with_file_path_column() + /// .build()?; + /// # Ok(()) + /// # } + /// ``` + pub fn with_file_path_column(mut self) -> Self { + use crate::metadata_columns::RESERVED_COL_NAME_FILE; + + let mut columns = self.column_names.unwrap_or_else(|| { + // No explicit selection - get all column names from schema + self.table + .metadata() + .current_schema() + .as_struct() + .fields() + .iter() + .map(|f| f.name.clone()) + .collect() + }); + + // Add _file column + columns.push(RESERVED_COL_NAME_FILE.to_string()); + + self.column_names = Some(columns); + self + } + /// Set the snapshot to scan. When not set, it uses current snapshot. pub fn snapshot_id(mut self, snapshot_id: i64) -> Self { self.snapshot_id = Some(snapshot_id); @@ -217,9 +259,13 @@ impl<'a> TableScanBuilder<'a> { let schema = snapshot.schema(self.table.metadata())?; - // Check that all column names exist in the schema. + // Check that all column names exist in the schema (skip reserved columns). if let Some(column_names) = self.column_names.as_ref() { for column_name in column_names { + // Skip reserved columns that don't exist in the schema + if is_metadata_column_name(column_name) { + continue; + } if schema.field_by_name(column_name).is_none() { return Err(Error::new( ErrorKind::DataInvalid, @@ -240,6 +286,12 @@ impl<'a> TableScanBuilder<'a> { }); for column_name in column_names.iter() { + // Handle metadata columns (like "_file") + if is_metadata_column_name(column_name) { + field_ids.push(get_metadata_field_id(column_name)?); + continue; + } + let field_id = schema.field_id_by_name(column_name).ok_or_else(|| { Error::new( ErrorKind::DataInvalid, @@ -254,10 +306,10 @@ impl<'a> TableScanBuilder<'a> { Error::new( ErrorKind::FeatureUnsupported, format!( - "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}" - ), - ) - })?; + "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}" + ), + ) + })?; field_ids.push(field_id); } @@ -559,8 +611,10 @@ pub mod tests { use std::fs::File; use std::sync::Arc; + use arrow_array::cast::AsArray; use arrow_array::{ - ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, + Array, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, + StringArray, }; use futures::{TryStreamExt, stream}; use minijinja::value::Value; @@ -575,6 +629,7 @@ pub mod tests { use crate::arrow::ArrowReaderBuilder; use crate::expr::{BoundPredicate, Reference}; use crate::io::{FileIO, OutputFile}; + use crate::metadata_columns::RESERVED_COL_NAME_FILE; use crate::scan::FileScanTask; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry, @@ -1800,4 +1855,319 @@ pub mod tests { }; test_fn(task); } + + #[tokio::test] + async fn test_select_with_file_column() { + use arrow_array::cast::AsArray; + + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Select regular columns plus the _file column + let table_scan = fixture + .table + .scan() + .select(["x", RESERVED_COL_NAME_FILE]) + .with_row_selection_enabled(true) + .build() + .unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + // Verify we have 2 columns: x and _file + assert_eq!(batches[0].num_columns(), 2); + + // Verify the x column exists and has correct data + let x_col = batches[0].column_by_name("x").unwrap(); + let x_arr = x_col.as_primitive::(); + assert_eq!(x_arr.value(0), 1); + + // Verify the _file column exists + let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE); + assert!( + file_col.is_some(), + "_file column should be present in the batch" + ); + + // Verify the _file column contains a file path + let file_col = file_col.unwrap(); + assert!( + matches!( + file_col.data_type(), + arrow_schema::DataType::RunEndEncoded(_, _) + ), + "_file column should use RunEndEncoded type" + ); + + // Decode the RunArray to verify it contains the file path + let run_array = file_col + .as_any() + .downcast_ref::>() + .expect("_file column should be a RunArray"); + + let values = run_array.values(); + let string_values = values.as_string::(); + assert_eq!(string_values.len(), 1, "Should have a single file path"); + + let file_path = string_values.value(0); + assert!( + file_path.ends_with(".parquet"), + "File path should end with .parquet, got: {}", + file_path + ); + } + + #[tokio::test] + async fn test_select_file_column_position() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Select columns in specific order: x, _file, z + let table_scan = fixture + .table + .scan() + .select(["x", RESERVED_COL_NAME_FILE, "z"]) + .with_row_selection_enabled(true) + .build() + .unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + assert_eq!(batches[0].num_columns(), 3); + + // Verify column order: x at position 0, _file at position 1, z at position 2 + let schema = batches[0].schema(); + assert_eq!(schema.field(0).name(), "x"); + assert_eq!(schema.field(1).name(), RESERVED_COL_NAME_FILE); + assert_eq!(schema.field(2).name(), "z"); + + // Verify columns by name also works + assert!(batches[0].column_by_name("x").is_some()); + assert!(batches[0].column_by_name(RESERVED_COL_NAME_FILE).is_some()); + assert!(batches[0].column_by_name("z").is_some()); + } + + #[tokio::test] + async fn test_select_file_column_only() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Select only the _file column + let table_scan = fixture + .table + .scan() + .select([RESERVED_COL_NAME_FILE]) + .with_row_selection_enabled(true) + .build() + .unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + // Should have exactly 1 column + assert_eq!(batches[0].num_columns(), 1); + + // Verify it's the _file column + let schema = batches[0].schema(); + assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE); + + // Verify the batch has the correct number of rows + // The scan reads files 1.parquet and 3.parquet (2.parquet is deleted) + // Each file has 1024 rows, so total is 2048 rows + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 2048); + } + + #[tokio::test] + async fn test_file_column_with_multiple_files() { + use std::collections::HashSet; + + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Select x and _file columns + let table_scan = fixture + .table + .scan() + .select(["x", RESERVED_COL_NAME_FILE]) + .with_row_selection_enabled(true) + .build() + .unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + // Collect all unique file paths from the batches + let mut file_paths = HashSet::new(); + for batch in &batches { + let file_col = batch.column_by_name(RESERVED_COL_NAME_FILE).unwrap(); + let run_array = file_col + .as_any() + .downcast_ref::>() + .expect("_file column should be a RunArray"); + + let values = run_array.values(); + let string_values = values.as_string::(); + for i in 0..string_values.len() { + file_paths.insert(string_values.value(i).to_string()); + } + } + + // We should have multiple files (the test creates 1.parquet and 3.parquet) + assert!(!file_paths.is_empty(), "Should have at least one file path"); + + // All paths should end with .parquet + for path in &file_paths { + assert!( + path.ends_with(".parquet"), + "All file paths should end with .parquet, got: {}", + path + ); + } + } + + #[tokio::test] + async fn test_file_column_at_start() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Select _file at the start + let table_scan = fixture + .table + .scan() + .select([RESERVED_COL_NAME_FILE, "x", "y"]) + .with_row_selection_enabled(true) + .build() + .unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + assert_eq!(batches[0].num_columns(), 3); + + // Verify _file is at position 0 + let schema = batches[0].schema(); + assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE); + assert_eq!(schema.field(1).name(), "x"); + assert_eq!(schema.field(2).name(), "y"); + } + + #[tokio::test] + async fn test_file_column_at_end() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Select _file at the end + let table_scan = fixture + .table + .scan() + .select(["x", "y", RESERVED_COL_NAME_FILE]) + .with_row_selection_enabled(true) + .build() + .unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + assert_eq!(batches[0].num_columns(), 3); + + // Verify _file is at position 2 (the end) + let schema = batches[0].schema(); + assert_eq!(schema.field(0).name(), "x"); + assert_eq!(schema.field(1).name(), "y"); + assert_eq!(schema.field(2).name(), RESERVED_COL_NAME_FILE); + } + + #[tokio::test] + async fn test_select_with_repeated_column_names() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Select with repeated column names - both regular columns and virtual columns + // Repeated columns should appear multiple times in the result (duplicates are allowed) + let table_scan = fixture + .table + .scan() + .select([ + "x", + RESERVED_COL_NAME_FILE, + "x", // x repeated + "y", + RESERVED_COL_NAME_FILE, // _file repeated + "y", // y repeated + ]) + .with_row_selection_enabled(true) + .build() + .unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + // Verify we have exactly 6 columns (duplicates are allowed and preserved) + assert_eq!( + batches[0].num_columns(), + 6, + "Should have exactly 6 columns with duplicates" + ); + + let schema = batches[0].schema(); + + // Verify columns appear in the exact order requested: x, _file, x, y, _file, y + assert_eq!(schema.field(0).name(), "x", "Column 0 should be x"); + assert_eq!( + schema.field(1).name(), + RESERVED_COL_NAME_FILE, + "Column 1 should be _file" + ); + assert_eq!( + schema.field(2).name(), + "x", + "Column 2 should be x (duplicate)" + ); + assert_eq!(schema.field(3).name(), "y", "Column 3 should be y"); + assert_eq!( + schema.field(4).name(), + RESERVED_COL_NAME_FILE, + "Column 4 should be _file (duplicate)" + ); + assert_eq!( + schema.field(5).name(), + "y", + "Column 5 should be y (duplicate)" + ); + + // Verify all columns have correct data types + assert!( + matches!(schema.field(0).data_type(), arrow_schema::DataType::Int64), + "Column x should be Int64" + ); + assert!( + matches!(schema.field(2).data_type(), arrow_schema::DataType::Int64), + "Column x (duplicate) should be Int64" + ); + assert!( + matches!(schema.field(3).data_type(), arrow_schema::DataType::Int64), + "Column y should be Int64" + ); + assert!( + matches!(schema.field(5).data_type(), arrow_schema::DataType::Int64), + "Column y (duplicate) should be Int64" + ); + assert!( + matches!( + schema.field(1).data_type(), + arrow_schema::DataType::RunEndEncoded(_, _) + ), + "_file column should use RunEndEncoded type" + ); + assert!( + matches!( + schema.field(4).data_type(), + arrow_schema::DataType::RunEndEncoded(_, _) + ), + "_file column (duplicate) should use RunEndEncoded type" + ); + } }