Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 184 additions & 15 deletions parquet-variant/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,35 @@ fn write_offset(buf: &mut Vec<u8>, value: usize, nbytes: u8) {
buf.extend_from_slice(&bytes[..nbytes as usize]);
}

#[derive(Default)]
/// Wrapper around a `Vec<u8>` that provides methods for appending
/// primitive values, variant types, and metadata.
///
/// This is used internally by the builders to construct the
/// the `value` field for [`Variant`] values.
///
/// You can reuse an existing `Vec<u8>` by using the `from` impl
#[derive(Debug, Default)]
struct ValueBuffer(Vec<u8>);

impl ValueBuffer {
/// Construct a ValueBuffer that will write to a new underlying `Vec`
fn new() -> Self {
Default::default()
}
}

impl From<Vec<u8>> for ValueBuffer {
fn from(value: Vec<u8>) -> Self {
Self(value)
}
}

impl From<ValueBuffer> for Vec<u8> {
fn from(value_buffer: ValueBuffer) -> Self {
value_buffer.0
}
}

impl ValueBuffer {
fn append_u8(&mut self, term: u8) {
self.0.push(term);
Expand All @@ -82,7 +108,7 @@ impl ValueBuffer {
}

fn into_inner(self) -> Vec<u8> {
self.0
self.into()
}

fn inner_mut(&mut self) -> &mut Vec<u8> {
Expand Down Expand Up @@ -252,13 +278,31 @@ impl ValueBuffer {
}
}

#[derive(Default)]
/// Builder for constructing metadata for [`Variant`] values.
///
/// This is used internally by the [`VariantBuilder`] to construct the metadata
///
/// You can use an existing `Vec<u8>` as the metadata buffer by using the `from` impl.
#[derive(Default, Debug)]
struct MetadataBuilder {
// Field names -- field_ids are assigned in insert order
field_names: IndexSet<String>,

// flag that checks if field names by insertion order are also lexicographically sorted
is_sorted: bool,

/// Output buffer. Metadata is written to the end of this buffer
metadata_buffer: Vec<u8>,
}

/// Create a new MetadataBuilder that will write to the specified metadata buffer
impl From<Vec<u8>> for MetadataBuilder {
fn from(metadata_buffer: Vec<u8>) -> Self {
Self {
metadata_buffer,
..Default::default()
}
}
}

impl MetadataBuilder {
Expand Down Expand Up @@ -307,6 +351,12 @@ impl MetadataBuilder {
// Calculate metadata size
let total_dict_size: usize = self.metadata_size();

let Self {
field_names,
is_sorted,
mut metadata_buffer,
} = self;

// Determine appropriate offset size based on the larger of dict size or total string size
let max_offset = std::cmp::max(total_dict_size, nkeys);
let offset_size = int_size(max_offset);
Expand All @@ -315,29 +365,29 @@ impl MetadataBuilder {
let string_start = offset_start + (nkeys + 1) * offset_size as usize;
let metadata_size = string_start + total_dict_size;

let mut metadata = Vec::with_capacity(metadata_size);
metadata_buffer.reserve(metadata_size);

// Write header: version=1, field names are sorted, with calculated offset_size
metadata.push(0x01 | (self.is_sorted as u8) << 4 | ((offset_size - 1) << 6));
metadata_buffer.push(0x01 | (is_sorted as u8) << 4 | ((offset_size - 1) << 6));

// Write dictionary size
write_offset(&mut metadata, nkeys, offset_size);
write_offset(&mut metadata_buffer, nkeys, offset_size);

// Write offsets
let mut cur_offset = 0;
for key in self.field_names.iter() {
write_offset(&mut metadata, cur_offset, offset_size);
for key in field_names.iter() {
write_offset(&mut metadata_buffer, cur_offset, offset_size);
cur_offset += key.len();
}
// Write final offset
write_offset(&mut metadata, cur_offset, offset_size);
write_offset(&mut metadata_buffer, cur_offset, offset_size);

// Write string data
for key in self.field_names {
metadata.extend_from_slice(key.as_bytes());
for key in field_names {
metadata_buffer.extend_from_slice(key.as_bytes());
}

metadata
metadata_buffer
}
}

Expand Down Expand Up @@ -570,6 +620,41 @@ impl ParentState<'_> {
/// );
///
/// ```
/// # Example: Reusing Buffers
///
/// You can use the [`VariantBuilder`] to write into existing buffers (for
/// example to write multiple variants back to back in the same buffer)
///
/// ```
/// // we will write two variants back to back
/// use parquet_variant::{Variant, VariantBuilder};
/// // Append 12345
/// let mut builder = VariantBuilder::new();
/// builder.append_value(12345);
/// let (metadata, value) = builder.finish();
/// // remember where the first variant ends
/// let (first_meta_offset, first_meta_len) = (0, metadata.len());
/// let (first_value_offset, first_value_len) = (0, value.len());
///
/// // now, append a second variant to the same buffers
/// let mut builder = VariantBuilder::new_with_buffers(metadata, value);
/// builder.append_value("Foo");
/// let (metadata, value) = builder.finish();
///
/// // The variants can be referenced in their appropriate location
/// let variant1 = Variant::new(
/// &metadata[first_meta_offset..first_meta_len],
/// &value[first_value_offset..first_value_len]
/// );
/// assert_eq!(variant1, Variant::Int32(12345));
///
/// let variant2 = Variant::new(
/// &metadata[first_meta_len..],
/// &value[first_value_len..]
/// );
/// assert_eq!(variant2, Variant::from("Foo"));
/// ```
///
/// # Example: Unique Field Validation
///
/// This example shows how enabling unique field validation will cause an error
Expand Down Expand Up @@ -626,23 +711,33 @@ impl ParentState<'_> {
/// let (metadata, value) = builder.finish();
/// let variant = Variant::try_new(&metadata, &value).unwrap();
/// ```
///
#[derive(Default)]
#[derive(Default, Debug)]
pub struct VariantBuilder {
buffer: ValueBuffer,
metadata_builder: MetadataBuilder,
validate_unique_fields: bool,
}

impl VariantBuilder {
/// Create a new VariantBuilder with new underlying buffer
pub fn new() -> Self {
Self {
buffer: ValueBuffer::default(),
buffer: ValueBuffer::new(),
metadata_builder: MetadataBuilder::default(),
validate_unique_fields: false,
}
}

/// Create a new VariantBuilder that will write the metadata and values to
/// the specified buffers.
pub fn new_with_buffers(metadata_buffer: Vec<u8>, value_buffer: Vec<u8>) -> Self {
Self {
buffer: ValueBuffer::from(value_buffer),
metadata_builder: MetadataBuilder::from(metadata_buffer),
validate_unique_fields: false,
}
}

/// Enables validation of unique field keys in nested objects.
///
/// This setting is propagated to all [`ObjectBuilder`]s created through this [`VariantBuilder`]
Expand Down Expand Up @@ -1916,6 +2011,80 @@ mod tests {
assert_eq!(metadata.num_field_names(), 3);
}

/// Test reusing buffers with nested objects
#[test]
fn test_with_existing_buffers_nested() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a really cool test case

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would have loved to append the variant but I hit this bug:

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you apply this diff, it should give you what you need: #7914.

I don't love that we need to use a builder to reencode the object/list. I'm going to spend some time thinking about a more elegant solution.

I wonder if this PR with its existing buffers can help 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you apply this diff, it should give you what you need: #7914.

Thank you @friendlymatthew -- that looks great ❤️ -- I left a few comments

I don't love that we need to use a builder to reencode the object/list. I'm going to spend some time thinking about a more elegant solution.

I think potentially a more performant solution would be to optimize #7914 to copy the Variant bytes directly and then update the field_ids.

However, that will get tricky I think if the field_ids in the variant under construction are different sizes than the old one.

I suggest we start with the simple thing (what you have) and then we can optimize if/when we have benchmarks that show it would help

let mut builder = VariantBuilder::new();
append_test_list(&mut builder);
let (m1, v1) = builder.finish();
let variant1 = Variant::new(&m1, &v1);

let mut builder = VariantBuilder::new();
append_test_object(&mut builder);
let (m2, v2) = builder.finish();
let variant2 = Variant::new(&m2, &v2);

let mut builder = VariantBuilder::new();
builder.append_value("This is a string");
let (m3, v3) = builder.finish();
let variant3 = Variant::new(&m3, &v3);

// Now, append those three variants to the a new buffer that is reused
let mut builder = VariantBuilder::new();
append_test_list(&mut builder);
let (metadata, value) = builder.finish();
let (meta1_offset, meta1_end) = (0, metadata.len());
let (value1_offset, value1_end) = (0, value.len());

// reuse same buffer
let mut builder = VariantBuilder::new_with_buffers(metadata, value);
append_test_object(&mut builder);
let (metadata, value) = builder.finish();
let (meta2_offset, meta2_end) = (meta1_end, metadata.len());
let (value2_offset, value2_end) = (value1_end, value.len());

// Append a string
let mut builder = VariantBuilder::new_with_buffers(metadata, value);
builder.append_value("This is a string");
let (metadata, value) = builder.finish();
let (meta3_offset, meta3_end) = (meta2_end, metadata.len());
let (value3_offset, value3_end) = (value2_end, value.len());

// verify we can read the variants back correctly
let roundtrip1 = Variant::new(
&metadata[meta1_offset..meta1_end],
&value[value1_offset..value1_end],
);
assert_eq!(roundtrip1, variant1,);

let roundtrip2 = Variant::new(
&metadata[meta2_offset..meta2_end],
&value[value2_offset..value2_end],
);
assert_eq!(roundtrip2, variant2,);

let roundtrip3 = Variant::new(
&metadata[meta3_offset..meta3_end],
&value[value3_offset..value3_end],
);
assert_eq!(roundtrip3, variant3);
}

/// append a simple List variant
fn append_test_list(builder: &mut VariantBuilder) {
let mut list = builder.new_list();
list.append_value(1234);
list.append_value("a string value");
list.finish();
}

/// append an object variant
fn append_test_object(builder: &mut VariantBuilder) {
let mut obj = builder.new_object();
obj.insert("a", true);
obj.finish().unwrap();
}

#[test]
fn test_variant_builder_to_list_builder_no_finish() {
// Create a list builder but never finish it
Expand Down
Loading