133 changes: 131 additions & 2 deletions crates/arroyo-formats/src/de.rs
@@ -18,6 +18,7 @@ use arroyo_rpc::schema_resolver::{FailingSchemaResolver, FixedSchemaResolver, Sc
use arroyo_rpc::{MetadataField, TIMESTAMP_FIELD};
use arroyo_types::{to_nanos, SourceError, LOOKUP_KEY_INDEX_FIELD};
use prost_reflect::DescriptorPool;
use serde::Deserialize;
use serde_json::Value;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
@@ -257,6 +258,7 @@ pub struct ArrowDeserializer {
additional_fields_builder: Option<HashMap<String, Box<dyn ArrayBuilder>>>,
timestamp_builder: Option<(usize, TimestampNanosecondBuilder)>,
buffer_decoder: BufferDecoder,
buffered_incomplete: Vec<u8>,
}

impl ArrowDeserializer {
@@ -394,6 +396,7 @@ impl ArrowDeserializer {
schema_resolver,
proto_pool,
additional_fields_builder: None,
buffered_incomplete: Vec::new(),
}
}

@@ -418,6 +421,52 @@
.await
}

/// Reinitializes the `buffer_decoder` based on the configured input format.
///
    /// This method is used when the decoder needs to be reset, for example after encountering
    /// irrecoverable decode errors caused by partially buffered, invalid messages. It rebuilds the internal
/// [`BufferDecoder`] variant appropriate to the `Format`:
///
/// - For structured `Json`, `Avro`, or `Protobuf` formats (i.e., not unstructured),
/// it constructs a new `JsonDecoder` using the current `decoder_schema`.
/// - For all other formats (e.g., `RawString`, `RawBytes`, or unstructured inputs),
/// it falls back to a `ContextBuffer`-based decoder.
///
    /// Additionally, it clears the `buffered_incomplete` buffer, which holds any partial
    /// messages left over from previous decoding attempts.
///
/// # Panics
///
    /// - If `arrow_json::reader::ReaderBuilder::build_decoder()` returns an error for an invalid
    ///   or unsupported schema (the result is unwrapped here).
    /// - If `ContextBuffer::new(...)` panics due to an unsupported data type in the schema.
pub fn reset_buffer_decoder(&mut self) {
self.buffer_decoder = match self.format.as_ref() {
Format::Json(JsonFormat {
unstructured: false,
..
})
| Format::Avro(AvroFormat {
into_unstructured_json: false,
..
})
| Format::Protobuf(ProtobufFormat {
into_unstructured_json: false,
..
}) => BufferDecoder::JsonDecoder {
decoder: arrow_json::reader::ReaderBuilder::new(self.decoder_schema.clone())
.with_limit_to_batch_size(false)
.with_strict_mode(false)
.with_allow_bad_data(matches!(self.bad_data, BadData::Drop { .. }))
.build_decoder()
.unwrap(),
buffered_count: 0,
buffered_since: Instant::now(),
},
_ => BufferDecoder::Buffer(ContextBuffer::new(self.decoder_schema.clone())),
};
self.buffered_incomplete.clear();
}

pub fn deserialize_null(
&mut self,
additional_fields: Option<&HashMap<&str, FieldValueType<'_>>>,
@@ -500,7 +549,11 @@ impl ArrowDeserializer {

pub fn flush_buffer(&mut self) -> Option<Result<RecordBatch, SourceError>> {
let (arrays, error_mask) = match self.buffer_decoder.flush(&self.bad_data)? {
Ok((a, b)) => (a, b),
Ok((a, b)) => {
                // Any buffered incomplete message must be cleared once a flush succeeds
self.buffered_incomplete.clear();
(a, b)
}
Err(e) => return Some(Err(e)),
};

@@ -561,7 +614,52 @@
msg
};

self.buffer_decoder.decode_json(msg)?;
// No previously incomplete buffered data present in the tape decoder
if self.buffered_incomplete.is_empty() {
match is_complete_json(msg) {
Ok(true) => {
self.buffer_decoder.decode_json(msg)?;
}
Ok(false) => {
self.buffer_decoder.decode_json(msg)?;
                            // Begin buffering the incomplete fragment so later fragments can be combined with it
self.buffered_incomplete.extend_from_slice(msg);
}
Err(e) => {
// Malformed JSON - don't send to decoder and return error
return Err(SourceError::bad_data(format!("invalid JSON: {e:?}")));
}
}
// We have previously buffered incomplete data - combine with current message
} else {
let combined_msg: Vec<u8> = self
.buffered_incomplete
.iter()
.chain(msg)
.copied()
.collect();

match is_complete_json(&combined_msg) {
Ok(true) => {
                            // Combined message is now complete: decode and clear the buffer.
                            // Only the new fragment is passed here, since earlier fragments
                            // were already fed to the tape decoder as they arrived.
                            self.buffer_decoder.decode_json(msg)?;
                            self.buffered_incomplete.clear();
}
Ok(false) => {
// Still incomplete but valid - decode and keep buffering
self.buffer_decoder.decode_json(msg)?;
self.buffered_incomplete.extend_from_slice(msg);
}
Err(e) => {
                            // Combined message is invalid; reset the decoder and return an error.
                            // Any previously decoded messages that were not yet flushed are lost.
                            self.reset_buffer_decoder();
                            return Err(SourceError::bad_data(format!(
                                "buffer poisoned by incomplete, invalid JSON; resetting decoder: {e:?}"
)));
}
}
}
}
Format::Protobuf(proto) => {
let json = proto::de::deserialize_proto(&mut self.proto_pool, proto, msg)?;
@@ -692,6 +790,37 @@ macro_rules! append_repeated_value {
}};
}
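
To illustrate the reassembly strategy that the `Json` arm above implements, here is a minimal, self-contained sketch; the `feed` helper and its signature are hypothetical and not part of this change:

use serde::Deserialize;

// Hypothetical helper (not in this diff): append a fragment to `buffer` and
// probe the combined bytes the same way the Json arm does.
fn feed(
    buffer: &mut Vec<u8>,
    fragment: &[u8],
) -> Result<Option<serde_json::Value>, serde_json::Error> {
    buffer.extend_from_slice(fragment);
    let mut de = serde_json::Deserializer::from_slice(buffer);
    match serde_json::Value::deserialize(&mut de) {
        Ok(value) => {
            // Complete value: reject trailing bytes, then clear the buffer
            de.end()?;
            buffer.clear();
            Ok(Some(value))
        }
        // Truncated input: keep the bytes and wait for the next fragment
        Err(e) if e.is_eof() => Ok(None),
        Err(e) => {
            // Poisoned buffer: discard it, analogous to reset_buffer_decoder
            buffer.clear();
            Err(e)
        }
    }
}

// Feeding br#"{"a": "# and then b"1}" yields Ok(None), then Ok(Some(value)).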

/// Checks whether the given byte slice contains a complete and valid JSON value.
///
/// This function attempts to deserialize the input as a `serde_json::Value`.
/// It returns:
///
/// - `Ok(true)` if the input is complete and valid JSON
/// - `Ok(false)` if the input is incomplete (i.e., ends prematurely)
/// - `Err(e)` if the input is syntactically invalid JSON
///
/// This is particularly useful for determining whether partial data (e.g., a streaming buffer)
/// has accumulated enough bytes to be parsed as a full JSON object.
///
/// # Arguments
///
/// * `input` – A byte slice that may contain a JSON value
///
/// # Errors
///
/// Returns a `serde_json::Error` if the input is malformed JSON (but not just incomplete).
fn is_complete_json(input: &[u8]) -> Result<bool, serde_json::Error> {
    let mut de = serde_json::Deserializer::from_slice(input);
    match Value::deserialize(&mut de) {
        Ok(_) => {
            // A value parsed; `end()` rejects any trailing data after it
            de.end()?;
            Ok(true)
        }
        // An EOF error means the JSON is valid so far but truncated
        Err(e) if e.is_eof() => Ok(false),
        Err(e) => Err(e),
    }
}
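
A hypothetical test sketch (module and test names are illustrative, not part of this diff) of how `is_complete_json` classifies inputs:

#[cfg(test)]
mod is_complete_json_sketch {
    use super::*;

    #[test]
    fn classifies_complete_incomplete_and_invalid() {
        // Complete value: Ok(true)
        assert_eq!(is_complete_json(br#"{"a": 1}"#).unwrap(), true);
        // Truncated value: Ok(false), so the caller should keep buffering
        assert_eq!(is_complete_json(br#"{"a": 1"#).unwrap(), false);
        // Syntactically invalid input: Err
        assert!(is_complete_json(b"{bad}").is_err());
        // Trailing data after a complete value is rejected by `de.end()`
        assert!(is_complete_json(b"{} {}").is_err());
    }
}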

fn add_additional_fields(
builders: &mut HashMap<String, Box<dyn ArrayBuilder>>,
key: &str,