Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions cpp/src/arrow/compute/kernels/scalar_cast_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,13 @@ Status UnpackDictionary(KernelContext* ctx, const ExecSpan& batch, ExecResult* o
return Status::OK();
}

Status DecodeRunEndEncoded(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
Datum input(batch[0].array.ToArrayData());
ARROW_ASSIGN_OR_RAISE(Datum decoded, RunEndDecode(input, ctx->exec_context()));
out->value = std::move(decoded.array());
return Status::OK();
}

Status OutputAllNull(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
// TODO(wesm): there is no good reason to have to use ArrayData here, so we
// should clean this up later. This is used in the dict<null>->null cast
Expand Down Expand Up @@ -286,6 +293,12 @@ static bool CanCastFromDictionary(Type::type type_id) {
is_fixed_size_binary(type_id));
}

static bool CanCastFromRunEndEncoded(Type::type type_id) {
return (is_primitive(type_id) ||
is_base_binary_like(type_id) ||
is_fixed_size_binary(type_id));
}

void AddCommonCasts(Type::type out_type_id, OutputType out_ty, CastFunction* func) {
// From null to this type
ScalarKernel kernel;
Expand All @@ -303,6 +316,13 @@ void AddCommonCasts(Type::type out_type_id, OutputType out_ty, CastFunction* fun
MemAllocation::NO_PREALLOCATE));
}

// From run-end-encoded to this type
if (CanCastFromRunEndEncoded(out_type_id)) {
DCHECK_OK(func->AddKernel(Type::RUN_END_ENCODED, {InputType(Type::RUN_END_ENCODED)}, out_ty,
DecodeRunEndEncoded, NullHandling::COMPUTED_NO_PREALLOCATE,
MemAllocation::NO_PREALLOCATE));
}

// From extension type to this type
DCHECK_OK(func->AddKernel(Type::EXTENSION, {InputType(Type::EXTENSION)}, out_ty,
CastFromExtension, NullHandling::COMPUTED_NO_PREALLOCATE,
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ parquet::ArrowReaderProperties MakeArrowReaderProperties(
auto column_index = metadata.schema()->ColumnIndex(name);
properties.set_read_dictionary(column_index, true);
}
for (const std::string& name : format.reader_options.ree_columns) {
auto column_index = metadata.schema()->ColumnIndex(name);
properties.set_read_ree(column_index, true);
}
properties.set_coerce_int96_timestamp_unit(
format.reader_options.coerce_int96_timestamp_unit);
properties.set_binary_type(format.reader_options.binary_type);
Expand Down Expand Up @@ -445,6 +449,7 @@ bool ParquetFileFormat::Equals(const FileFormat& other) const {

// FIXME implement comparison for decryption options
return (reader_options.dict_columns == other_reader_options.dict_columns &&
reader_options.ree_columns == other_reader_options.ree_columns &&
reader_options.coerce_int96_timestamp_unit ==
other_reader_options.coerce_int96_timestamp_unit &&
reader_options.binary_type == other_reader_options.binary_type &&
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/dataset/file_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
///
/// @{
std::unordered_set<std::string> dict_columns;
std::unordered_set<std::string> ree_columns;
arrow::TimeUnit::type coerce_int96_timestamp_unit = arrow::TimeUnit::NANO;
Type::type binary_type = Type::BINARY;
Type::type list_type = Type::LIST;
Expand Down
30 changes: 30 additions & 0 deletions cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,17 @@ TEST_F(TestParquetFileFormat, InspectDictEncoded) {
AssertSchemaEqual(*actual, expected_schema, /* check_metadata = */ false);
}

TEST_F(TestParquetFileFormat, InspectReeEncoded) {
auto reader = GetRecordBatchReader(schema({field("utf8", utf8())}));
auto source = GetFileSource(reader.get());

format_->reader_options.ree_columns = {"utf8"};
ASSERT_OK_AND_ASSIGN(auto actual, format_->Inspect(*source.get()));

Schema expected_schema({field("utf8", run_end_encoded(int32(), utf8()))});
AssertSchemaEqual(*actual, expected_schema, /* check_metadata = */ false);
}

TEST_F(TestParquetFileFormat, IsSupported) { TestIsSupported(); }

TEST_F(TestParquetFileFormat, WriteRecordBatchReader) { TestWrite(); }
Expand Down Expand Up @@ -617,6 +628,25 @@ TEST_P(TestParquetFileFormatScan, ScanRecordBatchReaderDictEncoded) {
}
ASSERT_EQ(row_count, expected_rows());
}
TEST_P(TestParquetFileFormatScan, ScanRecordBatchReaderReeEncoded) {
auto reader = GetRecordBatchReader(schema({field("utf8", utf8())}));
auto source = GetFileSource(reader.get());

SetSchema(reader->schema()->fields());
SetFilter(literal(true));
format_->reader_options.ree_columns = {"utf8"};
ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));

int64_t row_count = 0;
Schema expected_schema({field("utf8", run_end_encoded(int32(), utf8()))});

for (auto maybe_batch : PhysicalBatches(fragment)) {
ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
row_count += batch->num_rows();
AssertSchemaEqual(*batch->schema(), expected_schema, /* check_metadata = */ false);
}
ASSERT_EQ(row_count, expected_rows());
}
TEST_P(TestParquetFileFormatScan, ScanRecordBatchReaderPreBuffer) {
auto reader = GetRecordBatchReader(schema({field("f64", float64())}));
auto source = GetFileSource(reader.get());
Expand Down
72 changes: 71 additions & 1 deletion cpp/src/arrow/util/rle_encoding_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,14 @@ class RleBitPackedDecoder {
/// values.
[[nodiscard]] bool Get(value_type* val);

// Get the next logical value and num_repeats within the specified batch_size.
[[nodiscard]] bool GetNextValueAndNumRepeats(value_type* val, int* num_repeats, int batch_size);

/// Like GetNextValueAndNumRepeats but add spacing for null entries.
[[nodiscard]] bool GetNextValueAndNumRepeatsSpaced(value_type* val, bool* is_null,
int* num_repeats, int batch_size,
const uint8_t* valid_bits, int64_t valid_bits_offset);

/// Get a batch of values return the number of decoded elements.
/// May write fewer elements to the output than requested if there are not enough values
/// left or if an error occurred.
Expand Down Expand Up @@ -722,7 +730,6 @@ void RleBitPackedDecoder<T>::ParseWithCallable(Callable&& func) {
auto OnBitPackedRun(BitPackedRun run) { return func(std::move(run)); }
auto OnRleRun(RleRun run) { return func(std::move(run)); }
} handler{std::move(func)};

parser_.Parse(std::move(handler));
}

Expand All @@ -731,6 +738,69 @@ bool RleBitPackedDecoder<T>::Get(value_type* val) {
return GetBatch(val, 1) == 1;
}

template <typename T>
bool RleBitPackedDecoder<T>::GetNextValueAndNumRepeats(value_type* val, int* num_repeats, int batch_size) {
using ControlFlow = RleBitPackedParser::ControlFlow;

if (ARROW_PREDICT_FALSE(run_remaining() > 0)) {
if (std::holds_alternative<BitPackedRunDecoder<value_type>>(decoder_)) {
auto& decoder = std::get<BitPackedRunDecoder<value_type>>(decoder_);
*num_repeats = 1;
return decoder.Get(val, value_bit_width_);
} else {
auto& decoder = std::get<RleRunDecoder<value_type>>(decoder_);
*num_repeats = std::min(decoder.remaining(), batch_size);
ARROW_DCHECK_EQ(decoder.Advance(*num_repeats, value_bit_width_), *num_repeats);
return decoder.Get(val, value_bit_width_);
}
}

bool read_new_value = false;

ParseWithCallable([&](auto run) {
if constexpr(std::is_same_v<decltype(run), BitPackedRun>) {
BitPackedRunDecoder<T> decoder(run, value_bit_width_);
read_new_value = decoder.Get(val, value_bit_width_);
*num_repeats = 1;
decoder_ = std::move(decoder);
return ControlFlow::Break;
}
else {
RleRunDecoder<T> decoder(run, value_bit_width_);
*num_repeats = std::min(decoder.remaining(), batch_size);
read_new_value = decoder.Get(val, value_bit_width_);
ARROW_DCHECK_EQ(decoder.Advance(*num_repeats, value_bit_width_), *num_repeats);
decoder_ = std::move(decoder);
return ControlFlow::Break;
}
});

return read_new_value;
}

template <typename T>
bool RleBitPackedDecoder<T>::GetNextValueAndNumRepeatsSpaced(value_type* val, bool* is_null,
int* num_repeats, int batch_size,
const uint8_t* valid_bits, int64_t valid_bits_offset) {
arrow::internal::BitRunReader bit_reader(valid_bits, valid_bits_offset,
/*length=*/batch_size);
arrow::internal::BitRun valid_run = bit_reader.NextRun();
while (ARROW_PREDICT_FALSE(valid_run.length == 0)) {
valid_run = bit_reader.NextRun();
}
ARROW_DCHECK_GT(batch_size, 0);
ARROW_DCHECK_GT(valid_run.length, 0);
if (valid_run.set) {
return GetNextValueAndNumRepeats(
val, num_repeats,
static_cast<int>(std::min(valid_run.length, static_cast<int64_t>(batch_size))));
} else {
*is_null = true;
*num_repeats = static_cast<int>(valid_run.length);
}
return true;
}

template <typename T>
auto RleBitPackedDecoder<T>::GetBatch(value_type* out, rle_size_t batch_size)
-> rle_size_t {
Expand Down
5 changes: 4 additions & 1 deletion cpp/src/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,10 @@ add_parquet_test(arrow-reader-writer-test
SOURCES
arrow/arrow_reader_writer_test.cc
arrow/arrow_statistics_test.cc
arrow/variant_test.cc)
arrow/variant_test.cc
EXTRA_LINK_LIBS
arrow_compute_core_testing
arrow_acero_testing)

add_parquet_test(arrow-internals-test SOURCES arrow/path_internal_test.cc
arrow/reconstruct_internal_test.cc)
Expand Down
90 changes: 90 additions & 0 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "arrow/array/builder_primitive.h"
#include "arrow/chunked_array.h"
#include "arrow/compute/api.h"
#include "arrow/compute/initialize.h"
#include "arrow/extension/json.h"
#include "arrow/io/api.h"
#include "arrow/record_batch.h"
Expand All @@ -61,6 +62,8 @@
# include "arrow/csv/api.h"
#endif

#include "arrow/acero/test_util_internal.h"

#include "parquet/api/reader.h"
#include "parquet/api/writer.h"

Expand Down Expand Up @@ -4884,6 +4887,7 @@ class TestArrowReadDictionary : public ::testing::TestWithParam<double> {

std::shared_ptr<Table> actual;
ASSERT_OK_NO_THROW(reader->ReadTable(&actual));

::arrow::AssertTablesEqual(expected, *actual, /*same_chunk_layout=*/false);
}

Expand Down Expand Up @@ -5029,6 +5033,92 @@ INSTANTIATE_TEST_SUITE_P(
ReadDictionary, TestArrowReadDictionary,
::testing::ValuesIn(TestArrowReadDictionary::null_probabilities()));

class TestArrowReadRunEndEncoded : public ::testing::TestWithParam<double> {
public:
static constexpr int kNumRowGroups = 16;

struct {
int num_rows = 1024 * kNumRowGroups;
int num_row_groups = kNumRowGroups;
int num_uniques = 128;
} options;

void SetUp() override {
ASSERT_OK(::arrow::compute::Initialize());
properties_ = default_arrow_reader_properties();

GenerateData(GetParam());
}

void GenerateData(double null_probability) {
constexpr int64_t min_length = 2;
constexpr int64_t max_length = 100;
::arrow::random::RandomArrayGenerator rag(0);
dense_values_ = rag.StringWithRepeats(options.num_rows, options.num_uniques,
min_length, max_length, null_probability);
expected_dense_ = MakeSimpleTable(dense_values_, /*nullable=*/true);
}

void TearDown() override {}

void WriteSimple() {
// Write `num_row_groups` row groups; each row group will have a different dictionary
ASSERT_NO_FATAL_FAILURE(
WriteTableToBuffer(expected_dense_, options.num_rows / options.num_row_groups,
default_arrow_writer_properties(), &buffer_));
}

void CheckReadWholeFile(const Table& expected) {
ASSERT_OK_AND_ASSIGN(auto reader, GetReader());

std::shared_ptr<Table> actual;
ASSERT_OK_NO_THROW(reader->ReadTable(&actual));
::arrow::AssertTablesEqual(expected, *actual, /*same_chunk_layout=*/false);
}

static std::vector<double> null_probabilities() { return {0.0, 0.5, 1}; }

protected:
std::shared_ptr<Array> dense_values_;
std::shared_ptr<Table> expected_dense_;
std::shared_ptr<Table> expected_dict_;
std::shared_ptr<Buffer> buffer_;
ArrowReaderProperties properties_;

::arrow::Result<std::unique_ptr<FileReader>> GetReader() {
std::unique_ptr<FileReader> reader;

FileReaderBuilder builder;
RETURN_NOT_OK(builder.Open(std::make_shared<BufferReader>(buffer_)));
RETURN_NOT_OK(builder.properties(properties_)->Build(&reader));

return reader;
}
};

TEST_P(TestArrowReadRunEndEncoded, ReadWholeFile) {
properties_.set_read_ree(0, true);

WriteSimple();

auto num_row_groups = options.num_row_groups;
auto chunk_size = options.num_rows / num_row_groups;

std::vector<std::shared_ptr<Array>> chunks(num_row_groups);
for (int i = 0; i < num_row_groups; ++i) {
chunks[i] = dense_values_->Slice(chunk_size * i, chunk_size);
}
auto ex_table = MakeSimpleTable(std::make_shared<ChunkedArray>(chunks),
/*nullable=*/true);
ASSERT_OK_AND_ASSIGN(ex_table, ::arrow::acero::RunEndEncodeTableColumns(*ex_table, {0}));

CheckReadWholeFile(*ex_table);
}

INSTANTIATE_TEST_SUITE_P(
ReadRunEndEncoded, TestArrowReadRunEndEncoded,
::testing::ValuesIn(TestArrowReadRunEndEncoded::null_probabilities()));

TEST(TestArrowWriteDictionaries, ChangingDictionaries) {
constexpr int num_unique = 50;
constexpr int repeat = 10000;
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,9 @@ class LeafReader : public ColumnReaderImpl {
record_reader_ = RecordReader::Make(
descr_, leaf_info, ctx_->pool,
/*read_dictionary=*/field_->type()->id() == ::arrow::Type::DICTIONARY,
/*read_dense_for_nullable=*/false, /*arrow_type=*/type_for_reading);
/*read_dense_for_nullable=*/false,
/*read_run_end_encoded=*/field_->type()->id() == ::arrow::Type::RUN_END_ENCODED,
/*arrow_type=*/type_for_reading);
NextRowGroup();
}

Expand Down
Loading
Loading