diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index e41515d613c9..4eae2045994d 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -99,6 +99,13 @@ pub mod statistics; /// [`StatisticsConverter`]: statistics::StatisticsConverter /// [Querying Parquet with Millisecond Latency]: https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/ pub struct ArrowReaderBuilder { + /// The "input" to read parquet data from. + /// + /// Note in the case of the [`ParquetPushDecoderBuilder`], there + /// is no underlying input, which is indicated by a type parameter of [`NoInput`] + /// + /// [`ParquetPushDecoderBuilder`]: crate::arrow::push_decoder::ParquetPushDecoderBuilder + /// [`NoInput`]: crate::arrow::push_decoder::NoInput pub(crate) input: T, pub(crate) metadata: Arc, diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index c5badea7f32c..44c5465202e7 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -21,26 +21,23 @@ //! //! See example on [`ParquetRecordBatchStreamBuilder::new`] -use std::collections::VecDeque; use std::fmt::Formatter; use std::io::SeekFrom; use std::ops::Range; use std::pin::Pin; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::task::{Context, Poll}; use bytes::Bytes; use futures::future::{BoxFuture, FutureExt}; -use futures::ready; use futures::stream::Stream; use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; use arrow_array::RecordBatch; -use arrow_schema::{DataType, Fields, Schema, SchemaRef}; +use arrow_schema::{Schema, SchemaRef}; use crate::arrow::arrow_reader::{ ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader, - RowFilter, RowSelection, }; use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash}; @@ -56,12 +53,8 @@ pub use metadata::*; #[cfg(feature = "object_store")] mod store; -use crate::arrow::ProjectionMask; -use crate::arrow::array_reader::{ArrayReaderBuilder, CacheOptionsBuilder, RowGroupCache}; -use crate::arrow::arrow_reader::ReadPlanBuilder; -use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; -use crate::arrow::in_memory_row_group::{FetchRanges, InMemoryRowGroup}; -use crate::arrow::schema::ParquetField; +use crate::DecodeResult; +use crate::arrow::push_decoder::{NoInput, ParquetPushDecoder, ParquetPushDecoderBuilder}; #[cfg(feature = "object_store")] pub use store::*; @@ -201,10 +194,10 @@ impl ArrowReaderMetadata { } #[doc(hidden)] -// A newtype used within `ReaderOptionsBuilder` to distinguish sync readers from async -// -// Allows sharing the same builder for both the sync and async versions, whilst also not -// breaking the pre-existing ParquetRecordBatchStreamBuilder API +/// Newtype (wrapper) used within [`ArrowReaderBuilder`] to distinguish sync readers from async +/// +/// Allows sharing the same builder for different readers while keeping the same +/// ParquetRecordBatchStreamBuilder API pub struct AsyncReader(T); /// A builder for reading parquet files from an `async` source as [`ParquetRecordBatchStream`] @@ -483,300 +476,112 @@ impl ParquetRecordBatchStreamBuilder { /// /// See examples on [`ParquetRecordBatchStreamBuilder::new`] pub fn build(self) -> Result> { - let num_row_groups = self.metadata.row_groups().len(); - - let row_groups = match self.row_groups { - Some(row_groups) => { - if let Some(col) = row_groups.iter().find(|x| **x >= 
num_row_groups) { - return Err(general_err!( - "row group {} out of bounds 0..{}", - col, - num_row_groups - )); - } - row_groups.into() - } - None => (0..self.metadata.row_groups().len()).collect(), - }; - - // Try to avoid allocate large buffer - let batch_size = self - .batch_size - .min(self.metadata.file_metadata().num_rows() as usize); - let reader_factory = ReaderFactory { - input: self.input.0, - filter: self.filter, - metadata: self.metadata.clone(), - fields: self.fields, - limit: self.limit, - offset: self.offset, - metrics: self.metrics, - max_predicate_cache_size: self.max_predicate_cache_size, - }; + let Self { + input, + metadata, + schema, + fields, + batch_size, + row_groups, + projection, + filter, + selection, + limit, + offset, + metrics, + max_predicate_cache_size, + } = self; // Ensure schema of ParquetRecordBatchStream respects projection, and does // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches) - let projected_fields = match reader_factory.fields.as_deref().map(|pf| &pf.arrow_type) { - Some(DataType::Struct(fields)) => { - fields.filter_leaves(|idx, _| self.projection.leaf_included(idx)) - } - None => Fields::empty(), - _ => unreachable!("Must be Struct for root type"), - }; - let schema = Arc::new(Schema::new(projected_fields)); + let projected_fields = schema + .fields + .filter_leaves(|idx, _| projection.leaf_included(idx)); + let projected_schema = Arc::new(Schema::new(projected_fields)); - Ok(ParquetRecordBatchStream { - metadata: self.metadata, + let decoder = ParquetPushDecoderBuilder { + input: NoInput, + metadata, + schema, + fields, + projection, + filter, + selection, batch_size, row_groups, - projection: self.projection, - selection: self.selection, - schema, - reader_factory: Some(reader_factory), - state: StreamState::Init, + limit, + offset, + metrics, + max_predicate_cache_size, + } + .build()?; + + let request_state = RequestState::None { input: input.0 }; + + Ok(ParquetRecordBatchStream { + schema: projected_schema, + decoder, + request_state, }) } } -/// Returns a [`ReaderFactory`] and an optional [`ParquetRecordBatchReader`] for the next row group +/// State machine that tracks outstanding requests to fetch data /// -/// Note: If all rows are filtered out in the row group (e.g by filters, limit or -/// offset), returns `None` for the reader. -type ReadResult = Result<(ReaderFactory, Option)>; - -/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create -/// [`ParquetRecordBatchReader`] -struct ReaderFactory { - metadata: Arc, - - /// Top level parquet schema - fields: Option>, - - input: T, - - /// Optional filter - filter: Option, - - /// Limit to apply to remaining row groups. - limit: Option, - - /// Offset to apply to the next - offset: Option, - - /// Metrics - metrics: ArrowReaderMetrics, - - /// Maximum size of the predicate cache - /// - /// See [`RowGroupCache`] for details. 
- max_predicate_cache_size: usize, +/// The parameter `T` is the input, typically an `AsyncFileReader` +enum RequestState { + /// No outstanding requests + None { + input: T, + }, + /// There is an outstanding request for data + Outstanding { + /// Ranges that have been requested + ranges: Vec>, + /// Future that will resolve (input, requested_ranges) + /// + /// Note the future owns the reader while the request is outstanding + /// and returns it upon completion + future: BoxFuture<'static, Result<(T, Vec)>>, + }, + Done, } -impl ReaderFactory +impl RequestState where - T: AsyncFileReader + Send, + T: AsyncFileReader + Unpin + Send + 'static, { - /// Reads the next row group with the provided `selection`, `projection` and `batch_size` - /// - /// Updates the `limit` and `offset` of the reader factory - /// - /// Note: this captures self so that the resulting future has a static lifetime - async fn read_row_group( - mut self, - row_group_idx: usize, - selection: Option, - projection: ProjectionMask, - batch_size: usize, - ) -> ReadResult { - // TODO: calling build_array multiple times is wasteful - - let meta = self.metadata.row_group(row_group_idx); - let offset_index = self - .metadata - .offset_index() - // filter out empty offset indexes (old versions specified Some(vec![]) when no present) - .filter(|index| !index.is_empty()) - .map(|x| x[row_group_idx].as_slice()); - - // Reuse columns that are selected and used by the filters - let cache_projection = match self.compute_cache_projection(&projection) { - Some(projection) => projection, - None => ProjectionMask::none(meta.columns().len()), - }; - let row_group_cache = Arc::new(Mutex::new(RowGroupCache::new( - batch_size, - self.max_predicate_cache_size, - ))); - - let mut row_group = InMemoryRowGroup { - // schema: meta.schema_descr_ptr(), - row_count: meta.num_rows() as usize, - column_chunks: vec![None; meta.columns().len()], - offset_index, - row_group_idx, - metadata: self.metadata.as_ref(), - }; - - let cache_options_builder = CacheOptionsBuilder::new(&cache_projection, &row_group_cache); - - let filter = self.filter.as_mut(); - let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection); - - // Update selection based on any filters - if let Some(filter) = filter { - let cache_options = cache_options_builder.clone().producer(); - - for predicate in filter.predicates.iter_mut() { - if !plan_builder.selects_any() { - return Ok((self, None)); // ruled out entire row group - } - - // (pre) Fetch only the columns that are selected by the predicate - let selection = plan_builder.selection(); - // Fetch predicate columns; expand selection only for cached predicate columns - let cache_mask = Some(&cache_projection); - row_group - .fetch( - &mut self.input, - predicate.projection(), - selection, - batch_size, - cache_mask, - ) - .await?; - - let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics) - .with_cache_options(Some(&cache_options)) - .build_array_reader(self.fields.as_deref(), predicate.projection())?; - - plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?; - } - } - - // Compute the number of rows in the selection before applying limit and offset - let rows_before = plan_builder - .num_rows_selected() - .unwrap_or(row_group.row_count); - - if rows_before == 0 { - return Ok((self, None)); // ruled out entire row group - } - - // Apply any limit and offset - let plan_builder = plan_builder - .limited(row_group.row_count) - .with_offset(self.offset) - 
.with_limit(self.limit) - .build_limited(); - - let rows_after = plan_builder - .num_rows_selected() - .unwrap_or(row_group.row_count); - - // Update running offset and limit for after the current row group is read - if let Some(offset) = &mut self.offset { - // Reduction is either because of offset or limit, as limit is applied - // after offset has been "exhausted" can just use saturating sub here - *offset = offset.saturating_sub(rows_before - rows_after) - } - - if rows_after == 0 { - return Ok((self, None)); // ruled out entire row group - } - - if let Some(limit) = &mut self.limit { - *limit -= rows_after; - } - // fetch the pages needed for decoding - row_group - // Final projection fetch shouldn't expand selection for cache; pass None - .fetch( - &mut self.input, - &projection, - plan_builder.selection(), - batch_size, - None, - ) - .await?; - - let plan = plan_builder.build(); - - let cache_options = cache_options_builder.consumer(); - let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics) - .with_cache_options(Some(&cache_options)) - .build_array_reader(self.fields.as_deref(), &projection)?; - - let reader = ParquetRecordBatchReader::new(array_reader, plan); - - Ok((self, Some(reader))) - } - - /// Compute which columns are used in filters and the final (output) projection - fn compute_cache_projection(&self, projection: &ProjectionMask) -> Option { - // Do not compute the projection mask if the predicate cache is disabled - if self.max_predicate_cache_size == 0 { - return None; - } - - let filters = self.filter.as_ref()?; - let mut cache_projection = filters.predicates.first()?.projection().clone(); - for predicate in filters.predicates.iter() { - cache_projection.union(predicate.projection()); - } - cache_projection.intersect(projection); - self.exclude_nested_columns_from_cache(&cache_projection) - } - - /// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. nested columns) - fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option { - let schema = self.metadata.file_metadata().schema_descr(); - let num_leaves = schema.num_columns(); - - // Count how many leaves each root column has - let num_roots = schema.root_schema().get_fields().len(); - let mut root_leaf_counts = vec![0usize; num_roots]; - for leaf_idx in 0..num_leaves { - let root_idx = schema.get_column_root_idx(leaf_idx); - root_leaf_counts[root_idx] += 1; - } - - // Keep only leaves whose root has exactly one leaf (non-nested) - let mut included_leaves = Vec::new(); - for leaf_idx in 0..num_leaves { - if mask.leaf_included(leaf_idx) { - let root_idx = schema.get_column_root_idx(leaf_idx); - if root_leaf_counts[root_idx] == 1 { - included_leaves.push(leaf_idx); - } - } - } - - if included_leaves.is_empty() { - None - } else { - Some(ProjectionMask::leaves(schema, included_leaves)) + /// Issue a request to fetch `ranges`, returning the Outstanding state + fn begin_request(mut input: T, ranges: Vec>) -> Self { + let ranges_captured = ranges.clone(); + + // Note this must move the input *into* the future + // because the get_byte_ranges future has a lifetime + // (aka can have references internally) and thus must + // own the input while the request is outstanding. 
+ let future = async move { + let data = input.get_byte_ranges(ranges_captured).await?; + Ok((input, data)) } + .boxed(); + RequestState::Outstanding { ranges, future } } } -enum StreamState { - /// At the start of a new row group, or the end of the parquet stream - Init, - /// Decoding a batch - Decoding(ParquetRecordBatchReader), - /// Reading data from input - Reading(BoxFuture<'static, ReadResult>), - /// Error - Error, -} - -impl std::fmt::Debug for StreamState { +impl std::fmt::Debug for RequestState { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { - StreamState::Init => write!(f, "StreamState::Init"), - StreamState::Decoding(_) => write!(f, "StreamState::Decoding"), - StreamState::Reading(_) => write!(f, "StreamState::Reading"), - StreamState::Error => write!(f, "StreamState::Error"), + RequestState::None { input: _ } => f + .debug_struct("RequestState::None") + .field("input", &"...") + .finish(), + RequestState::Outstanding { ranges, .. } => f + .debug_struct("RequestState::Outstanding") + .field("ranges", &ranges) + .finish(), + RequestState::Done => { + write!(f, "RequestState::Done") + } } } } @@ -796,35 +601,23 @@ impl std::fmt::Debug for StreamState { /// required, which is especially important for object stores, where IO operations /// have latencies in the hundreds of milliseconds /// +/// See [`ParquetPushDecoderBuilder`] for an API with lower level control over +/// buffering. /// /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html pub struct ParquetRecordBatchStream { - metadata: Arc, - + /// Output schema of the stream schema: SchemaRef, - - row_groups: VecDeque, - - projection: ProjectionMask, - - batch_size: usize, - - selection: Option, - - /// This is an option so it can be moved into a future - reader_factory: Option>, - - state: StreamState, + /// Input and Outstanding IO request, if any + request_state: RequestState, + /// Decoding state machine (no IO) + decoder: ParquetPushDecoder, } impl std::fmt::Debug for ParquetRecordBatchStream { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("ParquetRecordBatchStream") - .field("metadata", &self.metadata) - .field("schema", &self.schema) - .field("batch_size", &self.batch_size) - .field("projection", &self.projection) - .field("state", &self.state) + .field("request_state", &self.request_state) .finish() } } @@ -858,45 +651,35 @@ where /// - `Ok(Some(reader))` which holds all the data for the row group. 
pub async fn next_row_group(&mut self) -> Result> { loop { - match &mut self.state { - StreamState::Decoding(_) | StreamState::Reading(_) => { - return Err(ParquetError::General( - "Cannot combine the use of next_row_group with the Stream API".to_string(), - )); - } - StreamState::Init => { - let row_group_idx = match self.row_groups.pop_front() { - Some(idx) => idx, - None => return Ok(None), - }; - - let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize; - - let selection = self.selection.as_mut().map(|s| s.split_off(row_count)); - - let reader_factory = self.reader_factory.take().expect("lost reader factory"); - - let (reader_factory, maybe_reader) = reader_factory - .read_row_group( - row_group_idx, - selection, - self.projection.clone(), - self.batch_size, - ) - .await - .inspect_err(|_| { - self.state = StreamState::Error; - })?; - self.reader_factory = Some(reader_factory); - - if let Some(reader) = maybe_reader { - return Ok(Some(reader)); - } else { - // All rows skipped, read next row group - continue; + // Take ownership of request state to process, leaving self in a + // valid state + let request_state = std::mem::replace(&mut self.request_state, RequestState::Done); + match request_state { + // No outstanding requests, proceed to setup next row group + RequestState::None { input } => { + match self.decoder.try_next_reader()? { + DecodeResult::NeedsData(ranges) => { + self.request_state = RequestState::begin_request(input, ranges); + continue; // poll again (as the input might be ready immediately) + } + DecodeResult::Data(reader) => { + self.request_state = RequestState::None { input }; + return Ok(Some(reader)); + } + DecodeResult::Finished => return Ok(None), } } - StreamState::Error => return Ok(None), // Ends the stream as error happens. + RequestState::Outstanding { ranges, future } => { + let (input, data) = future.await?; + // Push the requested data to the decoder and try again + self.decoder.push_ranges(ranges, data)?; + self.request_state = RequestState::None { input }; + continue; // try and decode on next iteration + } + RequestState::Done => { + self.request_state = RequestState::Done; + return Ok(None); + } } } } @@ -907,102 +690,82 @@ where T: AsyncFileReader + Unpin + Send + 'static, { type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.poll_next_inner(cx) { + Ok(res) => { + // Successfully decoded a batch, or reached end of stream. + // convert Option to Option> + res.map(|res| Ok(res).transpose()) + } + Err(e) => { + self.request_state = RequestState::Done; + Poll::Ready(Some(Err(e))) + } + } + } +} + +impl ParquetRecordBatchStream +where + T: AsyncFileReader + Unpin + Send + 'static, +{ + /// Inner state machine + /// + /// Note this is separate from poll_next so we can use ? operator to check for errors + /// as it returns `Result>>` + fn poll_next_inner(&mut self, cx: &mut Context<'_>) -> Result>> { loop { - match &mut self.state { - StreamState::Decoding(batch_reader) => match batch_reader.next() { - Some(Ok(batch)) => { - return Poll::Ready(Some(Ok(batch))); - } - Some(Err(e)) => { - self.state = StreamState::Error; - return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string())))); + let request_state = std::mem::replace(&mut self.request_state, RequestState::Done); + match request_state { + RequestState::None { input } => { + // No outstanding requests, proceed to decode the next batch + match self.decoder.try_decode()? 
{ + DecodeResult::NeedsData(ranges) => { + self.request_state = RequestState::begin_request(input, ranges); + continue; // poll again (as the input might be ready immediately) + } + DecodeResult::Data(batch) => { + self.request_state = RequestState::None { input }; + return Ok(Poll::Ready(Some(batch))); + } + DecodeResult::Finished => { + self.request_state = RequestState::Done; + return Ok(Poll::Ready(None)); + } } - None => self.state = StreamState::Init, - }, - StreamState::Init => { - let row_group_idx = match self.row_groups.pop_front() { - Some(idx) => idx, - None => return Poll::Ready(None), - }; - - let reader = self.reader_factory.take().expect("lost reader factory"); - - let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize; - - let selection = self.selection.as_mut().map(|s| s.split_off(row_count)); - - let fut = reader - .read_row_group( - row_group_idx, - selection, - self.projection.clone(), - self.batch_size, - ) - .boxed(); - - self.state = StreamState::Reading(fut) } - StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) { - Ok((reader_factory, maybe_reader)) => { - self.reader_factory = Some(reader_factory); - match maybe_reader { - // Read records from [`ParquetRecordBatchReader`] - Some(reader) => self.state = StreamState::Decoding(reader), - // All rows skipped, read next row group - None => self.state = StreamState::Init, - } + RequestState::Outstanding { ranges, mut future } => match future.poll_unpin(cx) { + // Data was ready, push it to the decoder and continue + Poll::Ready(result) => { + let (input, data) = result?; + // Push the requested data to the decoder + self.decoder.push_ranges(ranges, data)?; + self.request_state = RequestState::None { input }; + continue; // next iteration will try to decode the next batch } - Err(e) => { - self.state = StreamState::Error; - return Poll::Ready(Some(Err(e))); + Poll::Pending => { + self.request_state = RequestState::Outstanding { ranges, future }; + return Ok(Poll::Pending); } }, - StreamState::Error => return Poll::Ready(None), // Ends the stream as error happens. + RequestState::Done => { + // Stream is done (error or end), return None + self.request_state = RequestState::Done; + return Ok(Poll::Ready(None)); + } } } } } -// Note this implementation is not with the rest of the InMemoryRowGroup -// implementation because it relies on several async traits and types -// that are only available when the "async" feature is enabled. -impl InMemoryRowGroup<'_> { - /// Fetches any additional column data specified in `projection` that is not already - /// present in `self.column_chunks`. - /// - /// If `selection` is provided, only the pages required for the selection - /// are fetched. Otherwise, all pages are fetched. 
- pub(crate) async fn fetch( - &mut self, - input: &mut T, - projection: &ProjectionMask, - selection: Option<&RowSelection>, - batch_size: usize, - cache_mask: Option<&ProjectionMask>, - ) -> Result<()> { - // Figure out what ranges to fetch - let FetchRanges { - ranges, - page_start_offsets, - } = self.fetch_ranges(projection, selection, batch_size, cache_mask); - // do the actual fetch - let chunk_data = input.get_byte_ranges(ranges).await?.into_iter(); - // update our in memory buffers (self.column_chunks) with the fetched data - self.fill_column_chunks(projection, page_start_offsets, chunk_data); - Ok(()) - } -} #[cfg(test)] mod tests { use super::*; - use crate::arrow::ArrowWriter; use crate::arrow::arrow_reader::{ - ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector, + ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector, }; use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; - use crate::arrow::schema::parquet_to_arrow_schema_and_fields; + use crate::arrow::{ArrowWriter, ProjectionMask}; use crate::file::metadata::ParquetMetaDataReader; use crate::file::properties::WriterProperties; use arrow::compute::kernels::cmp::eq; @@ -1724,98 +1487,6 @@ mod tests { assert_eq!(total_rows, 730); } - #[tokio::test] - #[allow(deprecated)] - async fn test_in_memory_row_group_sparse() { - let testdata = arrow::util::test_util::parquet_test_data(); - let path = format!("{testdata}/alltypes_tiny_pages.parquet"); - let data = Bytes::from(std::fs::read(path).unwrap()); - - let metadata = ParquetMetaDataReader::new() - .with_page_indexes(true) - .parse_and_finish(&data) - .unwrap(); - - let offset_index = metadata.offset_index().expect("reading offset index")[0].clone(); - - let mut metadata_builder = metadata.into_builder(); - let mut row_groups = metadata_builder.take_row_groups(); - row_groups.truncate(1); - let row_group_meta = row_groups.pop().unwrap(); - - let metadata = metadata_builder - .add_row_group(row_group_meta) - .set_column_index(None) - .set_offset_index(Some(vec![offset_index.clone()])) - .build(); - - let metadata = Arc::new(metadata); - - let num_rows = metadata.row_group(0).num_rows(); - - assert_eq!(metadata.num_row_groups(), 1); - - let async_reader = TestReader::new(data.clone()); - - let requests = async_reader.requests.clone(); - let (_, fields) = parquet_to_arrow_schema_and_fields( - metadata.file_metadata().schema_descr(), - ProjectionMask::all(), - None, - ) - .unwrap(); - - let _schema_desc = metadata.file_metadata().schema_descr(); - - let projection = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), vec![0]); - - let reader_factory = ReaderFactory { - metadata, - fields: fields.map(Arc::new), - input: async_reader, - filter: None, - limit: None, - offset: None, - metrics: ArrowReaderMetrics::disabled(), - max_predicate_cache_size: 0, - }; - - let mut skip = true; - let mut pages = offset_index[0].page_locations.iter().peekable(); - - // Setup `RowSelection` so that we can skip every other page, selecting the last page - let mut selectors = vec![]; - let mut expected_page_requests: Vec> = vec![]; - while let Some(page) = pages.next() { - let num_rows = if let Some(next_page) = pages.peek() { - next_page.first_row_index - page.first_row_index - } else { - num_rows - page.first_row_index - }; - - if skip { - selectors.push(RowSelector::skip(num_rows as usize)); - } else { - selectors.push(RowSelector::select(num_rows as usize)); - let start = page.offset as usize; - let end = start + 
page.compressed_page_size as usize; - expected_page_requests.push(start..end); - } - skip = !skip; - } - - let selection = RowSelection::from(selectors); - - let (_factory, _reader) = reader_factory - .read_row_group(0, Some(selection), projection.clone(), 48) - .await - .expect("reading row group"); - - let requests = requests.lock().unwrap(); - - assert_eq!(&requests[..], &expected_page_requests) - } - #[tokio::test] async fn test_batch_size_overallocate() { let testdata = arrow::util::test_util::parquet_test_data(); @@ -1831,13 +1502,16 @@ mod tests { let file_rows = builder.metadata().file_metadata().num_rows() as usize; - let stream = builder + let builder = builder .with_projection(ProjectionMask::all()) - .with_batch_size(1024) - .build() - .unwrap(); + .with_batch_size(1024); + + // even though the batch size is set to 1024, it should adjust to the max + // number of rows in the file (8) assert_ne!(1024, file_rows); - assert_eq!(stream.batch_size, file_rows); + assert_eq!(builder.batch_size, file_rows); + + let _stream = builder.build().unwrap(); } #[tokio::test] diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs index 4b932fb08034..b26a21132c4d 100644 --- a/parquet/src/arrow/push_decoder/mod.rs +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -81,7 +81,7 @@ use std::sync::Arc; /// # let parquet_metadata = Arc::new(parquet_metadata); /// // The file length and metadata are required to create the decoder /// let mut decoder = -/// ParquetPushDecoderBuilder::try_new_decoder(file_length, parquet_metadata) +/// ParquetPushDecoderBuilder::try_new_decoder(parquet_metadata) /// .unwrap() /// // Optionally configure the decoder, e.g. batch size /// .with_batch_size(1024) @@ -110,7 +110,19 @@ use std::sync::Arc; /// } /// } /// ``` -pub type ParquetPushDecoderBuilder = ArrowReaderBuilder; +pub type ParquetPushDecoderBuilder = ArrowReaderBuilder; + +/// Type that represents "No input" for the [`ParquetPushDecoderBuilder`] +/// +/// There is no "input" for the push decoder by design (the idea is that +/// the caller pushes data to the decoder as needed).. +/// +/// However, [`ArrowReaderBuilder`] is shared with the sync and async readers, +/// which DO have an `input`. To support reusing the same builder code for +/// all three types of decoders, we define this `NoInput` for the push decoder to +/// denote in the type system there is no type. +#[derive(Debug, Clone, Copy)] +pub struct NoInput; /// Methods for building a ParquetDecoder. See the base [`ArrowReaderBuilder`] for /// more options that can be configured. 
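Reviewer note on the `NoInput` change above: defining the push decoder builder as an `ArrowReaderBuilder` whose input type parameter is `NoInput` is what lets one builder serve the sync reader, the async stream, and the push decoder. The following is a self-contained sketch of that zero-sized marker pattern; the `Builder` type and its methods here are illustrative only, not the parquet crate's actual definitions.

```rust
// Sketch of the zero-sized "no input" marker pattern behind
// `ParquetPushDecoderBuilder` (an `ArrowReaderBuilder` parameterized by `NoInput`).
#[derive(Debug, Clone, Copy)]
struct NoInput;

struct Builder<T> {
    // The "input" to read from; `NoInput` when there is none.
    input: T,
    batch_size: usize,
}

// Configuration methods are shared by every builder, regardless of input type.
impl<T> Builder<T> {
    fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }
}

// Methods that only make sense without an underlying input are implemented
// solely for `Builder<NoInput>`, so the type system prevents calling them on
// builders that wrap a real reader.
impl Builder<NoInput> {
    fn build_push_style(self) -> usize {
        let NoInput = self.input; // nothing to consume
        self.batch_size
    }
}

fn main() {
    let batch_size = Builder { input: NoInput, batch_size: 8192 }
        .with_batch_size(1024)
        .build_push_style();
    println!("configured batch size: {batch_size}");
}
```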
@@ -122,15 +134,8 @@ impl ParquetPushDecoderBuilder { /// [`ParquetMetadataDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder /// /// See example on [`ParquetPushDecoderBuilder`] - pub fn try_new_decoder( - file_len: u64, - parquet_metadata: Arc, - ) -> Result { - Self::try_new_decoder_with_options( - file_len, - parquet_metadata, - ArrowReaderOptions::default(), - ) + pub fn try_new_decoder(parquet_metadata: Arc) -> Result { + Self::try_new_decoder_with_options(parquet_metadata, ArrowReaderOptions::default()) } /// Create a new `ParquetDecoderBuilder` for configuring a Parquet decoder for the given file @@ -139,27 +144,26 @@ impl ParquetPushDecoderBuilder { /// This is similar to [`Self::try_new_decoder`] but allows configuring /// options such as Arrow schema pub fn try_new_decoder_with_options( - file_len: u64, parquet_metadata: Arc, arrow_reader_options: ArrowReaderOptions, ) -> Result { let arrow_reader_metadata = ArrowReaderMetadata::try_new(parquet_metadata, arrow_reader_options)?; - Ok(Self::new_with_metadata(file_len, arrow_reader_metadata)) + Ok(Self::new_with_metadata(arrow_reader_metadata)) } /// Create a new `ParquetDecoderBuilder` given [`ArrowReaderMetadata`]. /// /// See [`ArrowReaderMetadata::try_new`] for how to create the metadata from /// the Parquet metadata and reader options. - pub fn new_with_metadata(file_len: u64, arrow_reader_metadata: ArrowReaderMetadata) -> Self { - Self::new_builder(file_len, arrow_reader_metadata) + pub fn new_with_metadata(arrow_reader_metadata: ArrowReaderMetadata) -> Self { + Self::new_builder(NoInput, arrow_reader_metadata) } /// Create a [`ParquetPushDecoder`] with the configured options pub fn build(self) -> Result { let Self { - input: file_len, + input: NoInput, metadata: parquet_metadata, schema: _, fields, @@ -179,6 +183,7 @@ impl ParquetPushDecoderBuilder { row_groups.unwrap_or_else(|| (0..parquet_metadata.num_row_groups()).collect()); // Prepare to build RowGroup readers + let file_len = 0; // not used in push decoder let buffers = PushBuffers::new(file_len); let row_group_reader_builder = RowGroupReaderBuilder::new( batch_size, @@ -268,7 +273,54 @@ impl ParquetPushDecoder { ///``` pub fn try_decode(&mut self) -> Result, ParquetError> { let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished); - let (new_state, decode_result) = current_state.try_transition()?; + let (new_state, decode_result) = current_state.try_next_batch()?; + self.state = new_state; + Ok(decode_result) + } + + /// Return a [`ParquetRecordBatchReader`] that reads the next set of rows, or + /// return what data is needed to produce it. + /// + /// This API can be used to get a reader for decoding the next set of + /// RecordBatches while proceeding to begin fetching data for the set (e.g + /// row group) + /// + /// Example + /// ```no_run + /// # use parquet::arrow::push_decoder::ParquetPushDecoder; + /// use parquet::DecodeResult; + /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() } + /// # fn push_data(decoder: &mut ParquetPushDecoder, ranges: Vec>) { unimplemented!() } + /// let mut decoder = get_decoder(); + /// loop { + /// match decoder.try_next_reader().unwrap() { + /// DecodeResult::NeedsData(ranges) => { + /// // The decoder needs more data. 
Fetch the data for the given ranges + /// // call decoder.push_ranges(ranges, data) and call again + /// push_data(&mut decoder, ranges); + /// } + /// DecodeResult::Data(reader) => { + /// // spawn a thread to read the batches in parallel + /// // with fetching the next row group / data + /// std::thread::spawn(move || { + /// for batch in reader { + /// let batch = batch.unwrap(); + /// println!("Got batch with {} rows", batch.num_rows()); + /// } + /// }); + /// } + /// DecodeResult::Finished => { + /// // The decoder has finished decoding all data + /// break; + /// } + /// } + /// } + ///``` + pub fn try_next_reader( + &mut self, + ) -> Result, ParquetError> { + let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished); + let (new_state, decode_result) = current_state.try_next_reader()?; self.state = new_state; Ok(decode_result) } @@ -332,16 +384,106 @@ enum ParquetDecoderState { } impl ParquetDecoderState { + /// If actively reading a RowGroup, return the currently active + /// ParquetRecordBatchReader and advance to the next group. + fn try_next_reader( + self, + ) -> Result<(Self, DecodeResult), ParquetError> { + let mut current_state = self; + loop { + let (next_state, decode_result) = current_state.transition()?; + // if more data is needed to transition, can't proceed further without it + match decode_result { + DecodeResult::NeedsData(ranges) => { + return Ok((next_state, DecodeResult::NeedsData(ranges))); + } + // act next based on state + DecodeResult::Data(()) | DecodeResult::Finished => {} + } + match next_state { + // not ready to read yet, continue transitioning + Self::ReadingRowGroup { .. } => current_state = next_state, + // have a reader ready, so return it and set ourself to ReadingRowGroup + Self::DecodingRowGroup { + record_batch_reader, + remaining_row_groups, + } => { + let result = DecodeResult::Data(*record_batch_reader); + let next_state = Self::ReadingRowGroup { + remaining_row_groups, + }; + return Ok((next_state, result)); + } + Self::Finished => { + return Ok((Self::Finished, DecodeResult::Finished)); + } + } + } + } + /// Current state --> next state + output /// - /// This function is called to check if the decoder has any RecordBatches - /// and [`Self::push_data`] is called when new data is available. - /// - /// # Notes + /// This function is called to get the next RecordBatch /// /// This structure is used to reduce the indentation level of the main loop /// in try_build - fn try_transition(self) -> Result<(Self, DecodeResult), ParquetError> { + fn try_next_batch(self) -> Result<(Self, DecodeResult), ParquetError> { + let mut current_state = self; + loop { + let (new_state, decode_result) = current_state.transition()?; + // if more data is needed to transition, can't proceed further without it + match decode_result { + DecodeResult::NeedsData(ranges) => { + return Ok((new_state, DecodeResult::NeedsData(ranges))); + } + // act next based on state + DecodeResult::Data(()) | DecodeResult::Finished => {} + } + match new_state { + // not ready to read yet, continue transitioning + Self::ReadingRowGroup { .. 
} => current_state = new_state, + // have a reader ready, so decode the next batch + Self::DecodingRowGroup { + mut record_batch_reader, + remaining_row_groups, + } => { + match record_batch_reader.next() { + // Successfully decoded a batch, return it + Some(Ok(batch)) => { + let result = DecodeResult::Data(batch); + let next_state = Self::DecodingRowGroup { + record_batch_reader, + remaining_row_groups, + }; + return Ok((next_state, result)); + } + // No more batches in this row group, move to the next row group + None => { + current_state = Self::ReadingRowGroup { + remaining_row_groups, + } + } + // some error occurred while decoding, so return that + Some(Err(e)) => { + // TODO: preserve ArrowError in ParquetError (rather than convert to a string) + return Err(ParquetError::ArrowError(e.to_string())); + } + } + } + Self::Finished => { + return Ok((Self::Finished, DecodeResult::Finished)); + } + } + } + } + + /// Transition to the next state with a reader (data can be produced), if not end of stream + /// + /// This function is called in a loop until the decoder is ready to return + /// data (has the required pages buffered) or is finished. + fn transition(self) -> Result<(Self, DecodeResult<()>), ParquetError> { + // result returned when there is data ready + let data_ready = DecodeResult::Data(()); match self { Self::ReadingRowGroup { mut remaining_row_groups, @@ -350,13 +492,14 @@ impl ParquetDecoderState { // If we have a next reader, we can transition to decoding it DecodeResult::Data(record_batch_reader) => { // Transition to decoding the row group - Self::DecodingRowGroup { - record_batch_reader: Box::new(record_batch_reader), - remaining_row_groups, - } - .try_transition() + Ok(( + Self::DecodingRowGroup { + record_batch_reader: Box::new(record_batch_reader), + remaining_row_groups, + }, + data_ready, + )) } - // If there are no more readers, we are finished DecodeResult::NeedsData(ranges) => { // If we need more data, we return the ranges needed and stay in Reading // RowGroup state @@ -367,40 +510,17 @@ impl ParquetDecoderState { DecodeResult::NeedsData(ranges), )) } + // If there are no more readers, we are finished DecodeResult::Finished => { // No more row groups to read, we are finished Ok((Self::Finished, DecodeResult::Finished)) } } } - Self::DecodingRowGroup { - mut record_batch_reader, - remaining_row_groups, - } => { - // Decide the next record batch - match record_batch_reader.next() { - Some(Ok(batch)) => { - // Successfully decoded a batch, return it - Ok(( - Self::DecodingRowGroup { - record_batch_reader, - remaining_row_groups, - }, - DecodeResult::Data(batch), - )) - } - None => { - // No more batches in this row group, move to the next row group - // or finish if there are no more row groups - Self::ReadingRowGroup { - remaining_row_groups, - } - .try_transition() - } - Some(Err(e)) => Err(ParquetError::from(e)), // some error occurred while decoding - } - } - Self::Finished => Ok((Self::Finished, DecodeResult::Finished)), + // if we are already in DecodingRowGroup, just return data ready + Self::DecodingRowGroup { .. 
} => Ok((self, data_ready)), + // if finished, just return finished + Self::Finished => Ok((self, DecodeResult::Finished)), } } @@ -485,13 +605,10 @@ mod test { /// available in memory #[test] fn test_decoder_all_data() { - let mut decoder = ParquetPushDecoderBuilder::try_new_decoder( - test_file_len(), - test_file_parquet_metadata(), - ) - .unwrap() - .build() - .unwrap(); + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .build() + .unwrap(); decoder .push_range(test_file_range(), TEST_FILE_DATA.clone()) @@ -514,13 +631,10 @@ mod test { /// fetched as needed #[test] fn test_decoder_incremental() { - let mut decoder = ParquetPushDecoderBuilder::try_new_decoder( - test_file_len(), - test_file_parquet_metadata(), - ) - .unwrap() - .build() - .unwrap(); + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .build() + .unwrap(); let mut results = vec![]; @@ -553,13 +667,10 @@ mod test { /// Decode the entire file incrementally, simulating partial reads #[test] fn test_decoder_partial() { - let mut decoder = ParquetPushDecoderBuilder::try_new_decoder( - test_file_len(), - test_file_parquet_metadata(), - ) - .unwrap() - .build() - .unwrap(); + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .build() + .unwrap(); // First row group, expect a single request for all data needed to read "a" and "b" let ranges = expect_needs_data(decoder.try_decode()); @@ -597,11 +708,8 @@ mod test { /// only a single request per row group #[test] fn test_decoder_selection_does_one_request() { - let builder = ParquetPushDecoderBuilder::try_new_decoder( - test_file_len(), - test_file_parquet_metadata(), - ) - .unwrap(); + let builder = + ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap(); let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); @@ -635,11 +743,8 @@ mod test { /// of the data needed for the filter at a time simulating partial reads. 
#[test] fn test_decoder_single_filter_partial() { - let builder = ParquetPushDecoderBuilder::try_new_decoder( - test_file_len(), - test_file_parquet_metadata(), - ) - .unwrap(); + let builder = + ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap(); // Values in column "a" range 0..399 // First filter: "a" > 250 (nothing in Row Group 0, both data pages in Row Group 1) @@ -696,11 +801,8 @@ mod test { /// Decode with a filter where we also skip one of the RowGroups via a RowSelection #[test] fn test_decoder_single_filter_and_row_selection() { - let builder = ParquetPushDecoderBuilder::try_new_decoder( - test_file_len(), - test_file_parquet_metadata(), - ) - .unwrap(); + let builder = + ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap(); // Values in column "a" range 0..399 // First filter: "a" > 250 (nothing in Row Group 0, last data page in Row Group 1) @@ -751,11 +853,8 @@ mod test { #[test] fn test_decoder_multi_filters() { // Create a decoder for decoding parquet data (note it does not have any IO / readers) - let builder = ParquetPushDecoderBuilder::try_new_decoder( - test_file_len(), - test_file_parquet_metadata(), - ) - .unwrap(); + let builder = + ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap(); // Values in column "a" range 0..399 // Values in column "b" range 400..799 @@ -836,11 +935,8 @@ mod test { #[test] fn test_decoder_reuses_filter_pages() { // Create a decoder for decoding parquet data (note it does not have any IO / readers) - let builder = ParquetPushDecoderBuilder::try_new_decoder( - test_file_len(), - test_file_parquet_metadata(), - ) - .unwrap(); + let builder = + ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap(); // Values in column "a" range 0..399 // First filter: "a" > 250 (nothing in Row Group 0, last data page in Row Group 1) @@ -887,11 +983,8 @@ mod test { #[test] fn test_decoder_empty_filters() { - let builder = ParquetPushDecoderBuilder::try_new_decoder( - test_file_len(), - test_file_parquet_metadata(), - ) - .unwrap(); + let builder = + ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap(); let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); // only read column "c", but with empty filters @@ -929,17 +1022,14 @@ mod test { #[test] fn test_decoder_offset_limit() { - let mut decoder = ParquetPushDecoderBuilder::try_new_decoder( - test_file_len(), - test_file_parquet_metadata(), - ) - .unwrap() - // skip entire first row group (200 rows) and first 25 rows of second row group - .with_offset(225) - // and limit to 20 rows - .with_limit(20) - .build() - .unwrap(); + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + // skip entire first row group (200 rows) and first 25 rows of second row group + .with_offset(225) + // and limit to 20 rows + .with_limit(20) + .build() + .unwrap(); // First row group should be skipped, @@ -958,14 +1048,11 @@ mod test { #[test] fn test_decoder_row_group_selection() { // take only the second row group - let mut decoder = ParquetPushDecoderBuilder::try_new_decoder( - test_file_len(), - test_file_parquet_metadata(), - ) - .unwrap() - .with_row_groups(vec![1]) - .build() - .unwrap(); + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .with_row_groups(vec![1]) + .build() + .unwrap(); // First row group should be skipped, @@ -984,17 +1071,14 @@ mod test { 
#[test] fn test_decoder_row_selection() { // take only the second row group - let mut decoder = ParquetPushDecoderBuilder::try_new_decoder( - test_file_len(), - test_file_parquet_metadata(), - ) - .unwrap() - .with_row_selection(RowSelection::from(vec![ - RowSelector::skip(225), // skip first row group and 25 rows of second]) - RowSelector::select(20), // take 20 rows - ])) - .build() - .unwrap(); + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .with_row_selection(RowSelection::from(vec![ + RowSelector::skip(225), // skip first row group and 25 rows of second]) + RowSelector::select(20), // take 20 rows + ])) + .build() + .unwrap(); // First row group should be skipped, diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index be9070ae8b49..a0ced8aa8522 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -608,6 +608,10 @@ impl RowGroupReaderBuilder { } fn compute_cache_projection_inner(&self, filter: &RowFilter) -> Option { + // Do not compute the projection mask if the predicate cache is disabled + if self.max_predicate_cache_size == 0 { + return None; + } let mut cache_projection = filter.predicates.first()?.projection().clone(); for predicate in filter.predicates.iter() { cache_projection.union(predicate.projection()); diff --git a/parquet/src/util/push_buffers.rs b/parquet/src/util/push_buffers.rs index 0475d48768f0..0c00cf9bd57f 100644 --- a/parquet/src/util/push_buffers.rs +++ b/parquet/src/util/push_buffers.rs @@ -200,13 +200,6 @@ impl ChunkReader for PushBuffers { } fn get_bytes(&self, start: u64, length: usize) -> Result { - if start > self.file_len { - return Err(ParquetError::General(format!( - "Requested start {start} is beyond the end of the file (file length: {})", - self.file_len - ))); - } - // find the range that contains the start offset for (range, data) in self.iter() { if range.start <= start && range.end >= start + length as u64 { diff --git a/parquet/tests/encryption/encryption_agnostic.rs b/parquet/tests/encryption/encryption_agnostic.rs index 06c3b743aedd..604155c81a83 100644 --- a/parquet/tests/encryption/encryption_agnostic.rs +++ b/parquet/tests/encryption/encryption_agnostic.rs @@ -139,8 +139,8 @@ pub async fn read_plaintext_footer_file_without_decryption_properties_async() { Some(Err(ParquetError::ArrowError(s))) => { assert!(s.contains("Parquet error")); } - _ => { - panic!("Expected ArrowError::ParquetError"); + err => { + panic!("Expected ArrowError::ParquetError, got {err:?}"); } }; }
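Taken together, these changes reduce `ParquetRecordBatchStream` to a thin async IO driver around `ParquetPushDecoder`: the decoder reports which byte ranges it needs, the stream fetches them, pushes them back, and asks again. The sketch below shows the same loop driven synchronously. It is illustrative only: `fetch_ranges` is a hypothetical helper standing in for whatever storage the caller reads from, while `try_decode`, `push_ranges`, and `DecodeResult` follow the APIs exercised in this diff.

```rust
use std::ops::Range;

use bytes::Bytes;
use parquet::DecodeResult;
use parquet::arrow::push_decoder::ParquetPushDecoder;
use parquet::errors::Result;

/// Hypothetical helper: read the requested byte ranges from some storage
/// (local file, object store, network, ...).
fn fetch_ranges(_ranges: &[Range<u64>]) -> Vec<Bytes> {
    unimplemented!("supplied by the caller")
}

/// Drive a configured push decoder to completion, mirroring what the
/// rewritten `ParquetRecordBatchStream` does asynchronously via `RequestState`.
fn drain(decoder: &mut ParquetPushDecoder) -> Result<usize> {
    let mut total_rows = 0;
    loop {
        match decoder.try_decode()? {
            // The decoder cannot make progress without these byte ranges:
            // fetch them, push them back in, then try to decode again.
            DecodeResult::NeedsData(ranges) => {
                let data = fetch_ranges(&ranges);
                decoder.push_ranges(ranges, data)?;
            }
            // A RecordBatch was decoded from data already pushed.
            DecodeResult::Data(batch) => total_rows += batch.num_rows(),
            // All selected row groups have been decoded.
            DecodeResult::Finished => return Ok(total_rows),
        }
    }
}
```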