@@ -12,11 +12,15 @@ import io.airbyte.cdk.data.OffsetDateTimeCodec
import java.time.OffsetDateTime

/** Internal equivalent of a [io.airbyte.protocol.models.Field]. */
sealed interface FieldOrMetaField {
sealed interface DataOrMetaField {
val id: String
val type: FieldType
}

@Deprecated(
message = "Use `DataOrMetaField` directly instead.",
replaceWith = ReplaceWith("DataOrMetaField"))
typealias FieldOrMetaField = DataOrMetaField
/**
* Root of our own type hierarchy for Airbyte record fields.
*
@@ -39,20 +43,31 @@ interface LosslessFieldType : FieldType {
val jsonDecoder: JsonDecoder<*>
}

interface DataField : DataOrMetaField

@Deprecated(
message = "Use `EmittedField` directly instead.",
replaceWith = ReplaceWith("EmittedField"))
typealias Field = EmittedField
/**
* Internal equivalent of [io.airbyte.protocol.models.Field] for values which come from the source
* itself, instead of being generated by the connector during its operation.
*/
data class Field(
data class EmittedField(
override val id: String,
override val type: FieldType,
) : DataField

data class NonEmittedField(
override val id: String,
override val type: FieldType,
) : FieldOrMetaField
) : DataField
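Because the old names survive as deprecated typealiases, downstream connectors keep compiling and can migrate at their own pace. A minimal sketch of both sides of the rename (illustrative only; `IntFieldType` is one of the existing CDK field types):

import io.airbyte.cdk.discover.DataOrMetaField
import io.airbyte.cdk.discover.EmittedField
import io.airbyte.cdk.discover.Field // deprecated alias for EmittedField
import io.airbyte.cdk.discover.IntFieldType

// Old call sites keep compiling through the alias, with a deprecation warning
// whose ReplaceWith hint lets the IDE rewrite them to EmittedField.
@Suppress("DEPRECATION")
val legacyColumn: DataOrMetaField = Field("quantity", IntFieldType)

// New code uses the renamed types directly.
val renamedColumn: DataOrMetaField = EmittedField("quantity", IntFieldType)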

/**
* Internal equivalent of [io.airbyte.protocol.models.Field] for values which are generated by the
* connector itself during its operation, instead of coming from the source.
*/
interface MetaField : FieldOrMetaField {
interface MetaField : DataOrMetaField {
companion object {
const val META_PREFIX = "_ab_"

@@ -14,7 +14,7 @@ import java.time.OffsetDateTime
interface MetaFieldDecorator {

/** [MetaField] to use as a global cursor, if applicable. */
val globalCursor: FieldOrMetaField?
val globalCursor: DataOrMetaField?

/**
* All [MetaField]s to be found in [Global] stream records.
@@ -82,7 +82,7 @@ fun <T> JsonEncoder<T>.toProtobufEncoder(): ProtoEncoder<*> {
is LocalTimeCodec, -> localTimeProtoEncoder
is LocalDateTimeCodec, -> localDateTimeProtoEncoder
is OffsetTimeCodec, -> offsetTimeProtoEncoder
is ArrayEncoder<*>, -> anyProtoEncoder
is ArrayEncoder<*>, -> arrayProtoEncoder
else -> anyProtoEncoder
}
}
@@ -160,7 +160,9 @@ val floatProtoEncoder =

val nullProtoEncoder = generateProtoEncoder<Any?> { builder, _ -> builder.setIsNull(true) }
val anyProtoEncoder = textProtoEncoder

// For now, arrays are encoded in protobuf as JSON strings.
val arrayProtoEncoder = textProtoEncoder

fun NativeRecordPayload.toProtobuf(
recordMessageBuilder: AirbyteRecordMessageProtobuf.Builder,
@@ -177,7 +179,12 @@ fun NativeRecordPayload.toProtobuf(
entry.value.fieldValue?.let {
(entry.value.jsonEncoder.toProtobufEncoder() as ProtoEncoder<Any>).encode(
valueBuilder.clear(),
entry.value.fieldValue!!
when (entry.value.jsonEncoder) {
// For arrays, we use the field value's JSON string representation.
is ArrayEncoder<*> -> this@toProtobuf.toJson().asText()
else -> entry.value.fieldValue!!
}
)
}
?: nullProtoEncoder.encode(valueBuilder.clear(), null)
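Since there is no dedicated protobuf representation for arrays here, an array field is shipped as its JSON text and parsed back on the consuming side. A minimal sketch of that round trip using plain Jackson (the protobuf builder itself is left out):

import com.fasterxml.jackson.databind.ObjectMapper

fun main() {
    val mapper = ObjectMapper()

    // Producer side: the array value is flattened to its JSON string before encoding.
    val wireValue: String = mapper.writeValueAsString(listOf(1, 2, 3)) // "[1,2,3]"

    // Consumer side: a reader that knows the field is an array parses the string back.
    val restored = mapper.readTree(wireValue)
    check(restored.isArray && restored.get(2).asInt() == 3)
}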
@@ -3,7 +3,7 @@ package io.airbyte.cdk.read

import io.airbyte.cdk.StreamIdentifier
import io.airbyte.cdk.discover.Field
import io.airbyte.cdk.discover.FieldOrMetaField
import io.airbyte.cdk.discover.DataOrMetaField

/**
* [Feed] identifies part of the data consumed during a READ operation.
@@ -30,10 +30,10 @@ data class Global(
*/
data class Stream(
val id: StreamIdentifier,
val schema: Set<FieldOrMetaField>,
val schema: Set<DataOrMetaField>,
val configuredSyncMode: ConfiguredSyncMode,
val configuredPrimaryKey: List<Field>?,
val configuredCursor: FieldOrMetaField?,
val configuredCursor: DataOrMetaField?,
) : Feed {
val name: String
get() = id.name
@@ -15,7 +15,7 @@ import io.airbyte.cdk.data.AirbyteSchemaType
import io.airbyte.cdk.data.ArrayAirbyteSchemaType
import io.airbyte.cdk.data.LeafAirbyteSchemaType
import io.airbyte.cdk.discover.Field
import io.airbyte.cdk.discover.FieldOrMetaField
import io.airbyte.cdk.discover.DataOrMetaField
import io.airbyte.cdk.discover.MetaField
import io.airbyte.cdk.discover.MetaFieldDecorator
import io.airbyte.cdk.discover.MetadataQuerier
@@ -206,7 +206,7 @@ class StateManagerFactory(
return pk
}

fun cursorOrNull(cursorColumnIDComponents: List<String>): FieldOrMetaField? {
fun cursorOrNull(cursorColumnIDComponents: List<String>): DataOrMetaField? {
if (cursorColumnIDComponents.isEmpty()) {
return null
}
@@ -218,7 +218,7 @@
}
val configuredPrimaryKey: List<Field>? =
configuredStream.primaryKey?.asSequence()?.let { pkOrNull(it.toList()) }
val configuredCursor: FieldOrMetaField? =
val configuredCursor: DataOrMetaField? =
configuredStream.cursorField?.asSequence()?.let { cursorOrNull(it.toList()) }
val configuredSyncMode: ConfiguredSyncMode =
when (configuredStream.syncMode) {
@@ -37,7 +37,7 @@ class ResourceDrivenMetadataQuerierFactory(
level2.columns.map { (id: String, fullyQualifiedClassName: String) ->
val fieldType: FieldType =
Class.forName(fullyQualifiedClassName).kotlin.objectInstance as FieldType
Field(id, fieldType)
EmittedField(id, fieldType)
}
map[streamID] = TestStreamMetadata(columns, level2.primaryKeys)
}
@@ -14,6 +14,7 @@ import io.airbyte.cdk.StreamIdentifier
import io.airbyte.cdk.command.OpaqueStateValue
import io.airbyte.cdk.data.IntCodec
import io.airbyte.cdk.data.TextCodec
import io.airbyte.cdk.discover.EmittedField
import io.airbyte.cdk.discover.Field
import io.airbyte.cdk.discover.IntFieldType
import io.airbyte.cdk.discover.TestMetaFieldDecorator
@@ -73,7 +74,7 @@ abstract class AbstractCdcPartitionReaderTest<T : Comparable<T>, C : AutoCloseab
val stream =
Stream(
id = StreamIdentifier.from(StreamDescriptor().withName("tbl").withNamespace(namespace)),
schema = setOf(Field("v", IntFieldType), TestMetaFieldDecorator.GlobalCursor),
schema = setOf(EmittedField("v", IntFieldType), TestMetaFieldDecorator.GlobalCursor),
configuredSyncMode = ConfiguredSyncMode.INCREMENTAL,
configuredPrimaryKey = null,
configuredCursor = TestMetaFieldDecorator.GlobalCursor,
@@ -33,6 +33,7 @@ class JdbcMetadataQuerier(
val checkQueries: JdbcCheckQueries,
jdbcConnectionFactory: JdbcConnectionFactory,
) : MetadataQuerier {

val conn: Connection = jdbcConnectionFactory.get()

private val log = KotlinLogging.logger {}
@@ -109,12 +110,14 @@
.map { it.catalog to it.schema }
.distinct()
.forEach { (catalog: String?, schema: String?) ->
dbmd.getPseudoColumns(catalog, schema, null, null).use { rs: ResultSet ->
while (rs.next()) {
val (tableName: TableName, metadata: ColumnMetadata) =
columnMetadataFromResultSet(rs, isPseudoColumn = true)
val joinedTableName: TableName = joinMap[tableName] ?: continue
results.add(joinedTableName to metadata)
if (constants.includePseudoColumns) {
dbmd.getPseudoColumns(catalog, schema, null, null).use { rs: ResultSet ->
while (rs.next()) {
val (tableName: TableName, metadata: ColumnMetadata) =
columnMetadataFromResultSet(rs, isPseudoColumn = true)
val joinedTableName: TableName = joinMap[tableName] ?: continue
results.add(joinedTableName to metadata)
}
}
}
dbmd.getColumns(catalog, schema, null, null).use { rs: ResultSet ->
@@ -126,7 +129,8 @@ }
}
}
}
log.info { "Discovered ${results.size} column(s) and pseudo-column(s)." }
val clause = if (constants.includePseudoColumns) " and pseudo-column(s)" else ""
log.info { "Discovered ${results.size} column(s)${clause}."}
} catch (e: Exception) {
throw RuntimeException("Column name discovery query failed: ${e.message}", e)
}
@@ -204,7 +208,7 @@
streamID: StreamIdentifier,
): List<Field> {
val table: TableName = findTableName(streamID) ?: return listOf()
return columnMetadata(table).map { Field(it.label, fieldTypeMapper.toFieldType(it)) }
return columnMetadata(table).map { EmittedField(it.label, fieldTypeMapper.toFieldType(it)) }
}

fun columnMetadata(table: TableName): List<ColumnMetadata> {
@@ -234,7 +238,7 @@
): String {
val querySpec =
SelectQuerySpec(
SelectColumns(columnIDs.map { Field(it, NullFieldType) }),
SelectColumns(columnIDs.map { EmittedField(it, NullFieldType) }),
From(table.name, table.namespace()),
limit = Limit(0),
)
@@ -347,7 +351,7 @@
selectQueryGenerator,
fieldTypeMapper,
checkQueries,
jdbcConnectionFactory,
jdbcConnectionFactory
)
}
}
@@ -30,6 +30,8 @@ data class DefaultJdbcConstants(
val maxMemoryBytesForTesting: Long? = null,
/** Whether the namespace field denotes a JDBC schema or a JDBC catalog. */
val namespaceKind: NamespaceKind = NamespaceKind.SCHEMA,
/** Whether to fetch pseudo-columns when querying column metadata. */
val includePseudoColumns: Boolean = true,
) {

enum class NamespaceKind {
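The new flag gives connectors whose JDBC drivers do not support `DatabaseMetaData.getPseudoColumns` a way to opt out of that lookup. A hedged sketch of doing so, relying only on the data class's generated `copy` (other constructor parameters are assumed unchanged):

// Illustrative only: reuse whatever constants the connector already has and turn
// off pseudo-column discovery; column discovery then calls getColumns only and the
// log message drops the "and pseudo-column(s)" clause.
fun withoutPseudoColumns(base: DefaultJdbcConstants): DefaultJdbcConstants =
    base.copy(includePseudoColumns = false)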
@@ -8,6 +8,7 @@ import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.command.OpaqueStateValue
import io.airbyte.cdk.discover.Field
import io.airbyte.cdk.output.sockets.toJson
import io.airbyte.cdk.util.Jsons

/** Base class for default implementations of [JdbcPartition]. */
@@ -176,10 +177,10 @@ class DefaultJdbcSplittableSnapshotPartition(
)
}

override fun incompleteState(lastRecord: ObjectNode): OpaqueStateValue =
override fun incompleteState(lastRecord: SelectQuerier.ResultRow): OpaqueStateValue =
DefaultJdbcStreamStateValue.snapshotCheckpoint(
primaryKey = checkpointColumns,
primaryKeyCheckpoint = checkpointColumns.map { lastRecord[it.id] ?: Jsons.nullNode() },
primaryKeyCheckpoint = checkpointColumns.map { lastRecord.data.toJson()[it.id] ?: Jsons.nullNode() },
)
}

@@ -243,10 +244,10 @@ class DefaultJdbcSplittableSnapshotWithCursorPartition(
)
}

override fun incompleteState(lastRecord: ObjectNode): OpaqueStateValue =
override fun incompleteState(lastRecord: SelectQuerier.ResultRow): OpaqueStateValue =
DefaultJdbcStreamStateValue.snapshotWithCursorCheckpoint(
primaryKey = checkpointColumns,
primaryKeyCheckpoint = checkpointColumns.map { lastRecord[it.id] ?: Jsons.nullNode() },
primaryKeyCheckpoint = checkpointColumns.map { lastRecord.data.toJson()[it.id] ?: Jsons.nullNode() },
cursor,
cursorUpperBound,
)
@@ -283,9 +284,9 @@ class DefaultJdbcCursorIncrementalPartition(
cursorCheckpoint = cursorUpperBound,
)

override fun incompleteState(lastRecord: ObjectNode): OpaqueStateValue =
override fun incompleteState(lastRecord: SelectQuerier.ResultRow): OpaqueStateValue =
DefaultJdbcStreamStateValue.cursorIncrementalCheckpoint(
cursor,
cursorCheckpoint = lastRecord[cursor.id] ?: Jsons.nullNode(),
cursorCheckpoint = lastRecord.data.toJson()[cursor.id] ?: Jsons.nullNode(),
)
}
@@ -10,7 +10,7 @@ import io.airbyte.cdk.StreamIdentifier
import io.airbyte.cdk.command.JdbcSourceConfiguration
import io.airbyte.cdk.command.OpaqueStateValue
import io.airbyte.cdk.discover.Field
import io.airbyte.cdk.discover.FieldOrMetaField
import io.airbyte.cdk.discover.DataOrMetaField
import io.airbyte.cdk.output.CatalogValidationFailureHandler
import io.airbyte.cdk.output.InvalidCursor
import io.airbyte.cdk.output.InvalidPrimaryKey
@@ -151,7 +151,7 @@ class DefaultJdbcPartitionFactory(
return null
}
val cursorLabel: String = cursors.keys.first()
val cursor: FieldOrMetaField? = stream.schema.find { it.id == cursorLabel }
val cursor: DataOrMetaField? = stream.schema.find { it.id == cursorLabel }
if (cursor !is Field) {
handler.accept(
InvalidCursor(stream.id, cursorLabel),
@@ -45,7 +45,8 @@ interface JdbcSplittablePartition<S : JdbcStreamState<*>> : JdbcPartition<S> {
fun resumableQuery(limit: Long): SelectQuery

/** State value to emit when the partition is read up to (and including) [lastRecord]. */
fun incompleteState(lastRecord: ObjectNode): OpaqueStateValue
fun incompleteState(lastRecord: SelectQuerier.ResultRow): OpaqueStateValue

}
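Any `JdbcSplittablePartition` implementation outside the CDK now receives the raw `SelectQuerier.ResultRow` and converts it to JSON only when a checkpoint is actually built, as the default partitions above do. A sketch of that extraction (imports omitted; the types and the state helper are the CDK ones shown in this diff):

// Sketch only: derive a cursor checkpoint from the last row read, converting the
// native payload lazily via the io.airbyte.cdk.output.sockets.toJson extension.
fun cursorCheckpointOf(lastRecord: SelectQuerier.ResultRow, cursor: Field): OpaqueStateValue =
    DefaultJdbcStreamStateValue.cursorIncrementalCheckpoint(
        cursor,
        cursorCheckpoint = lastRecord.data.toJson()[cursor.id] ?: Jsons.nullNode(),
    )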

/** A [JdbcPartition] which allows cursor-based incremental reads. */
@@ -176,7 +176,7 @@ class JdbcResumablePartitionReader<P : JdbcSplittablePartition<*>>(

val incumbentLimit = AtomicLong()
val numRecords = AtomicLong()
val lastRecord = AtomicReference<ObjectNode?>(null)
val lastRecord = AtomicReference<SelectQuerier.ResultRow>(null)
val runComplete = AtomicBoolean(false)

override suspend fun run() {
@@ -200,7 +200,8 @@
.use { result: SelectQuerier.Result ->
for (row in result) {
out(row)
lastRecord.set(row.data.toJson(Jsons.objectNode()))
lastRecord.set(row)
// Check activity periodically to handle timeout.
if (numRecords.incrementAndGet() % fetchSize == 0L) {
coroutineContext.ensureActive()
@@ -237,7 +238,7 @@
streamState.updateLimitState { it.down }
}
}
val checkpointState: OpaqueStateValue = partition.incompleteState(lastRecord.get()!!)
val checkpointState: OpaqueStateValue = partition.incompleteState(lastRecord.get())
return PartitionReadCheckpoint(
checkpointState,
numRecords.get(),
@@ -90,7 +90,7 @@ abstract class JdbcPartitionsCreator<

/** Collects a sample of rows in the unsplit partition. */
fun <T> collectSample(
recordMapper: (ObjectNode) -> T,
recordMapper: (SelectQuerier.ResultRow) -> T,
): Sample<T> {
val values = mutableListOf<T>()
var previousWeight = 0L
@@ -105,7 +105,7 @@
val samplingQuery: SelectQuery = partition.samplingQuery(sampleRateInvPow2)
selectQuerier.executeQuery(samplingQuery).use {
for (row in it) {
values.add(recordMapper(row.data.toJson()))
values.add(recordMapper(row))
}
}
if (values.size < sharedState.maxSampleSize) {
@@ -152,7 +152,7 @@ class JdbcSequentialPartitionsCreator<
if (streamState.fetchSize == null) {
if (sharedState.withSampling) {
val rowByteSizeSample: Sample<Long> =
collectSample(sharedState.rowByteSizeEstimator()::apply)
collectSample { sharedState.rowByteSizeEstimator().apply(it.data.toJson()) }
val expectedTableByteSize: Long =
rowByteSizeSample.sampledValues.sum() * rowByteSizeSample.valueWeight
log.info { "Table memory size estimated at ${expectedTableByteSize shr 20} MiB." }
@@ -212,10 +212,10 @@ class JdbcConcurrentPartitionsCreator<
return listOf(JdbcNonResumablePartitionReader(partition))
}
// Sample the table for partition split boundaries and for record byte sizes.
val sample: Sample<Pair<OpaqueStateValue?, Long>> = collectSample { record: ObjectNode ->
val sample: Sample<Pair<OpaqueStateValue?, Long>> = collectSample { record: SelectQuerier.ResultRow ->
val boundary: OpaqueStateValue? =
(partition as? JdbcSplittablePartition<*>)?.incompleteState(record)
val rowByteSize: Long = sharedState.rowByteSizeEstimator().apply(record)
val rowByteSize: Long = sharedState.rowByteSizeEstimator().apply(record.data.toJson())
boundary to rowByteSize
}
if (sample.kind == Sample.Kind.EMPTY) {