Skip to content

Commit 3392c78

Browse files
committed
WIP
1 parent 3e28b50 commit 3392c78

File tree

9 files changed

+578
-55
lines changed

9 files changed

+578
-55
lines changed

airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/data/JsonCodec.kt

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -336,18 +336,22 @@ data class ArrayEncoder<T>(
336336
override fun encode(decoded: List<T>): JsonNode =
337337
Jsons.arrayNode().apply {
338338
for (e in decoded) {
339-
add(elementEncoder.encode(e))
339+
if (e == null) add(NullCodec.encode(e))
340+
else add(elementEncoder.encode(e))
340341
}
341342
}
342343
}
343344

344345
data class ArrayDecoder<T>(
345346
val elementDecoder: JsonDecoder<T>,
346-
) : JsonDecoder<List<T>> {
347-
override fun decode(encoded: JsonNode): List<T> {
347+
) : JsonDecoder<List<T?>> {
348+
override fun decode(encoded: JsonNode): List<T?> {
348349
if (!encoded.isArray) {
349350
throw IllegalArgumentException("invalid array value $encoded")
350351
}
351-
return encoded.elements().asSequence().map { elementDecoder.decode(it) }.toList()
352+
return encoded.elements().asSequence().map {
353+
if (it == null) null
354+
else elementDecoder.decode(it)
355+
}.toList()
352356
}
353357
}

airbyte-cdk/bulk/toolkits/source-tests/src/testFixtures/kotlin/io/airbyte/cdk/test/fixtures/connector/IntegrationTestOperations.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,18 @@ class IntegrationTestOperations(
2626
return streams
2727
}
2828

29+
@Deprecated("Use the correctly named 'read' function")
2930
fun sync(
3031
catalog: ConfiguredAirbyteCatalog,
3132
state: List<AirbyteStateMessage> = listOf(),
3233
vararg featureFlags: FeatureFlag
34+
): BufferingOutputConsumer =
35+
read(catalog, state, *featureFlags)
36+
37+
fun read(
38+
catalog: ConfiguredAirbyteCatalog,
39+
state: List<AirbyteStateMessage> = listOf(),
40+
vararg featureFlags: FeatureFlag
3341
): BufferingOutputConsumer {
3442
return CliRunner.source("read", configSpec, catalog, state, *featureFlags).run()
3543
}

airbyte-cdk/bulk/toolkits/source-tests/src/testFixtures/kotlin/io/airbyte/cdk/test/fixtures/tests/FieldTypeMapperTest.kt

Lines changed: 210 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import io.airbyte.cdk.data.AirbyteSchemaType
99
import io.airbyte.cdk.data.LeafAirbyteSchemaType
1010
import io.airbyte.cdk.discover.MetaField
1111
import io.airbyte.cdk.output.BufferingOutputConsumer
12+
import io.airbyte.cdk.test.fixtures.cleanup.TestAssetResourceNamer
1213
import io.airbyte.cdk.test.fixtures.connector.IntegrationTestOperations
1314
import io.airbyte.cdk.test.fixtures.connector.TestDbExecutor
1415
import io.airbyte.cdk.util.Jsons
@@ -47,10 +48,6 @@ abstract class FieldTypeMapperTest {
4748
return testCases.find { streamName.uppercase() in it.streamNamesToRecordData.keys }
4849
}
4950

50-
companion object {
51-
lateinit var ops: IntegrationTestOperations
52-
}
53-
5451
@TestFactory
5552
@Timeout(300)
5653
fun tests(): Iterable<DynamicNode> {
@@ -66,9 +63,15 @@ abstract class FieldTypeMapperTest {
6663
DiscoverAndReadAll(IntegrationTestOperations(configSpec), allStreamNamesAndRecordData)
6764
val discoverAndReadAllTest: DynamicNode =
6865
DynamicTest.dynamicTest("discover-and-read-all", actual)
69-
val testCases: List<DynamicNode> =
66+
/*val testCases: List<DynamicNode> =
7067
allStreamNamesAndRecordData.keys.map { streamName: String ->
68+
// TODO: fix test names to use type rather than stream name
7169
DynamicContainer.dynamicContainer(streamName, dynamicTests(actual, streamName))
70+
}*/
71+
val testCases: List<DynamicNode> =
72+
testCases.map {
73+
// TODO: fix test names to use type rather than stream name
74+
DynamicContainer.dynamicContainer(it.id, dynamicTests(actual, it.tableName))
7275
}
7376
return listOf(discoverAndReadAllTest) + testCases
7477
}
@@ -95,7 +98,9 @@ abstract class FieldTypeMapperTest {
9598
}
9699
val testCase: TestCase = findTestCase(streamName)!!
97100
val jsonSchema: JsonNode = actualStream!!.jsonSchema?.get("properties")!!
98-
val actualSchema: JsonNode? = jsonSchema[testCase.columnName.uppercase()]
101+
// TODO: don't break existing tests
102+
//val actualSchema: JsonNode? = jsonSchema[testCase.columnName.uppercase()]
103+
val actualSchema: JsonNode? = jsonSchema[testCase.columnName]
99104
Assertions.assertNotNull(actualSchema)
100105
val expectedSchema: JsonNode = testCase.airbyteSchemaType.asJsonSchema()
101106
Assertions.assertEquals(expectedSchema, actualSchema)
@@ -114,7 +119,7 @@ abstract class FieldTypeMapperTest {
114119
for (fieldName in data.fieldNames()) {
115120
if (
116121
fieldName.uppercase() == "ID" ||
117-
fieldName.startsWith(MetaField.META_PREFIX)
122+
fieldName.startsWith(MetaField.META_PREFIX)
118123
) {
119124
remove(fieldName)
120125
}
@@ -168,7 +173,7 @@ abstract class FieldTypeMapperTest {
168173
for (configuredStream in configuredStreams) {
169174
if (
170175
configuredStream.stream.supportedSyncModes.contains(SyncMode.INCREMENTAL) &&
171-
configuredStream.stream.sourceDefinedCursor == true
176+
configuredStream.stream.sourceDefinedCursor == true
172177
) {
173178
configuredStream.syncMode = SyncMode.INCREMENTAL
174179
// TODO: add support for sourceDefinedCursor
@@ -201,9 +206,11 @@ abstract class FieldTypeMapperTest {
201206
AirbyteTraceMessage.Type.ESTIMATE -> msg.trace?.estimate?.name
202207
AirbyteTraceMessage.Type.STREAM_STATUS ->
203208
msg.trace?.streamStatus?.streamDescriptor?.name
209+
204210
AirbyteTraceMessage.Type.ANALYTICS -> null
205211
null -> null
206212
}
213+
207214
else -> null
208215
}
209216
}
@@ -214,24 +221,28 @@ abstract class FieldTypeMapperTest {
214221
val values: Map<String, String>,
215222
val airbyteSchemaType: AirbyteSchemaType = LeafAirbyteSchemaType.STRING,
216223
) {
224+
companion object {
225+
val testAssetResourceNamer = TestAssetResourceNamer()
226+
}
227+
217228
val id: String
218229
get() =
219230
sqlType
231+
.replace("\\[]".toRegex(), "_array")
220232
.replace("[^a-zA-Z0-9]".toRegex(), " ")
221233
.trim()
222234
.replace(" +".toRegex(), "_")
223235
.lowercase()
224236

225-
val tableName: String
226-
get() = "tbl_$id"
237+
val tableName = testAssetResourceNamer.getName()
227238

228239
val columnName: String
229240
get() = "col_$id"
230241

231242
val dml: List<String>
232243
get() {
233244
return values.keys.map {
234-
"INSERT INTO $namespace.$tableName ($columnName) VALUES ($it)"
245+
"INSERT INTO \"$namespace\".\"$tableName\" (\"$columnName\") VALUES ($it)"
235246
}
236247
}
237248

@@ -240,9 +251,196 @@ abstract class FieldTypeMapperTest {
240251
return mapOf(
241252
tableName.uppercase() to
242253
values.values.map {
243-
Jsons.readTree("""{"${columnName.uppercase()}":$it}""")
254+
// TODO: don't break existing tests
255+
//Jsons.readTree("""{"${columnName.uppercase()}":$it}""")
256+
Jsons.readTree("""{"$columnName":$it}""")
244257
}
245258
)
246259
}
247260
}
261+
262+
// pads the json string map values to a fixed length
263+
protected fun Map<String, String>.withLength(length: Int): Map<String, String> {
264+
return this.mapValues {
265+
val currentLength = it.value.length - 2 // exclude the quotes
266+
if (currentLength > length) {
267+
throw IllegalArgumentException("$length is out of bounds")
268+
} else {
269+
// make it longer
270+
it.value.replace("\"$".toRegex(), "\"".padStart(length - currentLength + 1))
271+
}
272+
}
273+
}
274+
}
275+
276+
object AnsiSql {
277+
278+
val floatValues =
279+
mapOf(
280+
"null" to "null",
281+
"45.67" to "45.67",
282+
"98.76" to "98.76",
283+
"0.12" to "0.12",
284+
)
285+
286+
val intValues =
287+
mapOf(
288+
"null" to "null",
289+
"1" to "1",
290+
"0" to "0",
291+
"-1" to "-1",
292+
"2147483647" to "2147483647",
293+
"-2147483648" to "-2147483648",
294+
)
295+
296+
val smallIntValues =
297+
mapOf(
298+
"null" to "null",
299+
"1" to "1",
300+
"0" to "0",
301+
"-1" to "-1",
302+
"32767" to "32767",
303+
"-32768" to "-32768",
304+
)
305+
306+
val bigIntValues =
307+
mapOf(
308+
"null" to "null",
309+
"1" to "1",
310+
"0" to "0",
311+
"-1" to "-1",
312+
"9223372036854775807" to "9223372036854775807",
313+
"-9223372036854775808" to "-9223372036854775808",
314+
)
315+
316+
val decimalValues =
317+
mapOf(
318+
"null" to "null",
319+
"123456789.123456789" to "123456789.123456789",
320+
"-123456789.123456789" to "-123456789.123456789",
321+
"0.000000001" to "0.000000001",
322+
"9999999999.999999999" to "9999999999.999999999",
323+
"-9999999999.999999999" to "-9999999999.999999999",
324+
)
325+
326+
val realValues =
327+
mapOf(
328+
"null" to "null",
329+
"3.402E+38" to "3.402E+38",
330+
"-3.402E+38" to "-3.402E+38",
331+
"1.175E-37" to "1.175E-37",
332+
"0.0" to "0.0",
333+
)
334+
335+
val doubleValues =
336+
mapOf(
337+
"null" to "null",
338+
"1.7976931348623157E+308" to "1.7976931348623157E+308",
339+
"-1.7976931348623157E+308" to "-1.7976931348623157E+308",
340+
"2.2250738585072014E-308" to "2.2250738585072014E-308",
341+
"0.0" to "0.0",
342+
)
343+
344+
val booleanValues =
345+
mapOf(
346+
"null" to "null",
347+
"true" to "true",
348+
"false" to "false",
349+
)
350+
351+
val charValues =
352+
mapOf(
353+
"null" to "null",
354+
"'a'" to "\"a\"",
355+
"'Z'" to "\"Z\"",
356+
"'1'" to "\"1\"",
357+
"' '" to "\" \"",
358+
)
359+
360+
val varcharValues =
361+
mapOf(
362+
"null" to "null",
363+
"'Hello'" to "\"Hello\"",
364+
"'12345'" to "\"12345\"",
365+
"' '" to "\" \"",
366+
"''" to "\"\"",
367+
)
368+
369+
val clobValues =
370+
mapOf("null" to "null", "'Large Text Content'" to "\"Large Text Content\"")
371+
372+
val blobValues =
373+
mapOf("null" to "null", "CAST(X'48656C6C6F' AS BLOB)" to "\"SGVsbG8=\"")
374+
375+
val binary20Values =
376+
mapOf(
377+
"null" to "null",
378+
"CAST(X'0123' AS VARBINARY)" to "\"ASMAAAAAAAAAAAAAAAAAAAAAAAA=\"",
379+
"CAST(X'0123456789ABCDEF' AS VARBINARY)" to "\"ASNFZ4mrze8AAAAAAAAAAAAAAAA=\"",
380+
)
381+
382+
val varbinaryValues =
383+
mapOf(
384+
"null" to "null",
385+
"CAST(X'0123' AS VARBINARY)" to "\"ASM=\"",
386+
"CAST(X'0123456789ABCDEF' AS VARBINARY)" to "\"ASNFZ4mrze8=\"",
387+
)
388+
389+
val dateValues =
390+
mapOf(
391+
"null" to "null",
392+
"'1000-01-01'" to "\"1000-01-01\"",
393+
"'9999-12-31'" to "\"9999-12-31\"",
394+
)
395+
396+
val timeValues =
397+
mapOf(
398+
"null" to "null",
399+
"'00:00:00'" to "\"00:00:00.000000\"",
400+
"'23:59:59'" to "\"23:59:59.000000\"",
401+
)
402+
403+
val timestampValues =
404+
mapOf(
405+
"null" to "null",
406+
"'1000-01-01 00:00:00'" to "\"1000-01-01T00:00:00.000000\"",
407+
"'9999-12-31 23:59:59'" to "\"9999-12-31T23:59:59.000000\"",
408+
)
409+
410+
val timestampWithTzValues =
411+
mapOf(
412+
"null" to "null",
413+
"'1000-01-01 00:00:00'" to "\"1000-01-01T00:00:00.000000Z\"",
414+
"'9999-12-31 23:59:59'" to "\"9999-12-31T23:59:59.000000Z\"",
415+
)
416+
417+
val xmlValues =
418+
mapOf(
419+
"null" to "null",
420+
"'<root><node>value</node></root>'" to "\"<root><node>value</node></root>\""
421+
)
422+
423+
val preservedInfiniteValues =
424+
mapOf(
425+
"'Infinity'" to "\"Infinity\"",
426+
"'-Infinity'" to "\"-Infinity\"",
427+
)
428+
429+
val nulledInfiniteValues =
430+
mapOf(
431+
"'Infinity'" to "null",
432+
"'-Infinity'" to "null",
433+
)
434+
435+
val preservedNanValues =
436+
preservedInfiniteValues
437+
.plus(
438+
"'NaN'" to "\"NaN\"",
439+
)
440+
441+
val nulledNanValues =
442+
nulledInfiniteValues
443+
.plus(
444+
"'NaN'" to "null",
445+
)
248446
}

airbyte-integrations/connectors/source-postgres/src/main/kotlin/io/airbyte/integrations/source/postgres/PostgresSource.kt

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,20 @@ private val log = KotlinLogging.logger {}
1010
object PostgresSource {
1111
@JvmStatic
1212
fun main(args: Array<String>) {
13+
// TODO: Remove before merging
1314
args.forEachIndexed { index, arg ->
1415
log.info { "***$index $arg" }
1516
if (index in listOf(2, 4, 6)) {
1617
log.info { (File(arg).readText()) }
1718
}
1819
}
19-
AirbyteSourceRunner.run(*args)
20+
val rootPath = "/Users/matt.bayley/dev/airbyte/secrets/airbyte/postgres"
21+
val configPath = "$rootPath/config.json"
22+
val catalogPath = "$rootPath/catalog.json"
23+
val statePath = "$rootPath/state.json"
24+
val checkArgs = arrayOf("--check", "--config", configPath)
25+
val discoverArgs = arrayOf("--discover", "--config", configPath)
26+
val readArgs = arrayOf("--read", "--config", configPath, "--catalog", catalogPath)
27+
AirbyteSourceRunner.run(*readArgs)
2028
}
2129
}

0 commit comments

Comments
 (0)