Skip to content

Commit a3b829a

Browse files
committed
array encoding to protobuf
1 parent 3e28b50 commit a3b829a

File tree

1 file changed

+10
-3
lines changed
  • airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/output

1 file changed

+10
-3
lines changed

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/output/NativeRecord.kt

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ fun <T> JsonEncoder<T>.toProtobufEncoder(): ProtoEncoder<*> {
8282
is LocalTimeCodec, -> localTimeProtoEncoder
8383
is LocalDateTimeCodec, -> localDateTimeProtoEncoder
8484
is OffsetTimeCodec, -> offsetTimeProtoEncoder
85-
is ArrayEncoder<*>, -> anyProtoEncoder
85+
is ArrayEncoder<*>, -> arrayProtoEncoder
8686
else -> anyProtoEncoder
8787
}
8888
}
@@ -160,7 +160,9 @@ val floatProtoEncoder =
160160

161161
val nullProtoEncoder = generateProtoEncoder<Any?> { builder, _ -> builder.setIsNull(true) }
162162
val anyProtoEncoder = textProtoEncoder
163-
// typealias AnyProtoEncoder = TextProtoEncoder
163+
164+
// For now arrays are encoded in protobuf as json strings
165+
val arrayProtoEncoder = textProtoEncoder
164166

165167
fun NativeRecordPayload.toProtobuf(
166168
recordMessageBuilder: AirbyteRecordMessageProtobuf.Builder,
@@ -177,7 +179,12 @@ fun NativeRecordPayload.toProtobuf(
177179
entry.value.fieldValue?.let {
178180
(entry.value.jsonEncoder.toProtobufEncoder() as ProtoEncoder<Any>).encode(
179181
valueBuilder.clear(),
180-
entry.value.fieldValue!!
182+
when (entry.value.jsonEncoder) {
183+
// For arrays we use the value of its json string.
184+
is ArrayEncoder<*> -> this@toProtobuf.toJson().asText()
185+
else -> entry.value.fieldValue!!
186+
}
187+
181188
)
182189
}
183190
?: nullProtoEncoder.encode(valueBuilder.clear(), null)

0 commit comments

Comments
 (0)