diff --git a/crates/arroyo-formats/src/de.rs b/crates/arroyo-formats/src/de.rs index 5633e89a5..576fc6746 100644 --- a/crates/arroyo-formats/src/de.rs +++ b/crates/arroyo-formats/src/de.rs @@ -18,6 +18,7 @@ use arroyo_rpc::schema_resolver::{FailingSchemaResolver, FixedSchemaResolver, Sc use arroyo_rpc::{MetadataField, TIMESTAMP_FIELD}; use arroyo_types::{to_nanos, SourceError, LOOKUP_KEY_INDEX_FIELD}; use prost_reflect::DescriptorPool; +use serde::Deserialize; use serde_json::Value; use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -257,6 +258,7 @@ pub struct ArrowDeserializer { additional_fields_builder: Option>>, timestamp_builder: Option<(usize, TimestampNanosecondBuilder)>, buffer_decoder: BufferDecoder, + buffered_incomplete: Vec, } impl ArrowDeserializer { @@ -394,6 +396,7 @@ impl ArrowDeserializer { schema_resolver, proto_pool, additional_fields_builder: None, + buffered_incomplete: Vec::new(), } } @@ -418,6 +421,52 @@ impl ArrowDeserializer { .await } + /// Reinitializes the `buffer_decoder` based on the configured input format. + /// + /// This method is used when the decoder needs to be reset — for example, after encountering + /// irrecoverable decode errors from part-way buffered, invalid messages. It rebuilds the internal + /// [`BufferDecoder`] variant appropriate to the `Format`: + /// + /// - For structured `Json`, `Avro`, or `Protobuf` formats (i.e., not unstructured), + /// it constructs a new `JsonDecoder` using the current `decoder_schema`. + /// - For all other formats (e.g., `RawString`, `RawBytes`, or unstructured inputs), + /// it falls back to a `ContextBuffer`-based decoder. + /// + /// Additionally, it clears the `buffered_incomplete` buffer, which holds any partial + /// or incomplete messages from previous decoding attempts. + /// + /// # Panics + /// + /// If `arrow_json::reader::ReaderBuilder::build_decoder()` panics from an invalid or unsupported schema + /// If `ContextBuffer::new(...)` panics due to an unsupported data type in the schema. + pub fn reset_buffer_decoder(&mut self) { + self.buffer_decoder = match self.format.as_ref() { + Format::Json(JsonFormat { + unstructured: false, + .. + }) + | Format::Avro(AvroFormat { + into_unstructured_json: false, + .. + }) + | Format::Protobuf(ProtobufFormat { + into_unstructured_json: false, + .. + }) => BufferDecoder::JsonDecoder { + decoder: arrow_json::reader::ReaderBuilder::new(self.decoder_schema.clone()) + .with_limit_to_batch_size(false) + .with_strict_mode(false) + .with_allow_bad_data(matches!(self.bad_data, BadData::Drop { .. })) + .build_decoder() + .unwrap(), + buffered_count: 0, + buffered_since: Instant::now(), + }, + _ => BufferDecoder::Buffer(ContextBuffer::new(self.decoder_schema.clone())), + }; + self.buffered_incomplete.clear(); + } + pub fn deserialize_null( &mut self, additional_fields: Option<&HashMap<&str, FieldValueType<'_>>>, @@ -500,7 +549,11 @@ impl ArrowDeserializer { pub fn flush_buffer(&mut self) -> Option> { let (arrays, error_mask) = match self.buffer_decoder.flush(&self.bad_data)? { - Ok((a, b)) => (a, b), + Ok((a, b)) => { + // All latest incomplete messages buffered need to be cleared when flush succeeds + self.buffered_incomplete.clear(); + (a, b) + } Err(e) => return Some(Err(e)), }; @@ -561,7 +614,52 @@ impl ArrowDeserializer { msg }; - self.buffer_decoder.decode_json(msg)?; + // No previously incomplete buffered data present in the tape decoder + if self.buffered_incomplete.is_empty() { + match is_complete_json(msg) { + Ok(true) => { + self.buffer_decoder.decode_json(msg)?; + } + Ok(false) => { + self.buffer_decoder.decode_json(msg)?; + // Begin buffering incomplete data to keep track + self.buffered_incomplete.extend_from_slice(msg); + } + Err(e) => { + // Malformed JSON - don't send to decoder and return error + return Err(SourceError::bad_data(format!("invalid JSON: {e:?}"))); + } + } + // We have previously buffered incomplete data - combine with current message + } else { + let combined_msg: Vec = self + .buffered_incomplete + .iter() + .chain(msg) + .copied() + .collect(); + + match is_complete_json(&combined_msg) { + Ok(true) => { + // Combined message is now complete - decode and clear buffer + self.buffer_decoder.decode_json(msg)?; + self.buffered_incomplete.clear(); + } + Ok(false) => { + // Still incomplete but valid - decode and keep buffering + self.buffer_decoder.decode_json(msg)?; + self.buffered_incomplete.extend_from_slice(msg); + } + Err(e) => { + // Combined message is invalid - reset buffer and return error + // All previously complete JSON messages not flushed will be lost + self.reset_buffer_decoder(); + return Err(SourceError::bad_data(format!( + "resetting buffer poisoned with incomplete and invalid JSON: {e:?}" + ))); + } + } + } } Format::Protobuf(proto) => { let json = proto::de::deserialize_proto(&mut self.proto_pool, proto, msg)?; @@ -692,6 +790,37 @@ macro_rules! append_repeated_value { }}; } +/// Checks whether the given byte slice contains a complete and valid JSON value. +/// +/// This function attempts to deserialize the input as a `serde_json::Value`. +/// It returns: +/// +/// - `Ok(true)` if the input is complete and valid JSON +/// - `Ok(false)` if the input is incomplete (i.e., ends prematurely) +/// - `Err(e)` if the input is syntactically invalid JSON +/// +/// This is particularly useful for determining whether partial data (e.g., a streaming buffer) +/// has accumulated enough bytes to be parsed as a full JSON object. +/// +/// # Arguments +/// +/// * `input` – A byte slice that may contain a JSON value +/// +/// # Errors +/// +/// Returns a `serde_json::Error` if the input is malformed JSON (but not just incomplete). +fn is_complete_json(input: &[u8]) -> Result { + let mut de = serde_json::Deserializer::from_slice(input); + match Value::deserialize(&mut de) { + Ok(_) => { + de.end()?; + Ok(true) + } + Err(e) if e.is_eof() => Ok(false), + Err(e) => Err(e), + } +} + fn add_additional_fields( builders: &mut HashMap>, key: &str,