Merged
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -40,6 +40,7 @@ members = [
"arrow-string",
"parquet",
"parquet-variant",
"parquet-variant-compute",
"parquet-variant-json",
"parquet_derive",
"parquet_derive_test",
@@ -103,6 +104,7 @@ parquet = { version = "55.2.0", path = "./parquet", default-features = false }
# These crates have not yet been released and thus do not use the workspace version
parquet-variant = { version = "0.1.0", path = "./parquet-variant"}
parquet-variant-json = { version = "0.1.0", path = "./parquet-variant-json" }
parquet-variant-compute = { version = "0.1.0", path = "./parquet-variant-compute" }

chrono = { version = "0.4.40", default-features = false, features = ["clock"] }

44 changes: 44 additions & 0 deletions parquet-variant-compute/Cargo.toml
@@ -0,0 +1,44 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "parquet-variant-compute"
# This package is still in development and thus the version does
# not follow the versions of the rest of the crates in this repo.
version = "0.1.0"
license = { workspace = true }
description = "Apache Parquet Variant Batch Processing"
homepage = { workspace = true }
repository = { workspace = true }
authors = { workspace = true }
keywords = ["arrow", "parquet", "variant"]
edition = { workspace = true }
# parquet-variant needs a newer Rust version than the workspace
rust-version = "1.83"


[dependencies]
arrow = { workspace = true }
arrow-schema = { workspace = true }
parquet-variant = { workspace = true }
parquet-variant-json = { workspace = true }

[lib]
name = "parquet_variant_compute"
bench = false

[dev-dependencies]
181 changes: 181 additions & 0 deletions parquet-variant-compute/src/from_json.rs
@@ -0,0 +1,181 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Module for transforming a batch of JSON strings into a batch of Variants represented as
//! STRUCT<metadata: BINARY, value: BINARY>

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, BinaryArray, BooleanBufferBuilder, StringArray, StructArray};
use arrow::buffer::{Buffer, NullBuffer, OffsetBuffer, ScalarBuffer};
use arrow::datatypes::{DataType, Field};
use arrow_schema::ArrowError;
use parquet_variant::VariantBuilder;
use parquet_variant_json::json_to_variant;

fn variant_arrow_repr() -> DataType {
    // The subfields are expected to be non-nullable according to the parquet variant spec.
Contributor:
The shredding spec makes value optional, with the possibility -- but not the requirement -- to have a typed_value instead when value is missing. Should we recognize that situation and throw a suitable "shredded variants not supported" error, instead of blowing up with an obscure schema mismatch error?

Contributor:
... but now that I think about it, that would be an issue when reading from parquet, which has nothing to do with this PR.

    let metadata_field = Field::new("metadata", DataType::Binary, false);
    let value_field = Field::new("value", DataType::Binary, false);
    let fields = vec![metadata_field, value_field];
    DataType::Struct(fields.into())
}
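A hedged sketch of the guard floated in the comment above; the helper name and error variant choice are hypothetical, and per the follow-up comment such a check really belongs on the Parquet read path rather than in this kernel:

use arrow::array::StructArray;
use arrow_schema::ArrowError;

// Hypothetical guard: surface a clear "shredded variants not supported" error
// when a struct carries a typed_value child or is missing value, instead of a
// generic schema-mismatch failure.
fn reject_shredded_variant(input: &StructArray) -> Result<(), ArrowError> {
    let has_typed_value = input.column_by_name("typed_value").is_some();
    let has_value = input.column_by_name("value").is_some();
    if has_typed_value || !has_value {
        return Err(ArrowError::NotYetImplemented(
            "shredded variants are not supported".to_string(),
        ));
    }
    Ok(())
}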

/// Parse a batch of JSON strings into a batch of Variants represented as
/// STRUCT<metadata: BINARY, value: BINARY> where nulls are preserved. The JSON strings in the input
/// must be valid.
pub fn batch_json_string_to_variant(input: &ArrayRef) -> Result<StructArray, ArrowError> {
Contributor:
We should definitely add some docs / example to this kernel

I also might suggest calling it cast_to_variant but that is more of a personal preference

Contributor:
parse_json is a name we could consider for this. It's the name of the function that does this (JSON string to variant) in Spark/Databricks.

Contributor:
I imagine we'll eventually want a top-level cast_to_variant that converts strong types to variant? And passing a string array to such a function should produce a variant column full of string (or short-string) variant values?

This method here is json-parsing strings and casting the result to variant, not casting strings directly to variant?

Contributor Author:
Yeah, I agree with @scovich. As for parse_json, it makes sense to name it that way if it is a well-defined SQL function. However, from a library point of view, I think parse_json is too vague.

Contributor:
Well technically, it would be parquet_variant_compute::parse_json, which is a little clearer? Maybe we should add some nested module structure like arrow-compute does, so we get e.g. parquet_variant_compute::variant::parse_json. People would use that as variant::parse_json or parse_json if they prefer parsimony?

Contributor:
Maybe we can call it cast_json_to_variant and cast_variant_to_json 🤔

    let input_string_array = match input.as_any().downcast_ref::<StringArray>() {
        Some(string_array) => Ok(string_array),
        None => Err(ArrowError::CastError(
            "Expected reference to StringArray as input".into(),
        )),
    }?;
Comment on lines +42 to +47
Contributor:
Why not use ok_or_else?

Suggested change
    let input_string_array = match input.as_any().downcast_ref::<StringArray>() {
        Some(string_array) => Ok(string_array),
        None => Err(ArrowError::CastError(
            "Expected reference to StringArray as input".into(),
        )),
    }?;
    let input_string_array = input
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| ArrowError::CastError("Expected a StringArray as input".into()))?;


    // Zero-copy builders
    let mut metadata_buffer: Vec<u8> = Vec::with_capacity(input.len() * 128);
Contributor:
Where does the 128 scale factor come from? It seems a stretch to assume that a batch of N JSON strings will produce 128*N bytes worth of unique field names? Especially as N grows and that scale factor becomes more and more dangerous?

If we really feel it's important to pre-allocate capacity, I'd recommend capping it at e.g. 1MB. But honestly, I'd just allocate a normal vec and let it grow normally, unless/until we have some proof that the guaranteed O(n) cost to append n bytes to a vec isn't good enough, and that pre-allocation actually helps in a wide variety of situations.

Contributor:
I agree we should allocate the normal vec and let it grow -- if we want to pre-allocate I think we should allow the user to pass in the allocation size as an argument

Maybe something like

let variant_array = JsonToVariant::new()
  .with_capacity(...)
  .parse(input_array)
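
For illustration, a hedged sketch of that builder-style API; every name here (JsonToVariant, with_capacity, parse) is hypothetical and nothing like it exists in the crate yet:

// Hypothetical wrapper that lets the caller supply the buffer size estimate
// instead of the hard-coded `input.len() * 128`.
pub struct JsonToVariant {
    capacity: Option<usize>,
}

impl JsonToVariant {
    pub fn new() -> Self {
        Self { capacity: None }
    }

    /// Caller-provided estimate of the total metadata/value buffer sizes.
    pub fn with_capacity(mut self, capacity: usize) -> Self {
        self.capacity = Some(capacity);
        self
    }

    pub fn parse(&self, input: &ArrayRef) -> Result<StructArray, ArrowError> {
        // A real implementation would thread `self.capacity` into the
        // `Vec::with_capacity` calls; this sketch simply delegates.
        let _ = self.capacity;
        batch_json_string_to_variant(input)
    }
}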

    let mut metadata_offsets: Vec<i32> = Vec::with_capacity(input.len() + 1);
    let mut metadata_validity = BooleanBufferBuilder::new(input.len());
    let mut metadata_current_offset: i32 = 0;
    metadata_offsets.push(metadata_current_offset);

    let mut value_buffer: Vec<u8> = Vec::with_capacity(input.len() * 128);
Contributor:
As above... I'm not sure how helpful it really is to pre-allocate capacity based only on input length. Some variants will be very small, and an unbounded over-allocation could hog a lot of memory. Others will be much (much) larger than 128B each, and the vec will anyway end up making multiple capacity increases along the way.

    let mut value_offsets: Vec<i32> = Vec::with_capacity(input.len() + 1);
    let mut value_validity = BooleanBufferBuilder::new(input.len());
    let mut value_current_offset: i32 = 0;
    value_offsets.push(value_current_offset);

    let mut validity = BooleanBufferBuilder::new(input.len());
    for i in 0..input.len() {
        if input.is_null(i) {
            // The subfields are expected to be non-nullable according to the parquet variant spec.
Contributor:
We already know they're non-nullable... maybe the comment can explain that we're pushing valid-but-empty subfields, to maintain proper positioning?

Also: Could we create nullable sub-field arrays even tho the schema says they're non-nullable, and rely on nested null masks? Does that save space in case the variant column has a lot of null entries?

Contributor:
But I guess we still have to push something to the offset arrays, to maintain proper positioning... so valid-but-empty is probably the best we can do?

Contributor:
But I guess we still have to push something to the offset arrays, to maintain proper positioning... so valid-but-empty is probably the best we can do?

This offset layout is required by the Arrow spec for StringArrays
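
A small illustration of that point, using a plain BinaryArray (the same offsets rule applies to the metadata/value children built here): a null row still gets an offset entry, it just repeats the previous one, so every row keeps its position and the null row maps to a zero-length slice.

use arrow::array::{Array, BinaryArray};

fn offsets_for_null_rows() {
    // Entries [b"ab", null, b"c"] produce offsets [0, 2, 2, 3]: the null slot
    // repeats the previous offset rather than being skipped.
    let arr = BinaryArray::from_opt_vec(vec![Some(b"ab".as_ref()), None, Some(b"c".as_ref())]);
    assert_eq!(arr.value_offsets().to_vec(), vec![0, 2, 2, 3]);
    assert!(arr.is_null(1));
}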

            metadata_validity.append(true);
            value_validity.append(true);
            metadata_offsets.push(metadata_current_offset);
            value_offsets.push(value_current_offset);
            validity.append(false);
        } else {
            let mut vb = VariantBuilder::new();
Contributor:
This pattern will cause the variant values to be copied twice -- once into the builder's buffers and then once into the output binary builder, which is probably ok for the first version;

With some care I think we will be able to avoid copying the values, though it will take using the lower level APIs (and building offsets directly)

Contributor Author:
Oh, so just to be clear the two copies you are referring to are metadata_builder.append_value(&metadata); and metadata_builder.finish() right? If so, I'll take care of it in this PR itself.

Contributor:
Not sure I followed that one -- maybe it's a second issue?

I was just referring to all the try_new_with_metadata calls. Today VariantMetadata is Copy, so it's easy to forget that each such call is quietly copying an ~80 byte object. That cost could perhaps add up when iterating over hundreds/thousands of child objects? Or maybe the compiler is really good at making it ~free, since really only one meaningfully exists at a time?

Contributor Author (@harshmotw-db, Jul 8, 2025):
Well, would this resolve the issue you're talking about:

pub fn try_new(metadata: &'m [u8], value: &'v [u8]) -> Result<Self, ArrowError> {
    // let metadata = VariantMetadata::try_new(metadata)?;
    Self::try_new_with_metadata(VariantMetadata::try_new(metadata)?, value)
}

Edit: Oh no never mind. There are more copies downstream. I suppose that is more of a library issue that can potentially be fixed separately

Contributor Author:
I have reduced copying in this function by manually constructing binary buffers.

Contributor:
The above would definitely not help, because each VariantMetadata::try_new would re-validate the same byte buffer!

Contributor:
Oh, so just to be clear the two copies you are referring to are metadata_builder.append_value(&metadata); and metadata_builder.finish() right? If so, I'll take care of it in this PR itself.

What I was really imagining was updating VariantBuilder so it could take a pre-existing buffer (Vec) and append to it, rather than writing into a new buffer and then copying that into the output bytes.

            json_to_variant(input_string_array.value(i), &mut vb)?;

Reviewer:
Can we please add a non-happy-path test, for example with invalid JSON, so this function fails and returns some well-known message to the user?

            let (metadata, value) = vb.finish();
            validity.append(true);

            metadata_current_offset += metadata.len() as i32;
            metadata_buffer.extend(metadata);
            metadata_offsets.push(metadata_current_offset);
            metadata_validity.append(true);

            value_current_offset += value.len() as i32;
            value_buffer.extend(value);
            value_offsets.push(value_current_offset);
            value_validity.append(true);
        }
    }
    let metadata_offsets_buffer = OffsetBuffer::new(ScalarBuffer::from(metadata_offsets));
    let metadata_data_buffer = Buffer::from_vec(metadata_buffer);
    let metadata_null_buffer = NullBuffer::new(metadata_validity.finish());

    let value_offsets_buffer = OffsetBuffer::new(ScalarBuffer::from(value_offsets));
    let value_data_buffer = Buffer::from_vec(value_buffer);
    let value_null_buffer = NullBuffer::new(value_validity.finish());

    let metadata_array = BinaryArray::new(
        metadata_offsets_buffer,
        metadata_data_buffer,
        Some(metadata_null_buffer),
    );
    let value_array = BinaryArray::new(
        value_offsets_buffer,
        value_data_buffer,
        Some(value_null_buffer),
    );

    let struct_fields: Vec<ArrayRef> = vec![Arc::new(metadata_array), Arc::new(value_array)];
    let variant_fields = match variant_arrow_repr() {
        DataType::Struct(fields) => fields,
        _ => unreachable!("variant_arrow_repr is hard-coded and must match the expected schema"),
    };
Comment on lines +108 to +111
Contributor:
Should variant_arrow_repr just return Fields instead of DataType? Then we don't have to unpack it.

It's also a 4-line method with a single call site (3 lines if we don't convert it to a DataType), so we might also consider removing the method entirely? Could also make the relationship with the StructArray more explicit by something like:

Suggested change
    let variant_fields = match variant_arrow_repr() {
        DataType::Struct(fields) => fields,
        _ => unreachable!("variant_arrow_repr is hard-coded and must match the expected schema"),
    };
    let metadata_field = Field::new("metadata", metadata_array.data_type(), false);
    let value_field = Field::new("value", value_array.data_type(), false);
    let variant_fields = vec![metadata_field, value_field].into();

Contributor:
Should variant_arrow_repr just return Fields instead of DataType? Then we don't have to unpack it.

Yes, please

    let null_buffer = NullBuffer::new(validity.finish());
    Ok(StructArray::new(
        variant_fields,
        struct_fields,
        Some(null_buffer),
    ))
}
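
Picking up the review request for docs, a minimal usage sketch of this kernel (module paths assume the re-export from lib.rs shown further down):

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, StringArray};
use arrow_schema::ArrowError;
use parquet_variant_compute::batch_json_string_to_variant;

fn example() -> Result<(), ArrowError> {
    // Each element is an independent JSON document; null slots stay null.
    let input: ArrayRef = Arc::new(StringArray::from(vec![
        Some(r#"{"name": "apache", "stars": 42}"#),
        None,
        Some("[1, 2, 3]"),
    ]));
    let variants = batch_json_string_to_variant(&input)?;
    assert_eq!(variants.len(), 3);
    assert!(variants.is_null(1));
    Ok(())
}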

#[cfg(test)]
mod test {
    use crate::batch_json_string_to_variant;
    use arrow::array::{Array, ArrayRef, BinaryArray, StringArray};
    use arrow_schema::ArrowError;
    use parquet_variant::{Variant, VariantBuilder};
    use std::sync::Arc;

    #[test]
    fn test_batch_json_string_to_variant() -> Result<(), ArrowError> {
        let input = StringArray::from(vec![
            Some("1"),
            None,
            Some("{\"a\": 32}"),
            Some("null"),
            None,
        ]);
Comment on lines +130 to +136

Reviewer:
Can we please cover more cases here?
I mean all the types that JSON supports. I see you already added int, nulls, and a simple dict here with string and int.

If it's a test for the happy path, can we please add:

  • default values (e.g., 0 for int, because some engines can represent NULL as a default value)
  • booleans (both true/false)
  • more nested JSON, not only 1-level nested JSON, e.g. "{{{{true: false}, "-1": "+1"}, 0: 1}}"
  • some long strings to ensure we don't have any string-size-based logic

Contributor:
default values

Not quite sure what you mean by "default values" or how an engine's NULL handling relates to string (json) -> variant parsing?

"{{{{true: false}, "-1": "+1"}, 0: 1}}"

I'm pretty sure JSON objects require string field names?

Contributor Author:
@gotocoding-DB The batch functions in this PR just run some underlying scalar functions on a whole batch of data. The underlying scalar functions have been validated on all sorts of inputs (this PR). I don't think the logical breadth of JSON test cases needs to be tested again.

Contributor:
I agree full coverage here is redundant -- maybe we can add a comment that says "full json parsing coverage is handled by the tests for json_to_variant"

Reviewer:
@harshmotw-db You're right. If the internally used parsing function is tested, we don't need to duplicate the tests here. I'm personally using tests as another example of function usage (if it's not fully covered in function docs).

        let array_ref: ArrayRef = Arc::new(input);
        let output = batch_json_string_to_variant(&array_ref).unwrap();

        let struct_array = &output;
        let metadata_array = struct_array
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let value_array = struct_array
            .column(1)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();

        assert!(!struct_array.is_null(0));
        assert!(struct_array.is_null(1));
        assert!(!struct_array.is_null(2));
        assert!(!struct_array.is_null(3));
        assert!(struct_array.is_null(4));

        assert_eq!(metadata_array.value(0), &[1, 0, 0]);
        assert_eq!(value_array.value(0), &[12, 1]);

        {
            let mut vb = VariantBuilder::new();
            let mut ob = vb.new_object();
            ob.insert("a", Variant::Int8(32));
            ob.finish()?;
            let (object_metadata, object_value) = vb.finish();
            assert_eq!(metadata_array.value(2), &object_metadata);
            assert_eq!(value_array.value(2), &object_value);
        }

        assert_eq!(metadata_array.value(3), &[1, 0, 0]);
        assert_eq!(value_array.value(3), &[0]);

        // Ensure that the subfields are not actually nullable
Contributor:
I'm not sure this is a useful thing to enforce? I'm pretty sure the Arrow spec forbids making any assumptions about the values of child arrays at positions an ancestor has marked invalid?

        assert!(!metadata_array.is_null(1));
        assert!(!value_array.is_null(1));
        assert!(!metadata_array.is_null(4));
        assert!(!value_array.is_null(4));
        Ok(())
    }
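
Responding to the earlier request for a non-happy-path case, a hedged sketch of an error test; it only asserts that an error is returned, since the exact message comes from json_to_variant:

    #[test]
    fn test_batch_json_string_to_variant_invalid_json() {
        // Invalid JSON should surface as an error rather than a panic.
        let input = StringArray::from(vec![Some("{not valid json")]);
        let array_ref: ArrayRef = Arc::new(input);
        assert!(batch_json_string_to_variant(&array_ref).is_err());
    }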
}
22 changes: 22 additions & 0 deletions parquet-variant-compute/src/lib.rs
@@ -0,0 +1,22 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

mod from_json;
mod to_json;

pub use from_json::batch_json_string_to_variant;
pub use to_json::batch_variant_to_json_string;