Skip to content

Commit 2580c08

Browse files
mwbayley and rodireich
committed
source-postgres: field type mapping (#65983)
Co-authored-by: Rodi Reich Zilberman <[email protected]>
1 parent f616dae commit 2580c08

File tree

33 files changed

+1058
-292
lines changed

33 files changed

+1058
-292
lines changed

airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/data/JsonCodec.kt

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -336,18 +336,23 @@ data class ArrayEncoder<T>(
336336
override fun encode(decoded: List<T>): JsonNode =
337337
Jsons.arrayNode().apply {
338338
for (e in decoded) {
339-
add(elementEncoder.encode(e))
339+
// Note: in generics, T can be nullable!
340+
if (e == null) add(NullCodec.encode(e)) else add(elementEncoder.encode(e))
340341
}
341342
}
342343
}
343344

344345
data class ArrayDecoder<T>(
345346
val elementDecoder: JsonDecoder<T>,
346-
) : JsonDecoder<List<T>> {
347-
override fun decode(encoded: JsonNode): List<T> {
347+
) : JsonDecoder<List<T?>> {
348+
override fun decode(encoded: JsonNode): List<T?> {
348349
if (!encoded.isArray) {
349350
throw IllegalArgumentException("invalid array value $encoded")
350351
}
351-
return encoded.elements().asSequence().map { elementDecoder.decode(it) }.toList()
352+
return encoded
353+
.elements()
354+
.asSequence()
355+
.map { if (it == null) null else elementDecoder.decode(it) }
356+
.toList()
352357
}
353358
}

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/Field.kt

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ sealed interface DataOrMetaField {
1919

2020
@Deprecated(
2121
message = "Use `DataOrMetaField` directly instead.",
22-
replaceWith = ReplaceWith("DataOrMetaField"))
22+
replaceWith = ReplaceWith("DataOrMetaField")
23+
)
2324
typealias FieldOrMetaField = DataOrMetaField
2425
/**
2526
* Root of our own type hierarchy for Airbyte record fields.
@@ -43,11 +44,12 @@ interface LosslessFieldType : FieldType {
4344
val jsonDecoder: JsonDecoder<*>
4445
}
4546

46-
interface DataField: DataOrMetaField
47+
interface DataField : DataOrMetaField
4748

4849
@Deprecated(
4950
message = "Use `EmittedField` directly instead.",
50-
replaceWith = ReplaceWith("EmittedField"))
51+
replaceWith = ReplaceWith("EmittedField")
52+
)
5153
typealias Field = EmittedField
5254
/**
5355
* Internal equivalent of [io.airbyte.protocol.models.Field] for values which come from the source

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/output/NativeRecord.kt

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ fun <T> JsonEncoder<T>.toProtobufEncoder(): ProtoEncoder<*> {
8181
is LocalTimeCodec, -> localTimeProtoEncoder
8282
is LocalDateTimeCodec, -> localDateTimeProtoEncoder
8383
is OffsetTimeCodec, -> offsetTimeProtoEncoder
84-
is ArrayEncoder<*>, -> anyProtoEncoder
84+
is ArrayEncoder<*>, -> arrayProtoEncoder
8585
else -> anyProtoEncoder
8686
}
8787
}
@@ -162,33 +162,35 @@ val floatProtoEncoder =
162162

163163
val nullProtoEncoder = generateProtoEncoder<Any?> { builder, _ -> builder.setIsNull(true) }
164164
val anyProtoEncoder = textProtoEncoder
165-
// typealias AnyProtoEncoder = TextProtoEncoder
165+
166+
// For now arrays are encoded in protobuf as json strings
167+
val arrayProtoEncoder = textProtoEncoder
166168

167169
fun NativeRecordPayload.toProtobuf(
168170
schema: Set<FieldOrMetaField>,
169171
recordMessageBuilder: AirbyteRecordMessageProtobuf.Builder,
170172
valueBuilder: AirbyteRecordMessage.AirbyteValueProtobuf.Builder
171173
): AirbyteRecordMessageProtobuf.Builder {
172174
return recordMessageBuilder.apply {
173-
schema
174-
.sortedBy { it.id }
175-
.forEachIndexed { index, field ->
176-
// Protobuf does not have field names, so we use a sorted order of fields
177-
// So for destination to know which fields it is, we order the fields alphabetically
178-
// to make sure that the order is consistent.
179-
this@toProtobuf[field.id]?.let { value ->
180-
@Suppress("UNCHECKED_CAST")
181-
setData(
182-
index,
183-
value.fieldValue?.let {
184-
(value.jsonEncoder.toProtobufEncoder() as ProtoEncoder<Any>).encode(
185-
valueBuilder.clear(),
186-
value.fieldValue
187-
)
175+
// We use toSortedMap() to ensure that the order is consistent
176+
// Since protobuf has no field name the contract with destination is that
177+
// field are alphabetically ordered.
178+
this@toProtobuf.toSortedMap().onEachIndexed { index, entry ->
179+
@Suppress("UNCHECKED_CAST")
180+
setData(
181+
index,
182+
entry.value.fieldValue?.let {
183+
(entry.value.jsonEncoder.toProtobufEncoder() as ProtoEncoder<Any>).encode(
184+
valueBuilder.clear(),
185+
when (entry.value.jsonEncoder) {
186+
// For arrays we use the value of its json string.
187+
is ArrayEncoder<*> -> entry.value.encode().toString()
188+
else -> entry.value.fieldValue!!
188189
}
189-
?: nullProtoEncoder.encode(valueBuilder.clear(), null)
190+
190191
)
191192
}
192-
}
193+
)
194+
}
193195
}
194196
}

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/Feed.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
package io.airbyte.cdk.read
33

44
import io.airbyte.cdk.StreamIdentifier
5-
import io.airbyte.cdk.discover.Field
65
import io.airbyte.cdk.discover.DataOrMetaField
6+
import io.airbyte.cdk.discover.Field
77

88
/**
99
* [Feed] identifies part of the data consumed during a READ operation.

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManagerFactory.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ import io.airbyte.cdk.command.StreamInputState
1414
import io.airbyte.cdk.data.AirbyteSchemaType
1515
import io.airbyte.cdk.data.ArrayAirbyteSchemaType
1616
import io.airbyte.cdk.data.LeafAirbyteSchemaType
17-
import io.airbyte.cdk.discover.Field
1817
import io.airbyte.cdk.discover.DataOrMetaField
18+
import io.airbyte.cdk.discover.Field
1919
import io.airbyte.cdk.discover.MetaField
2020
import io.airbyte.cdk.discover.MetaFieldDecorator
2121
import io.airbyte.cdk.discover.MetadataQuerier

airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/discover/JdbcMetadataQuerier.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ class JdbcMetadataQuerier(
130130
}
131131
}
132132
val clause = if (constants.includePseudoColumns) " and pseudo-column(s)" else ""
133-
log.info { "Discovered ${results.size} column(s)${clause}."}
133+
log.info { "Discovered ${results.size} column(s)${clause}." }
134134
} catch (e: Exception) {
135135
throw RuntimeException("Column name discovery query failed: ${e.message}", e)
136136
}

airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/read/DefaultJdbcPartition.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package io.airbyte.cdk.read
66

77
import com.fasterxml.jackson.databind.JsonNode
8-
import com.fasterxml.jackson.databind.node.ObjectNode
98
import io.airbyte.cdk.command.OpaqueStateValue
109
import io.airbyte.cdk.discover.Field
1110
import io.airbyte.cdk.output.sockets.toJson
@@ -197,7 +196,8 @@ class DefaultJdbcSplittableSnapshotPartition(
197196
override fun incompleteState(lastRecord: SelectQuerier.ResultRow): OpaqueStateValue =
198197
DefaultJdbcStreamStateValue.snapshotCheckpoint(
199198
primaryKey = checkpointColumns,
200-
primaryKeyCheckpoint = checkpointColumns.map { lastRecord.data.toJson()[it.id] ?: Jsons.nullNode() },
199+
primaryKeyCheckpoint =
200+
checkpointColumns.map { lastRecord.data.toJson()[it.id] ?: Jsons.nullNode() },
201201
)
202202
}
203203

@@ -264,7 +264,8 @@ class DefaultJdbcSplittableSnapshotWithCursorPartition(
264264
override fun incompleteState(lastRecord: SelectQuerier.ResultRow): OpaqueStateValue =
265265
DefaultJdbcStreamStateValue.snapshotWithCursorCheckpoint(
266266
primaryKey = checkpointColumns,
267-
primaryKeyCheckpoint = checkpointColumns.map { lastRecord.data.toJson()[it.id] ?: Jsons.nullNode() },
267+
primaryKeyCheckpoint =
268+
checkpointColumns.map { lastRecord.data.toJson()[it.id] ?: Jsons.nullNode() },
268269
cursor,
269270
cursorUpperBound,
270271
)

airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/read/DefaultJdbcPartitionFactory.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ import io.airbyte.cdk.ConfigErrorException
99
import io.airbyte.cdk.StreamIdentifier
1010
import io.airbyte.cdk.command.JdbcSourceConfiguration
1111
import io.airbyte.cdk.command.OpaqueStateValue
12-
import io.airbyte.cdk.discover.Field
1312
import io.airbyte.cdk.discover.DataOrMetaField
13+
import io.airbyte.cdk.discover.Field
1414
import io.airbyte.cdk.output.CatalogValidationFailureHandler
1515
import io.airbyte.cdk.output.InvalidCursor
1616
import io.airbyte.cdk.output.InvalidPrimaryKey

airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/read/JdbcPartition.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
package io.airbyte.cdk.read
66

7-
import com.fasterxml.jackson.databind.node.ObjectNode
87
import io.airbyte.cdk.command.OpaqueStateValue
98

109
/**
@@ -46,7 +45,6 @@ interface JdbcSplittablePartition<S : JdbcStreamState<*>> : JdbcPartition<S> {
4645

4746
/** State value to emit when the partition is read up to (and including) [lastRecord]. */
4847
fun incompleteState(lastRecord: SelectQuerier.ResultRow): OpaqueStateValue
49-
5048
}
5149

5250
/** A [JdbcPartition] which allows cursor-based incremental reads. */

airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/read/JdbcPartitionReader.kt

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,13 @@
11
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
22
package io.airbyte.cdk.read
33

4-
import com.fasterxml.jackson.databind.node.ObjectNode
54
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
65
import io.airbyte.cdk.TransientErrorException
76
import io.airbyte.cdk.command.OpaqueStateValue
87
import io.airbyte.cdk.discover.Field
98
import io.airbyte.cdk.output.DataChannelMedium.*
109
import io.airbyte.cdk.output.OutputMessageRouter
1110
import io.airbyte.cdk.output.sockets.NativeRecordPayload
12-
import io.airbyte.cdk.output.sockets.toJson
13-
import io.airbyte.cdk.util.Jsons
1411
import io.airbyte.protocol.models.v0.AirbyteStateMessage
1512
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage
1613
import java.time.Duration
@@ -196,7 +193,7 @@ class JdbcResumablePartitionReader<P : JdbcSplittablePartition<*>>(
196193
.use { result: SelectQuerier.Result ->
197194
for (row in result) {
198195
out(row)
199-
// lastRecord.set(row.data.toJson(Jsons.objectNode()))
196+
// lastRecord.set(row.data.toJson(Jsons.objectNode()))
200197
lastRecord.set(row)
201198
// Check activity periodically to handle timeout.
202199
if (numRecords.incrementAndGet() % fetchSize == 0L) {

0 commit comments

Comments
 (0)