diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java
index 1a57a6444870..914325c3956e 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java
@@ -183,7 +183,7 @@ private GenericRecord convertToStruct(
             if (type != null) {
               String parentFieldName =
                   structFieldId < 0 ? null : tableSchema.findColumnName(structFieldId);
-              schemaUpdateConsumer.addColumn(parentFieldName, recordFieldName, type);
+              schemaUpdateConsumer.addColumn(parentFieldName, recordFieldName, type, true);
             }
           }
         } else {
@@ -218,7 +218,11 @@ private GenericRecord convertToStruct(
             String parentFieldName =
                 structFieldId < 0 ? null : tableSchema.findColumnName(structFieldId);
             Type type = SchemaUtils.toIcebergType(recordField.schema(), config);
-            schemaUpdateConsumer.addColumn(parentFieldName, recordField.name(), type);
+            schemaUpdateConsumer.addColumn(
+                parentFieldName,
+                recordField.name(),
+                type,
+                config.schemaForceOptional() || recordField.schema().isOptional());
           }
         } else {
           boolean hasSchemaUpdates = false;
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUpdate.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUpdate.java
index 809bea84dcc2..0b9f9fd28ea5 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUpdate.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUpdate.java
@@ -47,8 +47,8 @@ boolean empty() {
       return addColumns.isEmpty() && updateTypes.isEmpty() && makeOptionals.isEmpty();
     }
 
-    void addColumn(String parentName, String name, Type type) {
-      AddColumn addCol = new AddColumn(parentName, name, type);
+    void addColumn(String parentName, String name, Type type, boolean isOptional) {
+      AddColumn addCol = new AddColumn(parentName, name, type, isOptional);
       addColumns.put(addCol.key(), addCol);
     }
 
@@ -65,11 +65,13 @@ static class AddColumn extends SchemaUpdate {
     private final String parentName;
     private final String name;
     private final Type type;
+    private final boolean isOptional;
 
-    AddColumn(String parentName, String name, Type type) {
+    AddColumn(String parentName, String name, Type type, boolean isOptional) {
       this.parentName = parentName;
       this.name = name;
       this.type = type;
+      this.isOptional = isOptional;
     }
 
     String parentName() {
@@ -87,6 +89,10 @@ String key() {
     Type type() {
       return type;
     }
+
+    boolean isOptional() {
+      return isOptional;
+    }
   }
 
   static class UpdateType extends SchemaUpdate {
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUtils.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUtils.java
index b0dd56b45d67..3694387c36ed 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUtils.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUtils.java
@@ -123,7 +123,13 @@ private static void commitSchemaUpdates(Table table, SchemaUpdate.Consumer updat
     // apply the updates
     UpdateSchema updateSchema = table.updateSchema();
     addColumns.forEach(
-        update -> updateSchema.addColumn(update.parentName(), update.name(), update.type()));
+        update -> {
+          if (update.isOptional()) {
+            updateSchema.addColumn(update.parentName(), update.name(), update.type());
+          } else {
+            updateSchema.addRequiredColumn(update.parentName(), update.name(), update.type());
+          }
+        });
     updateTypes.forEach(update -> updateSchema.updateColumn(update.name(), update.type()));
     makeOptionals.forEach(update -> updateSchema.makeColumnOptional(update.name()));
     updateSchema.commit();
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestRecordConverter.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestRecordConverter.java
index 45d07f69591b..4abe3d321f65 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestRecordConverter.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestRecordConverter.java
@@ -817,6 +817,128 @@ private void assertTypesAddedFromStruct(Function<String, Type> fn) {
     assertThat(fn.apply("ma")).isInstanceOf(MapType.class);
   }
 
+  @Test
+  public void testAddColumnOptionalityForMapInferredColumns() {
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(ID_SCHEMA);
+    RecordConverter converter = new RecordConverter(table, config);
+
+    Map<String, Object> data = ImmutableMap.of("new_col", "test_value");
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    converter.convert(data, consumer);
+    Collection<AddColumn> addCols = consumer.addColumns();
+
+    assertThat(addCols).hasSize(1);
+    AddColumn addCol = addCols.iterator().next();
+    assertThat(addCol.name()).isEqualTo("new_col");
+    assertThat(addCol.isOptional()).isTrue();
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testAddColumnOptionalityForStructWithRequiredFields(boolean forceOptional) {
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(ID_SCHEMA);
+    when(config.schemaForceOptional()).thenReturn(forceOptional);
+    RecordConverter converter = new RecordConverter(table, config);
+
+    Schema valueSchema = SchemaBuilder.struct().field("new_col", Schema.STRING_SCHEMA).build();
+    Struct data = new Struct(valueSchema).put("new_col", "test_value");
+
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    converter.convert(data, consumer);
+    Collection<AddColumn> addCols = consumer.addColumns();
+
+    assertThat(addCols).hasSize(1);
+    AddColumn addCol = addCols.iterator().next();
+    assertThat(addCol.name()).isEqualTo("new_col");
+    assertThat(addCol.isOptional()).isEqualTo(forceOptional);
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testAddColumnOptionalityForStructWithOptionalFields(boolean forceOptional) {
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(ID_SCHEMA);
+    when(config.schemaForceOptional()).thenReturn(forceOptional);
+    RecordConverter converter = new RecordConverter(table, config);
+
+    Schema valueSchema =
+        SchemaBuilder.struct().field("new_col", Schema.OPTIONAL_STRING_SCHEMA).build();
+    Struct data = new Struct(valueSchema).put("new_col", "test_value");
+
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    converter.convert(data, consumer);
+    Collection<AddColumn> addCols = consumer.addColumns();
+
+    assertThat(addCols).hasSize(1);
+    AddColumn addCol = addCols.iterator().next();
+    assertThat(addCol.name()).isEqualTo("new_col");
+    assertThat(addCol.isOptional()).isTrue();
+  }
+
+  @Test
+  public void testAddColumnOptionalityForStructRequiredFieldWithNoForceOptional() {
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(ID_SCHEMA);
+    when(config.schemaForceOptional()).thenReturn(false);
+    RecordConverter converter = new RecordConverter(table, config);
+
+    Schema valueSchema =
+        SchemaBuilder.struct()
+            .field("req_col", Schema.STRING_SCHEMA)
+            .field("opt_col", Schema.OPTIONAL_STRING_SCHEMA)
+            .build();
+    Struct data = new Struct(valueSchema).put("req_col", "test1").put("opt_col", "test2");
+
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    converter.convert(data, consumer);
+    Collection<AddColumn> addCols = consumer.addColumns();
+
+    assertThat(addCols).hasSize(2);
+    Map<String, AddColumn> colMap = Maps.newHashMap();
+    addCols.forEach(col -> colMap.put(col.name(), col));
+
+    assertThat(colMap.get("req_col").isOptional()).isFalse();
+    assertThat(colMap.get("opt_col").isOptional()).isTrue();
+  }
+
+  @Test
+  public void testAddColumnOptionalityForNestedStructFields() {
+    org.apache.iceberg.Schema baseSchema =
+        new org.apache.iceberg.Schema(NestedField.required(1, "base", IntegerType.get()));
+
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(baseSchema);
+    when(config.schemaForceOptional()).thenReturn(false);
+    RecordConverter converter = new RecordConverter(table, config);
+
+    Schema nestedSchema =
+        SchemaBuilder.struct()
+            .field("req_nested", Schema.STRING_SCHEMA)
+            .field("opt_nested", Schema.OPTIONAL_STRING_SCHEMA)
+            .build();
+    Schema valueSchema =
+        SchemaBuilder.struct().field("base", Schema.INT32_SCHEMA).field("nested", nestedSchema);
+
+    Struct nestedStruct =
+        new Struct(nestedSchema).put("req_nested", "test1").put("opt_nested", "test2");
+    Struct data = new Struct(valueSchema).put("base", 1).put("nested", nestedStruct);
+
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    converter.convert(data, consumer);
+    Collection<AddColumn> addCols = consumer.addColumns();
+
+    assertThat(addCols).hasSize(1);
+    AddColumn nestedCol = addCols.iterator().next();
+    assertThat(nestedCol.name()).isEqualTo("nested");
+    assertThat(nestedCol.type()).isInstanceOf(StructType.class);
+
+    StructType nestedType = nestedCol.type().asStructType();
+    assertThat(nestedType.field("req_nested").isRequired()).isTrue();
+    assertThat(nestedType.field("opt_nested").isOptional()).isTrue();
+  }
+
   @Test
   public void testEvolveTypeDetectionStruct() {
     org.apache.iceberg.Schema tableSchema =
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSchemaUpdate.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSchemaUpdate.java
index 22b3c6d53537..e1342e467dd2 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSchemaUpdate.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSchemaUpdate.java
@@ -28,7 +28,7 @@ public class TestSchemaUpdate {
   @Test
   public void testAddColumn() {
     SchemaUpdate.Consumer updateConsumer = new SchemaUpdate.Consumer();
-    updateConsumer.addColumn("parent", "name", Types.StringType.get());
+    updateConsumer.addColumn("parent", "name", Types.StringType.get(), true);
     assertThat(updateConsumer.addColumns()).hasSize(1);
     assertThat(updateConsumer.updateTypes()).isEmpty();
     assertThat(updateConsumer.makeOptionals()).isEmpty();
@@ -37,6 +37,22 @@ public void testAddColumn() {
     assertThat(addColumn.parentName()).isEqualTo("parent");
     assertThat(addColumn.name()).isEqualTo("name");
     assertThat(addColumn.type()).isEqualTo(Types.StringType.get());
+    assertThat(addColumn.isOptional()).isTrue();
+  }
+
+  @Test
+  public void testAddColumnRequired() {
+    SchemaUpdate.Consumer updateConsumer = new SchemaUpdate.Consumer();
+    updateConsumer.addColumn("parent", "name", Types.StringType.get(), false);
+    assertThat(updateConsumer.addColumns()).hasSize(1);
+    assertThat(updateConsumer.updateTypes()).isEmpty();
+    assertThat(updateConsumer.makeOptionals()).isEmpty();
+
+    SchemaUpdate.AddColumn addColumn = updateConsumer.addColumns().iterator().next();
+    assertThat(addColumn.parentName()).isEqualTo("parent");
+    assertThat(addColumn.name()).isEqualTo("name");
+    assertThat(addColumn.type()).isEqualTo(Types.StringType.get());
+    assertThat(addColumn.isOptional()).isFalse();
   }
 
   @Test
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSchemaUtils.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSchemaUtils.java
index bde2452128b9..768c8133f27b 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSchemaUtils.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSchemaUtils.java
@@ -103,11 +103,11 @@ public void testApplySchemaUpdates() {
 
     // the updates to "i" should be ignored as it already exists and is the same type
     SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
-    consumer.addColumn(null, "i", IntegerType.get());
+    consumer.addColumn(null, "i", IntegerType.get(), true);
     consumer.updateType("i", IntegerType.get());
     consumer.makeOptional("i");
     consumer.updateType("f", DoubleType.get());
-    consumer.addColumn(null, "s", StringType.get());
+    consumer.addColumn(null, "s", StringType.get(), true);
 
     SchemaUtils.applySchemaUpdates(table, consumer);
     verify(table).refresh();
@@ -133,11 +133,11 @@ public void testApplyNestedSchemaUpdates() {
 
     // the updates to "st.i" should be ignored as it already exists and is the same type
     SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
-    consumer.addColumn("st", "i", IntegerType.get());
+    consumer.addColumn("st", "i", IntegerType.get(), true);
     consumer.updateType("st.i", IntegerType.get());
     consumer.makeOptional("st.i");
     consumer.updateType("st.f", DoubleType.get());
-    consumer.addColumn("st", "s", StringType.get());
+    consumer.addColumn("st", "s", StringType.get(), true);
 
     SchemaUtils.applySchemaUpdates(table, consumer);
     verify(table).refresh();
@@ -168,6 +168,46 @@ public void testApplySchemaUpdatesNoUpdates() {
     verify(table, times(0)).updateSchema();
   }
 
+  @Test
+  public void testApplySchemaUpdatesWithRequiredColumns() {
+    UpdateSchema updateSchema = mock(UpdateSchema.class);
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(SIMPLE_SCHEMA);
+    when(table.updateSchema()).thenReturn(updateSchema);
+
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    consumer.addColumn(null, "s1", StringType.get(), true);
+    consumer.addColumn(null, "s2", StringType.get(), false);
+
+    SchemaUtils.applySchemaUpdates(table, consumer);
+    verify(table).refresh();
+    verify(table).updateSchema();
+
+    verify(updateSchema).addColumn(isNull(), eq("s1"), isA(StringType.class));
+    verify(updateSchema).addRequiredColumn(isNull(), eq("s2"), isA(StringType.class));
+    verify(updateSchema).commit();
+  }
+
+  @Test
+  public void testApplyNestedSchemaUpdatesWithRequiredColumns() {
+    UpdateSchema updateSchema = mock(UpdateSchema.class);
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(NESTED_SCHEMA);
+    when(table.updateSchema()).thenReturn(updateSchema);
+
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    consumer.addColumn("st", "s1", StringType.get(), true);
consumer.addColumn("st", "s2", StringType.get(), false); + + SchemaUtils.applySchemaUpdates(table, consumer); + verify(table).refresh(); + verify(table).updateSchema(); + + verify(updateSchema).addColumn(eq("st"), eq("s1"), isA(StringType.class)); + verify(updateSchema).addRequiredColumn(eq("st"), eq("s2"), isA(StringType.class)); + verify(updateSchema).commit(); + } + @Test public void testNeedsDataTypeUpdate() { // valid updates