diff --git a/parquet/tests/arrow_reader/predicate_cache.rs b/parquet/tests/arrow_reader/predicate_cache.rs
index b2ad36b42113..b419c37158dc 100644
--- a/parquet/tests/arrow_reader/predicate_cache.rs
+++ b/parquet/tests/arrow_reader/predicate_cache.rs
@@ -52,8 +52,7 @@ async fn test_default_read() {
 
 #[tokio::test]
 async fn test_async_cache_with_filters() {
     let test = ParquetPredicateCacheTest::new().with_expected_records_read_from_cache(49);
-    let async_builder = test.async_builder().await;
-    let async_builder = test.add_project_ab_and_filter_b(async_builder);
+    let async_builder = test.async_builder().await.add_project_ab_and_filter_b();
     test.run_async(async_builder).await;
 }
 
@@ -63,8 +62,7 @@ async fn test_sync_cache_with_filters() {
     let test = ParquetPredicateCacheTest::new()
         // The sync reader does not use the cache. See https://github.com/apache/arrow-rs/issues/8000
         .with_expected_records_read_from_cache(0);
-    let sync_builder = test.sync_builder();
-    let sync_builder = test.add_project_ab_and_filter_b(sync_builder);
+    let sync_builder = test.sync_builder().add_project_ab_and_filter_b();
     test.run_sync(sync_builder);
 }
 
@@ -72,12 +70,17 @@ async fn test_sync_cache_with_filters() {
 async fn test_cache_disabled_with_filters() {
     // expect no records to be read from cache, because the cache is disabled
     let test = ParquetPredicateCacheTest::new().with_expected_records_read_from_cache(0);
-    let sync_builder = test.sync_builder().with_max_predicate_cache_size(0);
-    let sync_builder = test.add_project_ab_and_filter_b(sync_builder);
+    let sync_builder = test
+        .sync_builder()
+        .with_max_predicate_cache_size(0)
+        .add_project_ab_and_filter_b();
     test.run_sync(sync_builder);
 
-    let async_builder = test.async_builder().await.with_max_predicate_cache_size(0);
-    let async_builder = test.add_project_ab_and_filter_b(async_builder);
+    let async_builder = test
+        .async_builder()
+        .await
+        .with_max_predicate_cache_size(0)
+        .add_project_ab_and_filter_b();
     test.run_async(async_builder).await;
 }
 
@@ -85,12 +88,10 @@ async fn test_cache_disabled_with_filters() {
 async fn test_cache_projection_excludes_nested_columns() {
     let test = ParquetPredicateCacheTest::new_nested().with_expected_records_read_from_cache(0);
 
-    let sync_builder = test.sync_builder();
-    let sync_builder = test.add_nested_filter(sync_builder);
+    let sync_builder = test.sync_builder().add_nested_filter();
     test.run_sync(sync_builder);
 
-    let async_builder = test.async_builder().await;
-    let async_builder = test.add_nested_filter(async_builder);
+    let async_builder = test.async_builder().await.add_nested_filter();
     test.run_async(async_builder).await;
 }
 
@@ -154,53 +155,6 @@ impl ParquetPredicateCacheTest {
             .unwrap()
     }
 
-    /// Return a [`ParquetRecordBatchReaderBuilder`] for reading the file with
-    ///
-    /// 1. a projection selecting the "a" and "b" column
-    /// 2. a row_filter applied to "b": 575 < "b" < 625 (select 1 data page from each row group)
-    fn add_project_ab_and_filter_b<T>(
-        &self,
-        builder: ArrowReaderBuilder<T>,
-    ) -> ArrowReaderBuilder<T> {
-        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
-
-        // "b" > 575 and "b" < 625
-        let row_filter = ArrowPredicateFn::new(
-            ProjectionMask::columns(&schema_descr, ["b"]),
-            |batch: RecordBatch| {
-                let scalar_575 = Int64Array::new_scalar(575);
-                let scalar_625 = Int64Array::new_scalar(625);
-                let column = batch.column(0).as_primitive::<Int64Type>();
-                and(&gt(column, &scalar_575)?, &lt(column, &scalar_625)?)
-            },
-        );
-
-        builder
-            .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"]))
-            .with_row_filter(RowFilter::new(vec![Box::new(row_filter)]))
-    }
-
-    /// Add a filter on the nested leaf nodes
-    fn add_nested_filter<T>(&self, builder: ArrowReaderBuilder<T>) -> ArrowReaderBuilder<T> {
-        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
-
-        // Build a RowFilter whose predicate projects a leaf under the nested root `b`
-        // Leaf indices are depth-first; with schema [a, b.aa, b.bb] we pick index 1 (b.aa)
-        let nested_leaf_mask = ProjectionMask::leaves(&schema_descr, vec![1]);
-
-        let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
-            Ok(arrow_array::BooleanArray::from(vec![
-                true;
-                batch.num_rows()
-            ]))
-        });
-        let row_filter = RowFilter::new(vec![Box::new(always_true)]);
-
-        builder
-            .with_projection(nested_leaf_mask)
-            .with_row_filter(row_filter)
-    }
-
     /// Build the reader from the specified builder, reading all batches from it,
     /// and asserts the
     fn run_sync(&self, builder: ParquetRecordBatchReaderBuilder<Bytes>) {
@@ -322,6 +276,56 @@ static NESTED_TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
     Bytes::from(output)
 });
 
+trait ArrowReaderBuilderExt {
+    /// Applies the following:
+    /// 1. a projection selecting the "a" and "b" column
+    /// 2. a row_filter applied to "b": 575 < "b" < 625 (select 1 data page from each row group)
+    fn add_project_ab_and_filter_b(self) -> Self;
+
+    /// Adds a row filter that projects the nested leaf column "b.aa" and
+    /// returns true for all rows.
+    fn add_nested_filter(self) -> Self;
+}
+
+impl<T> ArrowReaderBuilderExt for ArrowReaderBuilder<T> {
+    fn add_project_ab_and_filter_b(self) -> Self {
+        let schema_descr = self.metadata().file_metadata().schema_descr_ptr();
+
+        // "b" > 575 and "b" < 625
+        let row_filter = ArrowPredicateFn::new(
+            ProjectionMask::columns(&schema_descr, ["b"]),
+            |batch: RecordBatch| {
+                let scalar_575 = Int64Array::new_scalar(575);
+                let scalar_625 = Int64Array::new_scalar(625);
+                let column = batch.column(0).as_primitive::<Int64Type>();
+                and(&gt(column, &scalar_575)?, &lt(column, &scalar_625)?)
+            },
+        );
+
+        self.with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"]))
+            .with_row_filter(RowFilter::new(vec![Box::new(row_filter)]))
+    }
+
+    fn add_nested_filter(self) -> Self {
+        let schema_descr = self.metadata().file_metadata().schema_descr_ptr();
+
+        // Build a RowFilter whose predicate projects a leaf under the nested root `b`
+        // Leaf indices are depth-first; with schema [a, b.aa, b.bb] we pick index 1 (b.aa)
+        let nested_leaf_mask = ProjectionMask::leaves(&schema_descr, vec![1]);
+
+        let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
+            Ok(arrow_array::BooleanArray::from(vec![
+                true;
+                batch.num_rows()
+            ]))
+        });
+        let row_filter = RowFilter::new(vec![Box::new(always_true)]);
+
+        self.with_projection(nested_leaf_mask)
+            .with_row_filter(row_filter)
+    }
+}
+
 /// Copy paste version of the `AsyncFileReader` trait for testing purposes 🤮
 /// TODO put this in a common place
 #[derive(Clone)]
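
Note: the refactor above is an instance of Rust's extension-trait pattern. The helpers move off `ParquetPredicateCacheTest` (where they took the builder as an argument) onto a trait blanket-implemented for `ArrowReaderBuilder<T>`, so they chain like native builder methods. The sketch below is a minimal, self-contained illustration of that pattern, not the parquet API: `Builder`, `BuilderExt`, `with_filter`, and the field names are hypothetical stand-ins.

// Standalone sketch of the extension-trait pattern the patch applies.
// `Builder` stands in for `ArrowReaderBuilder<T>`; only the trait shape
// mirrors the real change, the types here are illustrative.

struct Builder {
    projection: Vec<String>,
    filters: Vec<String>,
}

impl Builder {
    fn new() -> Self {
        Self {
            projection: Vec::new(),
            filters: Vec::new(),
        }
    }

    // Inherent builder methods already consume and return `self`,
    // which is what makes chaining possible.
    fn with_projection(mut self, cols: &[&str]) -> Self {
        self.projection = cols.iter().map(|c| c.to_string()).collect();
        self
    }

    fn with_filter(mut self, expr: &str) -> Self {
        self.filters.push(expr.to_string());
        self
    }
}

// The extension trait bolts test helpers onto the builder without
// modifying the builder type itself; `self -> Self` preserves chaining.
trait BuilderExt {
    fn add_project_ab_and_filter_b(self) -> Self;
}

impl BuilderExt for Builder {
    fn add_project_ab_and_filter_b(self) -> Self {
        self.with_projection(&["a", "b"])
            .with_filter("575 < b AND b < 625")
    }
}

fn main() {
    // Call sites collapse from two statements into one chained expression,
    // exactly as in the test hunks above.
    let builder = Builder::new().add_project_ab_and_filter_b();
    assert_eq!(builder.projection, ["a", "b"]);
    assert_eq!(builder.filters.len(), 1);
}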