diff --git a/cpp/src/arrow/compute/kernels/scalar_cast_internal.cc b/cpp/src/arrow/compute/kernels/scalar_cast_internal.cc
index 0484e0b259c..a56f2ed1298 100644
--- a/cpp/src/arrow/compute/kernels/scalar_cast_internal.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_cast_internal.cc
@@ -209,6 +209,13 @@ Status UnpackDictionary(KernelContext* ctx, const ExecSpan& batch, ExecResult* o
   return Status::OK();
 }
 
+Status DecodeRunEndEncoded(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
+  Datum input(batch[0].array.ToArrayData());
+  ARROW_ASSIGN_OR_RAISE(Datum decoded, RunEndDecode(input, ctx->exec_context()));
+  out->value = std::move(decoded.array());
+  return Status::OK();
+}
+
 Status OutputAllNull(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
   // TODO(wesm): there is no good reason to have to use ArrayData here, so we
   // should clean this up later. This is used in the dict->null cast
@@ -286,6 +293,12 @@ static bool CanCastFromDictionary(Type::type type_id) {
           is_fixed_size_binary(type_id));
 }
 
+static bool CanCastFromRunEndEncoded(Type::type type_id) {
+  return (is_primitive(type_id) ||
+          is_base_binary_like(type_id) ||
+          is_fixed_size_binary(type_id));
+}
+
 void AddCommonCasts(Type::type out_type_id, OutputType out_ty, CastFunction* func) {
   // From null to this type
   ScalarKernel kernel;
@@ -303,6 +316,13 @@ void AddCommonCasts(Type::type out_type_id, OutputType out_ty, CastFunction* fun
                               MemAllocation::NO_PREALLOCATE));
   }
 
+  // From run-end-encoded to this type
+  if (CanCastFromRunEndEncoded(out_type_id)) {
+    DCHECK_OK(func->AddKernel(Type::RUN_END_ENCODED, {InputType(Type::RUN_END_ENCODED)},
+                              out_ty, DecodeRunEndEncoded,
+                              NullHandling::COMPUTED_NO_PREALLOCATE,
+                              MemAllocation::NO_PREALLOCATE));
+  }
+
   // From extension type to this type
   DCHECK_OK(func->AddKernel(Type::EXTENSION, {InputType(Type::EXTENSION)}, out_ty,
                             CastFromExtension, NullHandling::COMPUTED_NO_PREALLOCATE,
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index 7ef60618700..f62d52cc7d9 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -115,6 +115,10 @@ parquet::ArrowReaderProperties MakeArrowReaderProperties(
     auto column_index = metadata.schema()->ColumnIndex(name);
     properties.set_read_dictionary(column_index, true);
   }
+  for (const std::string& name : format.reader_options.ree_columns) {
+    auto column_index = metadata.schema()->ColumnIndex(name);
+    properties.set_read_ree(column_index, true);
+  }
   properties.set_coerce_int96_timestamp_unit(
       format.reader_options.coerce_int96_timestamp_unit);
   properties.set_binary_type(format.reader_options.binary_type);
@@ -445,6 +449,7 @@ bool ParquetFileFormat::Equals(const FileFormat& other) const {
   // FIXME implement comparison for decryption options
   return (reader_options.dict_columns == other_reader_options.dict_columns &&
+          reader_options.ree_columns == other_reader_options.ree_columns &&
           reader_options.coerce_int96_timestamp_unit ==
               other_reader_options.coerce_int96_timestamp_unit &&
           reader_options.binary_type == other_reader_options.binary_type &&
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index 1811a96bf98..05126e334f8 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -89,6 +89,7 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
   ///
   /// @{
   std::unordered_set<std::string> dict_columns;
+  std::unordered_set<std::string> ree_columns;
   arrow::TimeUnit::type coerce_int96_timestamp_unit = arrow::TimeUnit::NANO;
   Type::type binary_type = Type::BINARY;
   Type::type list_type = Type::LIST;
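Note for reviewers: with the kernel registered in `AddCommonCasts()` above, materializing an REE column back to a dense array is just a cast. A minimal consumer-side sketch, assuming this patch is applied (the function name and the utf8 value type are illustrative):

```cpp
#include "arrow/compute/api.h"

// Decode a run_end_encoded(int32, utf8) array back to plain utf8. The cast
// dispatches to the DecodeRunEndEncoded() kernel registered above.
arrow::Result<std::shared_ptr<arrow::Array>> DecodeRee(
    const std::shared_ptr<arrow::Array>& ree_array) {
  ARROW_ASSIGN_OR_RAISE(arrow::Datum decoded,
                        arrow::compute::Cast(ree_array, arrow::utf8()));
  return decoded.make_array();
}
```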
diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc
index 696bda19359..4d0771a775a 100644
--- a/cpp/src/arrow/dataset/file_parquet_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_test.cc
@@ -271,6 +271,17 @@ TEST_F(TestParquetFileFormat, InspectDictEncoded) {
   AssertSchemaEqual(*actual, expected_schema, /* check_metadata = */ false);
 }
 
+TEST_F(TestParquetFileFormat, InspectReeEncoded) {
+  auto reader = GetRecordBatchReader(schema({field("utf8", utf8())}));
+  auto source = GetFileSource(reader.get());
+
+  format_->reader_options.ree_columns = {"utf8"};
+  ASSERT_OK_AND_ASSIGN(auto actual, format_->Inspect(*source.get()));
+
+  Schema expected_schema({field("utf8", run_end_encoded(int32(), utf8()))});
+  AssertSchemaEqual(*actual, expected_schema, /* check_metadata = */ false);
+}
+
 TEST_F(TestParquetFileFormat, IsSupported) { TestIsSupported(); }
 
 TEST_F(TestParquetFileFormat, WriteRecordBatchReader) { TestWrite(); }
@@ -617,6 +628,25 @@ TEST_P(TestParquetFileFormatScan, ScanRecordBatchReaderDictEncoded) {
   }
   ASSERT_EQ(row_count, expected_rows());
 }
+
+TEST_P(TestParquetFileFormatScan, ScanRecordBatchReaderReeEncoded) {
+  auto reader = GetRecordBatchReader(schema({field("utf8", utf8())}));
+  auto source = GetFileSource(reader.get());
+
+  SetSchema(reader->schema()->fields());
+  SetFilter(literal(true));
+  format_->reader_options.ree_columns = {"utf8"};
+  ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));
+
+  int64_t row_count = 0;
+  Schema expected_schema({field("utf8", run_end_encoded(int32(), utf8()))});
+
+  for (auto maybe_batch : PhysicalBatches(fragment)) {
+    ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+    row_count += batch->num_rows();
+    AssertSchemaEqual(*batch->schema(), expected_schema, /* check_metadata = */ false);
+  }
+  ASSERT_EQ(row_count, expected_rows());
+}
+
 TEST_P(TestParquetFileFormatScan, ScanRecordBatchReaderPreBuffer) {
   auto reader = GetRecordBatchReader(schema({field("f64", float64())}));
   auto source = GetFileSource(reader.get());
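The dataset-level knob mirrors the existing `dict_columns` option. A minimal usage sketch against the new reader option (the column name is illustrative):

```cpp
#include <memory>
#include "arrow/dataset/file_parquet.h"

// Ask the scanner to decode the "utf8" column straight to
// run_end_encoded(int32, utf8) instead of a dense utf8 column.
std::shared_ptr<arrow::dataset::ParquetFileFormat> MakeReeFormat() {
  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
  format->reader_options.ree_columns = {"utf8"};
  return format;
}
```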
diff --git a/cpp/src/arrow/util/rle_encoding_internal.h b/cpp/src/arrow/util/rle_encoding_internal.h
index 2420270f3ab..b330c14f0bc 100644
--- a/cpp/src/arrow/util/rle_encoding_internal.h
+++ b/cpp/src/arrow/util/rle_encoding_internal.h
@@ -426,6 +426,14 @@ class RleBitPackedDecoder {
   /// values.
   [[nodiscard]] bool Get(value_type* val);
 
+  /// Get the next logical value and the number of times it repeats, capped at
+  /// batch_size.
+  [[nodiscard]] bool GetNextValueAndNumRepeats(value_type* val, int* num_repeats,
+                                               int batch_size);
+
+  /// Like GetNextValueAndNumRepeats but adds spacing for null entries.
+  [[nodiscard]] bool GetNextValueAndNumRepeatsSpaced(value_type* val, bool* is_null,
+                                                     int* num_repeats, int batch_size,
+                                                     const uint8_t* valid_bits,
+                                                     int64_t valid_bits_offset);
+
   /// Get a batch of values and return the number of decoded elements.
   /// May write fewer elements to the output than requested if there are not enough values
   /// left or if an error occurred.
@@ -722,7 +730,6 @@ void RleBitPackedDecoder<T>::ParseWithCallable(Callable&& func) {
     auto OnBitPackedRun(BitPackedRun run) { return func(std::move(run)); }
     auto OnRleRun(RleRun run) { return func(std::move(run)); }
   } handler{std::move(func)};
-
   parser_.Parse(std::move(handler));
 }
 
@@ -731,6 +738,69 @@ bool RleBitPackedDecoder<T>::Get(value_type* val) {
   return GetBatch(val, 1) == 1;
 }
 
+template <typename T>
+bool RleBitPackedDecoder<T>::GetNextValueAndNumRepeats(value_type* val, int* num_repeats,
+                                                       int batch_size) {
+  using ControlFlow = RleBitPackedParser::ControlFlow;
+
+  if (ARROW_PREDICT_FALSE(run_remaining() > 0)) {
+    if (std::holds_alternative<BitPackedRunDecoder<value_type>>(decoder_)) {
+      auto& decoder = std::get<BitPackedRunDecoder<value_type>>(decoder_);
+      *num_repeats = 1;
+      return decoder.Get(val, value_bit_width_);
+    } else {
+      auto& decoder = std::get<RleRunDecoder<value_type>>(decoder_);
+      *num_repeats = std::min(decoder.remaining(), batch_size);
+      ARROW_DCHECK_EQ(decoder.Advance(*num_repeats, value_bit_width_), *num_repeats);
+      return decoder.Get(val, value_bit_width_);
+    }
+  }
+
+  bool read_new_value = false;
+
+  ParseWithCallable([&](auto run) {
+    if constexpr (std::is_same_v<decltype(run), BitPackedRun>) {
+      BitPackedRunDecoder<value_type> decoder(run, value_bit_width_);
+      read_new_value = decoder.Get(val, value_bit_width_);
+      *num_repeats = 1;
+      decoder_ = std::move(decoder);
+      return ControlFlow::Break;
+    } else {
+      RleRunDecoder<value_type> decoder(run, value_bit_width_);
+      *num_repeats = std::min(decoder.remaining(), batch_size);
+      read_new_value = decoder.Get(val, value_bit_width_);
+      ARROW_DCHECK_EQ(decoder.Advance(*num_repeats, value_bit_width_), *num_repeats);
+      decoder_ = std::move(decoder);
+      return ControlFlow::Break;
+    }
+  });
+
+  return read_new_value;
+}
+
+template <typename T>
+bool RleBitPackedDecoder<T>::GetNextValueAndNumRepeatsSpaced(
+    value_type* val, bool* is_null, int* num_repeats, int batch_size,
+    const uint8_t* valid_bits, int64_t valid_bits_offset) {
+  arrow::internal::BitRunReader bit_reader(valid_bits, valid_bits_offset,
+                                           /*length=*/batch_size);
+  arrow::internal::BitRun valid_run = bit_reader.NextRun();
+  while (ARROW_PREDICT_FALSE(valid_run.length == 0)) {
+    valid_run = bit_reader.NextRun();
+  }
+  ARROW_DCHECK_GT(batch_size, 0);
+  ARROW_DCHECK_GT(valid_run.length, 0);
+  if (valid_run.set) {
+    return GetNextValueAndNumRepeats(
+        val, num_repeats,
+        static_cast<int>(std::min(valid_run.length, static_cast<int64_t>(batch_size))));
+  } else {
+    *is_null = true;
+    *num_repeats = static_cast<int>(valid_run.length);
+  }
+  return true;
+}
+
 template <typename T>
 auto RleBitPackedDecoder<T>::GetBatch(value_type* out, rle_size_t batch_size)
     -> rle_size_t {
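The new accessors are meant to be pulled in a loop: each call hands back one run, or the prefix of a run that still fits in the batch. A schematic consumer, hedged because it only assumes the signatures declared above (`handle_run` is a hypothetical callback):

```cpp
#include <cstdint>

// Drain `decoder` as (value, run-length) pairs. Decoder is any type exposing
// GetNextValueAndNumRepeats() as declared above.
template <typename Decoder, typename HandleRun>
void DrainRuns(Decoder& decoder, int batch_size, HandleRun&& handle_run) {
  int32_t value;
  int num_repeats = 0;
  int remaining = batch_size;
  // Each call consumes one run (or the prefix that still fits in the batch),
  // so repeated values are never handled one by one.
  while (remaining > 0 &&
         decoder.GetNextValueAndNumRepeats(&value, &num_repeats, remaining)) {
    handle_run(value, num_repeats);
    remaining -= num_repeats;
  }
}
```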
"arrow/io/api.h" #include "arrow/record_batch.h" @@ -61,6 +62,8 @@ # include "arrow/csv/api.h" #endif +#include "arrow/acero/test_util_internal.h" + #include "parquet/api/reader.h" #include "parquet/api/writer.h" @@ -4884,6 +4887,7 @@ class TestArrowReadDictionary : public ::testing::TestWithParam { std::shared_ptr actual; ASSERT_OK_NO_THROW(reader->ReadTable(&actual)); + ::arrow::AssertTablesEqual(expected, *actual, /*same_chunk_layout=*/false); } @@ -5029,6 +5033,92 @@ INSTANTIATE_TEST_SUITE_P( ReadDictionary, TestArrowReadDictionary, ::testing::ValuesIn(TestArrowReadDictionary::null_probabilities())); +class TestArrowReadRunEndEncoded : public ::testing::TestWithParam { + public: + static constexpr int kNumRowGroups = 16; + + struct { + int num_rows = 1024 * kNumRowGroups; + int num_row_groups = kNumRowGroups; + int num_uniques = 128; + } options; + + void SetUp() override { + ASSERT_OK(::arrow::compute::Initialize()); + properties_ = default_arrow_reader_properties(); + + GenerateData(GetParam()); + } + + void GenerateData(double null_probability) { + constexpr int64_t min_length = 2; + constexpr int64_t max_length = 100; + ::arrow::random::RandomArrayGenerator rag(0); + dense_values_ = rag.StringWithRepeats(options.num_rows, options.num_uniques, + min_length, max_length, null_probability); + expected_dense_ = MakeSimpleTable(dense_values_, /*nullable=*/true); + } + + void TearDown() override {} + + void WriteSimple() { + // Write `num_row_groups` row groups; each row group will have a different dictionary + ASSERT_NO_FATAL_FAILURE( + WriteTableToBuffer(expected_dense_, options.num_rows / options.num_row_groups, + default_arrow_writer_properties(), &buffer_)); + } + + void CheckReadWholeFile(const Table& expected) { + ASSERT_OK_AND_ASSIGN(auto reader, GetReader()); + + std::shared_ptr
+    ASSERT_OK_NO_THROW(reader->ReadTable(&actual));
+    ::arrow::AssertTablesEqual(expected, *actual, /*same_chunk_layout=*/false);
+  }
+
+  static std::vector<double> null_probabilities() { return {0.0, 0.5, 1}; }
+
+ protected:
+  std::shared_ptr<Array> dense_values_;
+  std::shared_ptr<Table> expected_dense_;
+  std::shared_ptr<Table> expected_dict_;
+  std::shared_ptr<Buffer> buffer_;
+  ArrowReaderProperties properties_;
+
+  ::arrow::Result<std::unique_ptr<FileReader>> GetReader() {
+    std::unique_ptr<FileReader> reader;
+
+    FileReaderBuilder builder;
+    RETURN_NOT_OK(builder.Open(std::make_shared<BufferReader>(buffer_)));
+    RETURN_NOT_OK(builder.properties(properties_)->Build(&reader));
+
+    return reader;
+  }
+};
+
+TEST_P(TestArrowReadRunEndEncoded, ReadWholeFile) {
+  properties_.set_read_ree(0, true);
+
+  WriteSimple();
+
+  auto num_row_groups = options.num_row_groups;
+  auto chunk_size = options.num_rows / num_row_groups;
+
+  std::vector<std::shared_ptr<Array>> chunks(num_row_groups);
+  for (int i = 0; i < num_row_groups; ++i) {
+    chunks[i] = dense_values_->Slice(chunk_size * i, chunk_size);
+  }
+  auto ex_table = MakeSimpleTable(std::make_shared<ChunkedArray>(chunks),
+                                  /*nullable=*/true);
+  ASSERT_OK_AND_ASSIGN(ex_table,
+                       ::arrow::acero::RunEndEncodeTableColumns(*ex_table, {0}));
+
+  CheckReadWholeFile(*ex_table);
+}
+
+INSTANTIATE_TEST_SUITE_P(
+    ReadRunEndEncoded, TestArrowReadRunEndEncoded,
+    ::testing::ValuesIn(TestArrowReadRunEndEncoded::null_probabilities()));
+
 TEST(TestArrowWriteDictionaries, ChangingDictionaries) {
   constexpr int num_unique = 50;
   constexpr int repeat = 10000;
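End-to-end, the feature is driven purely by `ArrowReaderProperties`. A condensed sketch of the non-test path (IO setup elided; assumes column 0 is a BYTE_ARRAY column):

```cpp
#include <memory>
#include "arrow/io/api.h"
#include "arrow/table.h"
#include "parquet/arrow/reader.h"
#include "parquet/properties.h"

// Read column 0 of a Parquet file directly as run_end_encoded(int32, binary/utf8).
arrow::Result<std::shared_ptr<arrow::Table>> ReadFileAsRee(
    std::shared_ptr<arrow::io::RandomAccessFile> input) {
  parquet::ArrowReaderProperties properties =
      parquet::default_arrow_reader_properties();
  properties.set_read_ree(/*column_index=*/0, true);

  parquet::arrow::FileReaderBuilder builder;
  ARROW_RETURN_NOT_OK(builder.Open(std::move(input)));
  std::unique_ptr<parquet::arrow::FileReader> reader;
  ARROW_RETURN_NOT_OK(builder.properties(properties)->Build(&reader));

  std::shared_ptr<arrow::Table> table;
  ARROW_RETURN_NOT_OK(reader->ReadTable(&table));
  return table;
}
```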
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index d42fdc5034a..0c7cdcda7ec 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -462,7 +462,9 @@ class LeafReader : public ColumnReaderImpl {
     record_reader_ = RecordReader::Make(
         descr_, leaf_info, ctx_->pool,
         /*read_dictionary=*/field_->type()->id() == ::arrow::Type::DICTIONARY,
-        /*read_dense_for_nullable=*/false, /*arrow_type=*/type_for_reading);
+        /*read_dense_for_nullable=*/false,
+        /*read_run_end_encoded=*/field_->type()->id() == ::arrow::Type::RUN_END_ENCODED,
+        /*arrow_type=*/type_for_reading);
     NextRowGroup();
   }
 
diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc
index b622e93e072..622e14ea7f8 100644
--- a/cpp/src/parquet/arrow/reader_internal.cc
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -95,6 +95,7 @@ using ::arrow::util::SafeLoadAs;
 using parquet::internal::BinaryRecordReader;
 using parquet::internal::DictionaryRecordReader;
 using parquet::internal::RecordReader;
+using parquet::internal::ReeRecordReader;
 using parquet::schema::GroupNode;
 using parquet::schema::Node;
 using parquet::schema::PrimitiveNode;
@@ -566,6 +567,24 @@ Status TransferDictionary(RecordReader* reader, MemoryPool* pool,
   return Status::OK();
 }
 
+Status TransferRunEndEncoded(RecordReader* reader, MemoryPool* pool,
+                             const std::shared_ptr<DataType>& logical_value_type,
+                             bool nullable, std::shared_ptr<ChunkedArray>* out) {
+  auto ree_reader = dynamic_cast<ReeRecordReader*>(reader);
+  DCHECK(ree_reader);
+  std::shared_ptr<::arrow::Array> result = ree_reader->GetResult();
+  *out = std::make_shared<ChunkedArray>(result);
+  if (!logical_value_type->Equals(*(*out)->type())) {
+    ARROW_ASSIGN_OR_RAISE(*out, ViewOrCastChunkedArray(*out, pool, logical_value_type));
+  }
+  if (!nullable) {
+    ::arrow::ArrayVector chunks = (*out)->chunks();
+    ReconstructChunksWithoutNulls(&chunks);
+    *out = std::make_shared<ChunkedArray>(std::move(chunks), logical_value_type);
+  }
+  return Status::OK();
+}
+
 Status TransferBinary(RecordReader* reader, MemoryPool* pool,
                       const std::shared_ptr<Field>& logical_type_field,
                       std::shared_ptr<ChunkedArray>* out) {
@@ -574,6 +593,12 @@ Status TransferBinary(RecordReader* reader, MemoryPool* pool,
         reader, pool, ::arrow::dictionary(::arrow::int32(), logical_type_field->type()),
         logical_type_field->nullable(), out);
   }
+  if (reader->read_ree()) {
+    return TransferRunEndEncoded(
+        reader, pool,
+        ::arrow::run_end_encoded(::arrow::int32(), logical_type_field->type()),
+        logical_type_field->nullable(), out);
+  }
+
   ::arrow::compute::ExecContext ctx(pool);
   ::arrow::compute::CastOptions cast_options;
   cast_options.allow_invalid_utf8 = true;  // avoid spending time validating UTF8 data
@@ -864,6 +889,11 @@ Status TransferColumnData(RecordReader* reader,
                                       value_field->nullable(), &chunked_result));
       result = chunked_result;
     } break;
+    case ::arrow::Type::RUN_END_ENCODED: {
+      RETURN_NOT_OK(TransferRunEndEncoded(reader, pool, value_field->type(),
+                                          value_field->nullable(), &chunked_result));
+      result = chunked_result;
+    } break;
     case ::arrow::Type::NA: {
       result = std::make_shared<::arrow::NullArray>(reader->values_written());
       break;
diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc
index 293ae94b94d..42ccc6652dd 100644
--- a/cpp/src/parquet/arrow/schema.cc
+++ b/cpp/src/parquet/arrow/schema.cc
@@ -537,6 +537,11 @@ bool IsDictionaryReadSupported(const ArrowType& type) {
   return type.id() == ::arrow::Type::BINARY || type.id() == ::arrow::Type::STRING;
 }
 
+bool IsRunEndEncodedReadSupported(const ArrowType& type) {
+  // Only supported currently for BYTE_ARRAY types
+  return type.id() == ::arrow::Type::BINARY || type.id() == ::arrow::Type::STRING;
+}
+
 // ----------------------------------------------------------------------
 // Schema logic
 
@@ -548,6 +553,9 @@ ::arrow::Result<std::shared_ptr<ArrowType>> GetTypeForNode(
   if (ctx->properties.read_dictionary(column_index) &&
       IsDictionaryReadSupported(*storage_type)) {
     return ::arrow::dictionary(::arrow::int32(), storage_type);
+  } else if (ctx->properties.read_ree(column_index) &&
+             IsRunEndEncodedReadSupported(*storage_type)) {
+    return ::arrow::run_end_encoded(::arrow::int32(), storage_type);
   }
   return storage_type;
 }
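For reference, the type produced by the schema change above for a flagged utf8 column is simply:

```cpp
#include "arrow/type.h"

// A BYTE_ARRAY/utf8 column flagged via read_ree resolves to this Arrow type,
// analogous to how read_dictionary resolves to dictionary(int32, utf8).
std::shared_ptr<arrow::DataType> ExpectedReeType() {
  return arrow::run_end_encoded(arrow::int32(), arrow::utf8());
}
```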
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 8ecb774022f..a79fd1fc1ab 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -34,6 +34,7 @@
 #include "arrow/array/builder_binary.h"
 #include "arrow/array/builder_dict.h"
 #include "arrow/array/builder_primitive.h"
+#include "arrow/array/builder_run_end.h"
 #include "arrow/chunked_array.h"
 #include "arrow/type.h"
 #include "arrow/util/bit_stream_utils_internal.h"
@@ -2074,6 +2075,47 @@ class ByteArrayChunkedRecordReader final : public TypedRecordReader<ByteArrayTy
   typename EncodingTraits<ByteArrayType>::Accumulator accumulator_;
 };
 
+class ByteArrayReeRecordReader final : public TypedRecordReader<ByteArrayType>,
+                                       virtual public ReeRecordReader {
+ public:
+  ByteArrayReeRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
+                           ::arrow::MemoryPool* pool, bool read_dense_for_nullable)
+      : TypedRecordReader<ByteArrayType>(descr, leaf_info, pool,
+                                         read_dense_for_nullable) {
+    read_ree_ = true;
+    ARROW_DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY);
+    PARQUET_THROW_NOT_OK(::arrow::MakeBuilder(
+        ::arrow::default_memory_pool(),
+        ::arrow::run_end_encoded(::arrow::int32(), ::arrow::binary()), &builder_));
+  }
+
+  void ReadValuesDense(int64_t values_to_read) override {
+    int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull(
+        static_cast<int>(values_to_read),
+        checked_cast<::arrow::RunEndEncodedBuilder*>(builder_.get()));
+    CheckNumberDecoded(num_decoded, values_to_read);
+    ResetValues();
+  }
+
+  void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
+    int64_t num_decoded = this->current_decoder_->DecodeArrow(
+        static_cast<int>(values_to_read), static_cast<int>(null_count),
+        valid_bits_->mutable_data(), values_written_,
+        checked_cast<::arrow::RunEndEncodedBuilder*>(builder_.get()));
+    CheckNumberDecoded(num_decoded, values_to_read - null_count);
+    ResetValues();
+  }
+
+  std::shared_ptr<::arrow::Array> GetResult() override {
+    std::shared_ptr<::arrow::Array> result;
+    PARQUET_THROW_NOT_OK(builder_->Finish(&result));
+    return result;
+  }
+
+ private:
+  std::unique_ptr<::arrow::ArrayBuilder> builder_;
+};
+
 /// ByteArrayDictionaryRecordReader reads into ::arrow::dictionary(index: int32,
 /// values: binary).
 ///
@@ -2171,11 +2213,14 @@ void TypedRecordReader<FLBAType>::DebugPrintState() {}
 
 std::shared_ptr<RecordReader> MakeByteArrayRecordReader(
     const ColumnDescriptor* descr, LevelInfo leaf_info, ::arrow::MemoryPool* pool,
-    bool read_dictionary, bool read_dense_for_nullable,
+    bool read_dictionary, bool read_dense_for_nullable, bool read_run_end_encoded,
     const std::shared_ptr<::arrow::DataType>& arrow_type) {
   if (read_dictionary) {
     return std::make_shared<ByteArrayDictionaryRecordReader>(descr, leaf_info, pool,
                                                              read_dense_for_nullable);
+  } else if (read_run_end_encoded) {
+    return std::make_shared<ByteArrayReeRecordReader>(descr, leaf_info, pool,
+                                                      read_dense_for_nullable);
   } else {
     return std::make_shared<ByteArrayChunkedRecordReader>(
         descr, leaf_info, pool, read_dense_for_nullable, arrow_type);
@@ -2186,7 +2231,7 @@ std::shared_ptr<RecordReader> MakeByteArrayRecordReader(
 
 std::shared_ptr<RecordReader> RecordReader::Make(
     const ColumnDescriptor* descr, LevelInfo leaf_info, MemoryPool* pool,
-    bool read_dictionary, bool read_dense_for_nullable,
+    bool read_dictionary, bool read_dense_for_nullable, bool read_run_end_encoded,
     const std::shared_ptr<::arrow::DataType>& arrow_type) {
   switch (descr->physical_type()) {
     case Type::BOOLEAN:
@@ -2207,10 +2252,9 @@ std::shared_ptr<RecordReader> RecordReader::Make(
     case Type::DOUBLE:
       return std::make_shared<TypedRecordReader<DoubleType>>(descr, leaf_info, pool,
                                                              read_dense_for_nullable);
-    case Type::BYTE_ARRAY: {
+    case Type::BYTE_ARRAY:
       return MakeByteArrayRecordReader(descr, leaf_info, pool, read_dictionary,
-                                       read_dense_for_nullable, arrow_type);
-    }
+                                       read_dense_for_nullable, read_run_end_encoded,
+                                       arrow_type);
     case Type::FIXED_LEN_BYTE_ARRAY:
       return std::make_shared<FLBARecordReader>(descr, leaf_info, pool,
                                                 read_dense_for_nullable);
diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h
index ac4469b1904..41453d41d04 100644
--- a/cpp/src/parquet/column_reader.h
+++ b/cpp/src/parquet/column_reader.h
@@ -273,6 +273,7 @@ class PARQUET_EXPORT RecordReader {
       const ColumnDescriptor* descr, LevelInfo leaf_info,
       ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
       bool read_dictionary = false, bool read_dense_for_nullable = false,
+      bool read_run_end_encoded = false,
      const std::shared_ptr<::arrow::DataType>& arrow_type = NULLPTR);
 
   virtual ~RecordReader() = default;
@@ -373,6 +374,9 @@ class PARQUET_EXPORT RecordReader {
   /// \brief True if reading directly as Arrow dictionary-encoded
   bool read_dictionary() const { return read_dictionary_; }
 
+  /// \brief True if reading directly as Arrow run-end-encoded
+  bool read_ree() const { return read_ree_; }
+
   /// \brief True if reading dense for nullable columns.
   bool read_dense_for_nullable() const { return read_dense_for_nullable_; }
 
@@ -424,6 +428,7 @@ class PARQUET_EXPORT RecordReader {
   int64_t levels_capacity_;
 
   bool read_dictionary_ = false;
+  bool read_ree_ = false;
   // If true, we will not leave any space for the null values in the values_
   // vector or fill null values in BinaryRecordReader/DictionaryRecordReader.
   //
@@ -444,6 +449,13 @@ class DictionaryRecordReader : virtual public RecordReader {
   virtual std::shared_ptr<::arrow::ChunkedArray> GetResult() = 0;
 };
 
+/// \brief Read records directly to run-end-encoded Arrow form (int32
+/// run ends). Only valid for BYTE_ARRAY columns.
+class ReeRecordReader : virtual public RecordReader {
+ public:
+  virtual std::shared_ptr<::arrow::Array> GetResult() = 0;
+};
+
 }  // namespace internal
 
 using BoolReader = TypedColumnReader<BooleanType>;
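The low-level entry point is the new `RecordReader::Make()` flag. A hedged sketch of constructing an REE record reader directly (assumes a valid ColumnDescriptor and LevelInfo; the page-reader wiring is elided):

```cpp
#include "arrow/memory_pool.h"
#include "parquet/column_reader.h"

// Build a RecordReader that accumulates into run_end_encoded(int32, binary).
// `descr` must describe a BYTE_ARRAY column.
std::shared_ptr<parquet::internal::RecordReader> MakeReeRecordReader(
    const parquet::ColumnDescriptor* descr, parquet::internal::LevelInfo leaf_info) {
  return parquet::internal::RecordReader::Make(
      descr, leaf_info, ::arrow::default_memory_pool(),
      /*read_dictionary=*/false, /*read_dense_for_nullable=*/false,
      /*read_run_end_encoded=*/true);
}
```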
diff --git a/cpp/src/parquet/decoder.cc b/cpp/src/parquet/decoder.cc
index d0a857dd22a..ae3cbf71f07 100644
--- a/cpp/src/parquet/decoder.cc
+++ b/cpp/src/parquet/decoder.cc
@@ -33,6 +33,7 @@
 #include "arrow/array/builder_binary.h"
 #include "arrow/array/builder_dict.h"
 #include "arrow/array/builder_primitive.h"
+#include "arrow/builder.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/bit_block_counter.h"
 #include "arrow/util/bit_run_reader.h"
@@ -272,6 +273,45 @@ void CheckPageLargeEnough(int64_t remaining_bytes, int32_t value_width,
   }
 }
 
+class ReeBuilderHelper {
+ public:
+  explicit ReeBuilderHelper(EncodingTraits<ByteArrayType>::ReeAccumulator* builder)
+      : builder_(builder),
+        arrow_value_dtype_(
+            std::static_pointer_cast<::arrow::RunEndEncodedType>(builder_->type())
+                ->value_type()) {}
+
+  Status flushCurrStateToBuilder() {
+    if (curr_val_.has_value()) {
+      auto str_value = ByteArrayToString(*curr_val_);
+      ARROW_ASSIGN_OR_RAISE(std::shared_ptr<::arrow::Scalar> scalar,
+                            ::arrow::MakeScalar(arrow_value_dtype_, str_value));
+      RETURN_NOT_OK(builder_->AppendScalar(*scalar, curr_repeats_));
+    } else if (curr_repeats_ > 0) {
+      RETURN_NOT_OK(builder_->AppendNulls(curr_repeats_));
+    }
+    return Status::OK();
+  }
+
+  Status update(std::optional<ByteArray> val, int num_repeats) {
+    if (val == curr_val_) {
+      curr_repeats_ += num_repeats;
+    } else {
+      RETURN_NOT_OK(flushCurrStateToBuilder());
+      curr_val_ = val;
+      curr_repeats_ = num_repeats;
+    }
+    return Status::OK();
+  }
+
+ private:
+  std::optional<ByteArray> curr_val_;
+  int curr_repeats_ = 0;
+  EncodingTraits<ByteArrayType>::ReeAccumulator* builder_;
+  std::shared_ptr<::arrow::DataType> arrow_value_dtype_;
+};
+
 // Internal decoder class hierarchy
 
 class DecoderImpl : virtual public Decoder {
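ReeBuilderHelper batches identical adjacent values so the builder sees one `AppendScalar`/`AppendNulls` per run instead of one call per value. The underlying builder calls it amortizes, as a standalone sketch using only public Arrow APIs:

```cpp
#include <memory>
#include "arrow/array/builder_run_end.h"
#include "arrow/builder.h"
#include "arrow/scalar.h"

// Append the logical sequence ["a", "a", "a", null, null] as two runs.
arrow::Result<std::shared_ptr<arrow::Array>> BuildTwoRuns() {
  std::unique_ptr<arrow::ArrayBuilder> builder;
  ARROW_RETURN_NOT_OK(arrow::MakeBuilder(
      arrow::default_memory_pool(),
      arrow::run_end_encoded(arrow::int32(), arrow::binary()), &builder));
  auto* ree_builder = static_cast<arrow::RunEndEncodedBuilder*>(builder.get());

  ARROW_ASSIGN_OR_RAISE(auto scalar,
                        arrow::MakeScalar(arrow::binary(), std::string("a")));
  ARROW_RETURN_NOT_OK(ree_builder->AppendScalar(*scalar, /*n_repeats=*/3));
  ARROW_RETURN_NOT_OK(ree_builder->AppendNulls(2));

  std::shared_ptr<arrow::Array> out;
  ARROW_RETURN_NOT_OK(ree_builder->Finish(&out));
  return out;  // run ends [3, 5], values ["a", null]
}
```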
@@ -359,6 +399,10 @@ class PlainDecoder : public TypedDecoderImpl<DType> {
   int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                   int64_t valid_bits_offset,
                   typename EncodingTraits<DType>::DictAccumulator* builder) override;
+
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  typename EncodingTraits<DType>::ReeAccumulator* out) override;
 };
 
 template <>
@@ -448,6 +492,13 @@ int PlainDecoder<DType>::DecodeArrow(
   return values_decoded;
 }
 
+template <typename DType>
+inline int PlainDecoder<DType>::DecodeArrow(
+    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    typename EncodingTraits<DType>::ReeAccumulator* out) {
+  ParquetException::NYI("run-end-encoded PlainDecoder");
+}
+
 // Decode routine templated on C++ type rather than type enum
 template <typename T>
 inline int DecodePlain(const uint8_t* data, int64_t data_size, int num_values,
@@ -544,6 +595,10 @@ class PlainBooleanDecoder : public TypedDecoderImpl<BooleanType>, public Boolean
                   int64_t valid_bits_offset,
                   typename EncodingTraits<BooleanType>::DictAccumulator* out) override;
 
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  typename EncodingTraits<BooleanType>::ReeAccumulator* out) override;
+
  private:
   std::unique_ptr<::arrow::bit_util::BitReader> bit_reader_;
   int total_num_values_{0};
@@ -621,6 +676,12 @@ inline int PlainBooleanDecoder::DecodeArrow(
   ParquetException::NYI("dictionaries of BooleanType");
 }
 
+inline int PlainBooleanDecoder::DecodeArrow(
+    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    typename EncodingTraits<BooleanType>::ReeAccumulator* out) {
+  ParquetException::NYI("run-end-encoded BooleanType");
+}
+
 int PlainBooleanDecoder::Decode(uint8_t* buffer, int max_values) {
   max_values = std::min(max_values, num_values_);
   if (ARROW_PREDICT_FALSE(!bit_reader_->Advance(max_values))) {
@@ -749,6 +810,15 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType> {
     return result;
   }
 
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  typename EncodingTraits<ByteArrayType>::ReeAccumulator* out) override {
+    int result = 0;
+    PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
+                                          valid_bits_offset, out, &result));
+    return result;
+  }
+
  private:
   Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
                           int64_t valid_bits_offset,
@@ -794,6 +864,46 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType> {
         visit_binary_helper);
   }
 
+  Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
+                          int64_t valid_bits_offset,
+                          typename EncodingTraits<ByteArrayType>::ReeAccumulator* out,
+                          int* out_values_decoded) {
+    ReeBuilderHelper helper(out);
+    int values_decoded = 0;
+    int i = 0;
+    RETURN_NOT_OK(VisitNullBitmapInline(
+        valid_bits, valid_bits_offset, num_values, null_count,
+        [&]() {
+          if (ARROW_PREDICT_FALSE(len_ < 4)) {
+            ParquetException::EofException();
+          }
+          auto value_len = SafeLoadAs<int32_t>(data_);
+          if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > INT32_MAX - 4)) {
+            return Status::Invalid("Invalid or corrupted value_len '", value_len, "'");
+          }
+          auto increment = value_len + 4;
+          if (ARROW_PREDICT_FALSE(len_ < increment)) {
+            ParquetException::EofException();
+          }
+          parquet::ByteArray byte_array(value_len, data_ + 4);
+          RETURN_NOT_OK(helper.update(byte_array, 1));
+          data_ += increment;
+          len_ -= increment;
+          ++values_decoded;
+          ++i;
+          return Status::OK();
+        },
+        [&]() {
+          RETURN_NOT_OK(helper.update(std::nullopt, 1));
+          ++i;
+          return Status::OK();
+        }));
+    RETURN_NOT_OK(helper.flushCurrStateToBuilder());
+    num_values_ -= values_decoded;
+    *out_values_decoded = values_decoded;
+    return Status::OK();
+  }
+
   template <typename BuilderType>
   Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                      int64_t valid_bits_offset, BuilderType* builder,
@@ -909,6 +1019,13 @@ class DictDecoderImpl : public TypedDecoderImpl<Type>, public DictDecoder<Type>
                   int64_t valid_bits_offset,
                   typename EncodingTraits<Type>::DictAccumulator* out) override;
 
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  typename EncodingTraits<Type>::ReeAccumulator* builder) override {
+    ParquetException::NYI();
+  }
+
   void InsertDictionary(::arrow::ArrayBuilder* builder) override;
 
   int DecodeIndicesSpaced(int num_values, int null_count, const uint8_t* valid_bits,
@@ -1271,7 +1388,81 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType> {
     return result;
   }
 
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  typename EncodingTraits<ByteArrayType>::ReeAccumulator* builder) override {
+    int result = 0;
+    if (null_count == 0) {
+      PARQUET_THROW_NOT_OK(DecodeArrowDenseNonNull(num_values, builder, &result));
+    } else {
+      PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
+                                            valid_bits_offset, builder, &result));
+    }
+    return result;
+  }
+
+ private:
+  Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
+                          int64_t valid_bits_offset,
+                          typename EncodingTraits<ByteArrayType>::ReeAccumulator* builder,
+                          int* out_num_values) {
+    int total_values_decoded = 0;
+    int non_null_values_decoded = 0;
+    ReeBuilderHelper helper(builder);
+    const auto* dict_values = dictionary_->data_as<ByteArray>();
+    int64_t valid_bits_index = valid_bits_offset;
+    while (total_values_decoded < num_values) {
+      int32_t idx;
+      bool is_null = false;
+      int num_repeats;
+      bool ok = idx_decoder_.GetNextValueAndNumRepeatsSpaced(
+          &idx, &is_null, &num_repeats, num_values - total_values_decoded, valid_bits,
+          valid_bits_index);
+      if (ARROW_PREDICT_FALSE(!ok)) {
+        break;
+      }
+      DCHECK_GT(num_repeats, 0);
+      if (is_null) {
+        RETURN_NOT_OK(helper.update(std::nullopt, num_repeats));
+      } else {
+        RETURN_NOT_OK(IndexInBounds(idx));
+        const auto& val = dict_values[idx];
+        RETURN_NOT_OK(helper.update(val, num_repeats));
+        non_null_values_decoded += num_repeats;
+      }
+      valid_bits_index += num_repeats;
+      total_values_decoded += num_repeats;
+    }
+    RETURN_NOT_OK(helper.flushCurrStateToBuilder());
+    *out_num_values = non_null_values_decoded;
+    return Status::OK();
+  }
+
+  Status DecodeArrowDenseNonNull(
+      int num_values, typename EncodingTraits<ByteArrayType>::ReeAccumulator* builder,
+      int* out_num_values) {
+    int values_decoded = 0;
+    ReeBuilderHelper helper(builder);
+    const auto* dict_values = dictionary_->data_as<ByteArray>();
+    while (values_decoded < num_values) {
+      int32_t idx;
+      int num_repeats;
+      bool ok = idx_decoder_.GetNextValueAndNumRepeats(&idx, &num_repeats,
+                                                       num_values - values_decoded);
+      if (ARROW_PREDICT_FALSE(!ok)) {
+        break;
+      }
+      DCHECK_GT(num_repeats, 0);
+      RETURN_NOT_OK(IndexInBounds(idx));
+      const auto& val = dict_values[idx];
+      RETURN_NOT_OK(helper.update(val, num_repeats));
+      values_decoded += num_repeats;
+    }
+    RETURN_NOT_OK(helper.flushCurrStateToBuilder());
+    *out_num_values = values_decoded;
+    return Status::OK();
+  }
+
   Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
                           int64_t valid_bits_offset,
                           typename EncodingTraits<ByteArrayType>::Accumulator* out,
@@ -1486,6 +1677,12 @@ class DeltaBitPackDecoder : public TypedDecoderImpl<DType> {
     return decoded_count;
   }
 
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  typename EncodingTraits<DType>::ReeAccumulator* out) override {
+    ParquetException::NYI("Run end encoded DeltaBitPackDecoder");
+  }
+
  private:
   static constexpr int kMaxDeltaBitWidth = static_cast<int>(sizeof(T) * 8);
 
@@ -1728,6 +1925,12 @@ class DeltaLengthByteArrayDecoder : public TypedDecoderImpl<ByteArrayType> {
         "DecodeArrow of DictAccumulator for DeltaLengthByteArrayDecoder");
   }
 
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  typename EncodingTraits<ByteArrayType>::ReeAccumulator* out) override {
+    ParquetException::NYI(
+        "DecodeArrow of ReeAccumulator for DeltaLengthByteArrayDecoder");
+  }
+
  private:
   // Decode all the encoded lengths. The decoder_ will be at the start of the encoded data
   // after that.
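Both spaced paths above lean on `arrow::internal::BitRunReader`, which yields maximal runs of identical validity bits, so nulls can be appended in bulk exactly like values. A standalone sketch of that primitive (an internal Arrow API, shown only for illustration):

```cpp
#include <cstdint>
#include "arrow/util/bit_run_reader.h"

// Walk a validity bitmap as (length, set?) runs instead of bit by bit.
void WalkValidityRuns(const uint8_t* valid_bits, int64_t offset, int64_t length) {
  arrow::internal::BitRunReader reader(valid_bits, offset, length);
  for (arrow::internal::BitRun run = reader.NextRun(); run.length != 0;
       run = reader.NextRun()) {
    if (run.set) {
      // `run.length` consecutive non-null entries: decode values for them.
    } else {
      // `run.length` consecutive nulls: append one null run to the builder.
    }
  }
}
```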
@@ -1901,7 +2104,13 @@ class RleBooleanDecoder : public TypedDecoderImpl<BooleanType>, public BooleanDecoder {
       int num_values, int null_count, const uint8_t* valid_bits,
       int64_t valid_bits_offset,
       typename EncodingTraits<BooleanType>::DictAccumulator* builder) override {
-    ParquetException::NYI("DecodeArrow for RleBooleanDecoder");
+    ParquetException::NYI("DecodeArrow of DictAccumulator for RleBooleanDecoder");
+  }
+
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  typename EncodingTraits<BooleanType>::ReeAccumulator* out) override {
+    ParquetException::NYI("DecodeArrow of ReeAccumulator for RleBooleanDecoder");
   }
 
  private:
@@ -1973,6 +2182,12 @@ class DeltaByteArrayDecoderImpl : public TypedDecoderImpl<DType> {
     ParquetException::NYI("DecodeArrow of DictAccumulator for DeltaByteArrayDecoder");
   }
 
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  typename EncodingTraits<DType>::ReeAccumulator* out) override {
+    ParquetException::NYI("DecodeArrow of ReeAccumulator for DeltaByteArrayDecoder");
+  }
+
  protected:
   template <bool is_first_run>
   static void BuildBufferInternal(const int32_t* prefix_len_ptr, int i, ByteArray* buffer,
@@ -2211,6 +2426,12 @@ class ByteStreamSplitDecoderBase : public TypedDecoderImpl<DType> {
     ParquetException::NYI("DecodeArrow to DictAccumulator for BYTE_STREAM_SPLIT");
   }
 
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  typename EncodingTraits<DType>::ReeAccumulator* out) override {
+    ParquetException::NYI("DecodeArrow to ReeAccumulator for BYTE_STREAM_SPLIT");
+  }
+
   int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                   int64_t valid_bits_offset,
                   typename EncodingTraits<DType>::Accumulator* builder) override {
@@ -2241,6 +2462,7 @@ class ByteStreamSplitDecoderBase : public TypedDecoderImpl<DType> {
     return values_to_decode;
   }
 
+ protected:
   int DecodeRaw(uint8_t* out_buffer, int max_values) {
     const int values_to_decode = std::min(this->num_values_, max_values);
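The decoder overloads above hang off the traits additions below: only BYTE_ARRAY maps ReeAccumulator to a real builder, while every other physical type gets an empty placeholder struct, making REE support a compile-time property. A small illustration, assuming the traits as declared in the next diff:

```cpp
#include <type_traits>
#include "arrow/array/builder_run_end.h"
#include "parquet/encoding.h"

// Only BYTE_ARRAY wires ReeAccumulator to a real builder; all other physical
// types get an empty placeholder struct, so REE decoding cannot even be
// spelled for them.
static_assert(
    std::is_same_v<parquet::EncodingTraits<parquet::ByteArrayType>::ReeAccumulator,
                   ::arrow::RunEndEncodedBuilder>);
static_assert(
    !std::is_same_v<parquet::EncodingTraits<parquet::Int32Type>::ReeAccumulator,
                    ::arrow::RunEndEncodedBuilder>);
```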
diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h
index d80bf0edcae..e900c5f11cf 100644
--- a/cpp/src/parquet/encoding.h
+++ b/cpp/src/parquet/encoding.h
@@ -70,6 +70,7 @@ struct EncodingTraits<BooleanType> {
   using ArrowType = ::arrow::BooleanType;
   using Accumulator = ::arrow::BooleanBuilder;
   struct DictAccumulator {};
+  struct ReeAccumulator {};
 };
 
 template <>
@@ -80,6 +81,7 @@ struct EncodingTraits<Int32Type> {
   using ArrowType = ::arrow::Int32Type;
   using Accumulator = ::arrow::NumericBuilder<::arrow::Int32Type>;
   using DictAccumulator = ::arrow::Dictionary32Builder<::arrow::Int32Type>;
+  struct ReeAccumulator {};
 };
 
 template <>
@@ -90,6 +92,7 @@ struct EncodingTraits<Int64Type> {
   using ArrowType = ::arrow::Int64Type;
   using Accumulator = ::arrow::NumericBuilder<::arrow::Int64Type>;
   using DictAccumulator = ::arrow::Dictionary32Builder<::arrow::Int64Type>;
+  struct ReeAccumulator {};
 };
 
 template <>
@@ -99,6 +102,7 @@ struct EncodingTraits<Int96Type> {
 
   struct Accumulator {};
   struct DictAccumulator {};
+  struct ReeAccumulator {};
 };
 
 template <>
@@ -109,6 +113,7 @@ struct EncodingTraits<FloatType> {
   using ArrowType = ::arrow::FloatType;
   using Accumulator = ::arrow::NumericBuilder<::arrow::FloatType>;
   using DictAccumulator = ::arrow::Dictionary32Builder<::arrow::FloatType>;
+  struct ReeAccumulator {};
 };
 
 template <>
@@ -119,6 +124,7 @@ struct EncodingTraits<DoubleType> {
   using ArrowType = ::arrow::DoubleType;
   using Accumulator = ::arrow::NumericBuilder<::arrow::DoubleType>;
   using DictAccumulator = ::arrow::Dictionary32Builder<::arrow::DoubleType>;
+  struct ReeAccumulator {};
 };
 
 template <>
@@ -139,6 +145,7 @@ struct EncodingTraits<ByteArrayType> {
     std::vector<std::shared_ptr<::arrow::Array>> chunks;
   };
   using DictAccumulator = ::arrow::Dictionary32Builder<::arrow::BinaryType>;
+  using ReeAccumulator = ::arrow::RunEndEncodedBuilder;
 };
 
 template <>
@@ -149,6 +156,7 @@ struct EncodingTraits<FLBAType> {
   using ArrowType = ::arrow::FixedSizeBinaryType;
   using Accumulator = ::arrow::FixedSizeBinaryBuilder;
   using DictAccumulator = ::arrow::Dictionary32Builder<::arrow::FixedSizeBinaryType>;
+  struct ReeAccumulator {};
 };
 
 class ColumnDescriptor;
@@ -324,6 +332,11 @@ class TypedDecoder : virtual public Decoder {
                           int64_t valid_bits_offset,
                           typename EncodingTraits<DType>::DictAccumulator* builder) = 0;
 
+  virtual int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                          int64_t valid_bits_offset,
+                          typename EncodingTraits<DType>::ReeAccumulator* out) = 0;
+
   /// \brief Decode into a DictionaryBuilder ignoring nulls
   ///
   /// \return number of values decoded
@@ -331,6 +344,14 @@ class TypedDecoder : virtual public Decoder {
                          typename EncodingTraits<DType>::DictAccumulator* builder) {
     return DecodeArrow(num_values, 0, /*valid_bits=*/NULLPTR, 0, builder);
   }
+
+  /// \brief Decode into a RunEndEncodedBuilder ignoring nulls
+  ///
+  /// \return number of values decoded
+  int DecodeArrowNonNull(int num_values,
+                         typename EncodingTraits<DType>::ReeAccumulator* out) {
+    return DecodeArrow(num_values, 0, /*valid_bits=*/NULLPTR, 0, out);
+  }
 };
 
 template <typename DType>
diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc
index b246feaf732..9b245d11ea3 100644
--- a/cpp/src/parquet/file_reader.cc
+++ b/cpp/src/parquet/file_reader.cc
@@ -111,7 +111,7 @@ std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
 }
 
 std::shared_ptr<internal::RecordReader> RowGroupReader::RecordReader(
-    int i, bool read_dictionary) {
+    int i, bool read_dictionary, bool read_ree) {
   if (i >= metadata()->num_columns()) {
     std::stringstream ss;
     ss << "Trying to read column index " << i << " but row group metadata has only "
@@ -126,7 +126,7 @@ std::shared_ptr<internal::RecordReader> RowGroupReader::RecordReader(
 
   auto reader = internal::RecordReader::Make(
       descr, level_info, contents_->properties()->memory_pool(), read_dictionary,
-      contents_->properties()->read_dense_for_nullable());
+      contents_->properties()->read_dense_for_nullable(), read_ree);
   reader->SetPageReader(std::move(page_reader));
   return reader;
 }
@@ -149,7 +149,8 @@ std::shared_ptr<internal::RecordReader> RowGroupReader::RecordReaderWithExposeEn
   return RecordReader(
       i,
       /*read_dictionary=*/encoding_to_expose == ExposedEncoding::DICTIONARY &&
-          IsColumnChunkFullyDictionaryEncoded(*metadata()->ColumnChunk(i)));
+          IsColumnChunkFullyDictionaryEncoded(*metadata()->ColumnChunk(i)),
+      /*read_ree=*/encoding_to_expose == ExposedEncoding::REE);
 }
 
 std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) {
diff --git a/cpp/src/parquet/file_reader.h b/cpp/src/parquet/file_reader.h
index c42163276cd..2da052a196e 100644
--- a/cpp/src/parquet/file_reader.h
+++ b/cpp/src/parquet/file_reader.h
@@ -65,7 +65,8 @@ class PARQUET_EXPORT RowGroupReader {
   // EXPERIMENTAL: Construct a RecordReader for the indicated column of the row group.
   // Ownership is shared with the RowGroupReader.
   std::shared_ptr<internal::RecordReader> RecordReader(int i,
-                                                       bool read_dictionary = false);
+                                                       bool read_dictionary = false,
+                                                       bool read_ree = false);
 
   // Construct a ColumnReader, trying to enable exposed encoding.
   //
@@ -83,7 +84,7 @@ class PARQUET_EXPORT RowGroupReader {
   // Construct a RecordReader, trying to enable exposed encoding.
   //
-  // For dictionary encoding, currently we only support column chunks that are
+  // For dictionary and run-end encoding, currently we only support column chunks that are
   // fully dictionary encoded byte arrays. The caller should verify if the reader can read
   // and expose the dictionary by checking the reader's read_dictionary(). If a column
   // chunk uses dictionary encoding but then falls back to plain encoding, the returned
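As with the dictionary path, callers of `RecordReaderWithExposeEncoding()` should verify the requested encoding actually got exposed, per the comment above. A hedged sketch of that check (assumes an open ParquetFileReader):

```cpp
#include "parquet/file_reader.h"

// Request REE exposure for column 0 of row group 0 and verify it took effect.
std::shared_ptr<parquet::internal::RecordReader> TryExposeRee(
    parquet::ParquetFileReader& reader) {
  auto row_group = reader.RowGroup(0);
  auto record_reader =
      row_group->RecordReaderWithExposeEncoding(0, parquet::ExposedEncoding::REE);
  if (!record_reader->read_ree()) {
    // The column cannot be exposed as REE; fall back to dense reading.
    return nullptr;
  }
  return record_reader;
}
```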
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index 5a1799c39d7..618bd2f3633 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -1038,6 +1038,25 @@ class PARQUET_EXPORT ArrowReaderProperties {
     }
   }
 
+  /// \brief Set whether to read a particular column as run-end-encoded.
+  ///
+  /// This is only supported for columns with a Parquet physical type of
+  /// BYTE_ARRAY, such as string or binary types.
+  void set_read_ree(int column_index, bool read_ree) {
+    if (read_ree) {
+      read_ree_indices_.insert(column_index);
+    } else {
+      read_ree_indices_.erase(column_index);
+    }
+  }
+
+  bool read_ree(int column_index) const {
+    return read_ree_indices_.find(column_index) != read_ree_indices_.end();
+  }
+
   /// \brief Set the Arrow binary type to read BYTE_ARRAY columns as.
   ///
   /// Allowed values are Type::BINARY, Type::LARGE_BINARY and Type::BINARY_VIEW.
@@ -1148,6 +1167,7 @@ class PARQUET_EXPORT ArrowReaderProperties {
  private:
   bool use_threads_;
   std::unordered_set<int> read_dict_indices_;
+  std::unordered_set<int> read_ree_indices_;
   int64_t batch_size_;
   bool pre_buffer_;
   ::arrow::io::IOContext io_context_;
diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc
index 7ae9021e35e..34ec8517258 100644
--- a/cpp/src/parquet/reader_test.cc
+++ b/cpp/src/parquet/reader_test.cc
@@ -37,6 +37,8 @@
 #include "arrow/array/array_binary.h"
 #include "arrow/array/builder_binary.h"
 #include "arrow/buffer.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/datum.h"
 #include "arrow/io/file.h"
 #include "arrow/testing/future_util.h"
 #include "arrow/testing/gtest_util.h"
@@ -712,6 +714,74 @@ TEST(TestFileReader, RecordReaderWithExposingDictionary) {
   }
 }
 
+TEST(TestFileReader, RecordReaderWithExposingRee) {
+  const int num_rows = 1000;
+
+  // Make schema
+  schema::NodeVector fields;
+  fields.push_back(PrimitiveNode::Make("field", Repetition::REQUIRED, Type::BYTE_ARRAY,
+                                       ConvertedType::NONE));
+  auto schema = std::static_pointer_cast<GroupNode>(
+      GroupNode::Make("schema", Repetition::REQUIRED, fields));
+
+  // Write small batches and small data pages
+  std::shared_ptr<WriterProperties> writer_props = WriterProperties::Builder()
+                                                       .write_batch_size(64)
+                                                       ->data_pagesize(128)
+                                                       ->enable_dictionary()
+                                                       ->build();
+
+  ASSERT_OK_AND_ASSIGN(auto out_file, ::arrow::io::BufferOutputStream::Create());
+  std::shared_ptr<ParquetFileWriter> file_writer =
+      ParquetFileWriter::Open(out_file, schema, writer_props);
+
+  RowGroupWriter* rg_writer = file_writer->AppendRowGroup();
+
+  // Write one column
+  ::arrow::random::RandomArrayGenerator rag(0);
+  ByteArrayWriter* writer = static_cast<ByteArrayWriter*>(rg_writer->NextColumn());
+  std::vector<std::string_view> raw_unique_data = {"a", "bc", "defg"};
+  std::vector<ByteArray> col_typed;
+  for (int i = 0; i < num_rows; i++) {
+    std::string_view chosen_data = raw_unique_data[i % raw_unique_data.size()];
+    col_typed.emplace_back(chosen_data);
+  }
+  writer->WriteBatch(num_rows, nullptr, nullptr, col_typed.data());
+  rg_writer->Close();
+  file_writer->Close();
+
+  // Open the reader
+  ASSERT_OK_AND_ASSIGN(auto file_buf, out_file->Finish());
+  auto in_file = std::make_shared<::arrow::io::BufferReader>(file_buf);
+
+  ReaderProperties reader_props;
+  reader_props.enable_buffered_stream();
+  reader_props.set_buffer_size(64);
+  std::unique_ptr<ParquetFileReader> file_reader =
+      ParquetFileReader::Open(in_file, reader_props);
+
+  auto row_group = file_reader->RowGroup(0);
+  auto record_reader = std::dynamic_pointer_cast<internal::ReeRecordReader>(
+      row_group->RecordReaderWithExposeEncoding(0, ExposedEncoding::REE));
+  ASSERT_NE(record_reader, nullptr);
+  ASSERT_TRUE(record_reader->read_ree());
+
+  ASSERT_EQ(record_reader->ReadRecords(num_rows), num_rows);
+  std::shared_ptr<::arrow::Array> ree_result_array = record_reader->GetResult();
+
+  // Verify values by decoding back to a dense binary array with RunEndDecode
+  ASSERT_OK_AND_ASSIGN(::arrow::Datum decoded_datum,
+                       ::arrow::compute::RunEndDecode(::arrow::Datum(ree_result_array)));
+  auto decoded_array = decoded_datum.make_array();
+  auto decoded_binary_array =
+      std::static_pointer_cast<::arrow::BinaryArray>(decoded_array);
+
+  int64_t indices_read = decoded_binary_array->length();
+  ASSERT_EQ(indices_read, num_rows);
+  for (int i = 0; i < indices_read; ++i) {
+    std::string_view ith_value_view = decoded_binary_array->GetView(i);
+    ASSERT_EQ(ith_value_view, col_typed[i]);
+  }
+}
+
 class TestLocalFile : public ::testing::Test {
  public:
   void SetUp() {
diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h
index 7e8a18fc94d..5ab973a301f 100644
--- a/cpp/src/parquet/types.h
+++ b/cpp/src/parquet/types.h
@@ -550,7 +550,8 @@ struct Encoding {
 // decoding, in which case the data read from the file is DICTIONARY encoded.
 enum class ExposedEncoding {
   NO_ENCODING = 0,  // data is not encoded, i.e. already decoded during reading
-  DICTIONARY = 1
+  DICTIONARY = 1,
+  REE = 2
 };
 
 /// \brief Return true if Parquet supports indicated compression type
diff --git a/python/pyarrow/_dataset_parquet.pyx b/python/pyarrow/_dataset_parquet.pyx
index 9405b5d8c54..61bb6124533 100644
--- a/python/pyarrow/_dataset_parquet.pyx
+++ b/python/pyarrow/_dataset_parquet.pyx
@@ -142,6 +142,9 @@ cdef class ParquetFileFormat(FileFormat):
         if read_options.dictionary_columns is not None:
             for column in read_options.dictionary_columns:
                 options.dict_columns.insert(tobytes(column))
+        if read_options.ree_columns is not None:
+            for column in read_options.ree_columns:
+                options.ree_columns.insert(tobytes(column))
         options.coerce_int96_timestamp_unit = \
             read_options._coerce_int96_timestamp_unit
         options.binary_type = read_options._binary_type
@@ -181,6 +184,7 @@ cdef class ParquetFileFormat(FileFormat):
 
         parquet_read_options = ParquetReadOptions(
             dictionary_columns={frombytes(col) for col in options.dict_columns},
+            ree_columns={frombytes(col) for col in options.ree_columns},
         )
         # Read options getter/setter works with strings so setting
         # the private property which uses the C Type
@@ -527,15 +531,18 @@ cdef class ParquetReadOptions(_Weakrefable):
 
     cdef public:
         set dictionary_columns
+        set ree_columns
         TimeUnit _coerce_int96_timestamp_unit
         Type _binary_type
         Type _list_type
 
     # Also see _PARQUET_READ_OPTIONS
     def __init__(self, dictionary_columns=None,
+                 ree_columns=None,
                 coerce_int96_timestamp_unit=None,
                 binary_type=None, list_type=None):
        self.dictionary_columns = set(dictionary_columns or set())
+        self.ree_columns = set(ree_columns or set())
        self.coerce_int96_timestamp_unit = coerce_int96_timestamp_unit
        self.binary_type = binary_type
        self.list_type = list_type
@@ -585,6 +592,7 @@ cdef class ParquetReadOptions(_Weakrefable):
         bool
         """
         return (self.dictionary_columns == other.dictionary_columns and
+                self.ree_columns == other.ree_columns and
                 self._coerce_int96_timestamp_unit ==
                 other._coerce_int96_timestamp_unit and
                 self._binary_type == other._binary_type and
@@ -600,6 +608,7 @@ cdef class ParquetReadOptions(_Weakrefable):
         return (
             f"