Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ private GenericRecord convertToStruct(
if (type != null) {
String parentFieldName =
structFieldId < 0 ? null : tableSchema.findColumnName(structFieldId);
schemaUpdateConsumer.addColumn(parentFieldName, recordFieldName, type);
schemaUpdateConsumer.addColumn(parentFieldName, recordFieldName, type, true);
}
}
} else {
Expand Down Expand Up @@ -218,7 +218,11 @@ private GenericRecord convertToStruct(
String parentFieldName =
structFieldId < 0 ? null : tableSchema.findColumnName(structFieldId);
Type type = SchemaUtils.toIcebergType(recordField.schema(), config);
schemaUpdateConsumer.addColumn(parentFieldName, recordField.name(), type);
schemaUpdateConsumer.addColumn(
parentFieldName,
recordField.name(),
type,
config.schemaForceOptional() || recordField.schema().isOptional());
}
} else {
boolean hasSchemaUpdates = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ boolean empty() {
return addColumns.isEmpty() && updateTypes.isEmpty() && makeOptionals.isEmpty();
}

void addColumn(String parentName, String name, Type type) {
AddColumn addCol = new AddColumn(parentName, name, type);
void addColumn(String parentName, String name, Type type, boolean isOptional) {
AddColumn addCol = new AddColumn(parentName, name, type, isOptional);
addColumns.put(addCol.key(), addCol);
}

Expand All @@ -65,11 +65,13 @@ static class AddColumn extends SchemaUpdate {
private final String parentName;
private final String name;
private final Type type;
private final boolean isOptional;

AddColumn(String parentName, String name, Type type) {
AddColumn(String parentName, String name, Type type, boolean isOptional) {
this.parentName = parentName;
this.name = name;
this.type = type;
this.isOptional = isOptional;
}

String parentName() {
Expand All @@ -87,6 +89,10 @@ String key() {
Type type() {
return type;
}

boolean isOptional() {
return isOptional;
}
}

static class UpdateType extends SchemaUpdate {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,13 @@ private static void commitSchemaUpdates(Table table, SchemaUpdate.Consumer updat
// apply the updates
UpdateSchema updateSchema = table.updateSchema();
addColumns.forEach(
update -> updateSchema.addColumn(update.parentName(), update.name(), update.type()));
update -> {
if (update.isOptional()) {
updateSchema.addColumn(update.parentName(), update.name(), update.type());
} else {
updateSchema.addRequiredColumn(update.parentName(), update.name(), update.type());
}
});
updateTypes.forEach(update -> updateSchema.updateColumn(update.name(), update.type()));
makeOptionals.forEach(update -> updateSchema.makeColumnOptional(update.name()));
updateSchema.commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,128 @@ private void assertTypesAddedFromStruct(Function<String, Type> fn) {
assertThat(fn.apply("ma")).isInstanceOf(MapType.class);
}

@Test
public void testAddColumnOptionalityForMapInferredColumns() {
Table table = mock(Table.class);
when(table.schema()).thenReturn(ID_SCHEMA);
RecordConverter converter = new RecordConverter(table, config);

Map<String, Object> data = ImmutableMap.of("new_col", "test_value");
SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
converter.convert(data, consumer);
Collection<AddColumn> addCols = consumer.addColumns();

assertThat(addCols).hasSize(1);
AddColumn addCol = addCols.iterator().next();
assertThat(addCol.name()).isEqualTo("new_col");
assertThat(addCol.isOptional()).isTrue();
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testAddColumnOptionalityForStructWithRequiredFields(boolean forceOptional) {
Table table = mock(Table.class);
when(table.schema()).thenReturn(ID_SCHEMA);
when(config.schemaForceOptional()).thenReturn(forceOptional);
RecordConverter converter = new RecordConverter(table, config);

Schema valueSchema = SchemaBuilder.struct().field("new_col", Schema.STRING_SCHEMA).build();
Struct data = new Struct(valueSchema).put("new_col", "test_value");

SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
converter.convert(data, consumer);
Collection<AddColumn> addCols = consumer.addColumns();

assertThat(addCols).hasSize(1);
AddColumn addCol = addCols.iterator().next();
assertThat(addCol.name()).isEqualTo("new_col");
assertThat(addCol.isOptional()).isEqualTo(forceOptional);
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testAddColumnOptionalityForStructWithOptionalFields(boolean forceOptional) {
Table table = mock(Table.class);
when(table.schema()).thenReturn(ID_SCHEMA);
when(config.schemaForceOptional()).thenReturn(forceOptional);
RecordConverter converter = new RecordConverter(table, config);

Schema valueSchema =
SchemaBuilder.struct().field("new_col", Schema.OPTIONAL_STRING_SCHEMA).build();
Struct data = new Struct(valueSchema).put("new_col", "test_value");

SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
converter.convert(data, consumer);
Collection<AddColumn> addCols = consumer.addColumns();

assertThat(addCols).hasSize(1);
AddColumn addCol = addCols.iterator().next();
assertThat(addCol.name()).isEqualTo("new_col");
assertThat(addCol.isOptional()).isTrue();
}

@Test
public void testAddColumnOptionalityForStructRequiredFieldWithNoForceOptional() {
Table table = mock(Table.class);
when(table.schema()).thenReturn(ID_SCHEMA);
when(config.schemaForceOptional()).thenReturn(false);
RecordConverter converter = new RecordConverter(table, config);

Schema valueSchema =
SchemaBuilder.struct()
.field("req_col", Schema.STRING_SCHEMA)
.field("opt_col", Schema.OPTIONAL_STRING_SCHEMA)
.build();
Struct data = new Struct(valueSchema).put("req_col", "test1").put("opt_col", "test2");

SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
converter.convert(data, consumer);
Collection<AddColumn> addCols = consumer.addColumns();

assertThat(addCols).hasSize(2);
Map<String, AddColumn> colMap = Maps.newHashMap();
addCols.forEach(col -> colMap.put(col.name(), col));

assertThat(colMap.get("req_col").isOptional()).isFalse();
assertThat(colMap.get("opt_col").isOptional()).isTrue();
}

@Test
public void testAddColumnOptionalityForNestedStructFields() {
org.apache.iceberg.Schema baseSchema =
new org.apache.iceberg.Schema(NestedField.required(1, "base", IntegerType.get()));

Table table = mock(Table.class);
when(table.schema()).thenReturn(baseSchema);
when(config.schemaForceOptional()).thenReturn(false);
RecordConverter converter = new RecordConverter(table, config);

Schema nestedSchema =
SchemaBuilder.struct()
.field("req_nested", Schema.STRING_SCHEMA)
.field("opt_nested", Schema.OPTIONAL_STRING_SCHEMA)
.build();
Schema valueSchema =
SchemaBuilder.struct().field("base", Schema.INT32_SCHEMA).field("nested", nestedSchema);

Struct nestedStruct =
new Struct(nestedSchema).put("req_nested", "test1").put("opt_nested", "test2");
Struct data = new Struct(valueSchema).put("base", 1).put("nested", nestedStruct);

SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
converter.convert(data, consumer);
Collection<AddColumn> addCols = consumer.addColumns();

assertThat(addCols).hasSize(1);
AddColumn nestedCol = addCols.iterator().next();
assertThat(nestedCol.name()).isEqualTo("nested");
assertThat(nestedCol.type()).isInstanceOf(StructType.class);

StructType nestedType = nestedCol.type().asStructType();
assertThat(nestedType.field("req_nested").isRequired()).isTrue();
assertThat(nestedType.field("opt_nested").isOptional()).isTrue();
}

@Test
public void testEvolveTypeDetectionStruct() {
org.apache.iceberg.Schema tableSchema =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class TestSchemaUpdate {
@Test
public void testAddColumn() {
SchemaUpdate.Consumer updateConsumer = new SchemaUpdate.Consumer();
updateConsumer.addColumn("parent", "name", Types.StringType.get());
updateConsumer.addColumn("parent", "name", Types.StringType.get(), true);
assertThat(updateConsumer.addColumns()).hasSize(1);
assertThat(updateConsumer.updateTypes()).isEmpty();
assertThat(updateConsumer.makeOptionals()).isEmpty();
Expand All @@ -37,6 +37,22 @@ public void testAddColumn() {
assertThat(addColumn.parentName()).isEqualTo("parent");
assertThat(addColumn.name()).isEqualTo("name");
assertThat(addColumn.type()).isEqualTo(Types.StringType.get());
assertThat(addColumn.isOptional()).isTrue();
}

@Test
public void testAddColumnRequired() {
SchemaUpdate.Consumer updateConsumer = new SchemaUpdate.Consumer();
updateConsumer.addColumn("parent", "name", Types.StringType.get(), false);
assertThat(updateConsumer.addColumns()).hasSize(1);
assertThat(updateConsumer.updateTypes()).isEmpty();
assertThat(updateConsumer.makeOptionals()).isEmpty();

SchemaUpdate.AddColumn addColumn = updateConsumer.addColumns().iterator().next();
assertThat(addColumn.parentName()).isEqualTo("parent");
assertThat(addColumn.name()).isEqualTo("name");
assertThat(addColumn.type()).isEqualTo(Types.StringType.get());
assertThat(addColumn.isOptional()).isFalse();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ public void testApplySchemaUpdates() {

// the updates to "i" should be ignored as it already exists and is the same type
SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
consumer.addColumn(null, "i", IntegerType.get());
consumer.addColumn(null, "i", IntegerType.get(), true);
consumer.updateType("i", IntegerType.get());
consumer.makeOptional("i");
consumer.updateType("f", DoubleType.get());
consumer.addColumn(null, "s", StringType.get());
consumer.addColumn(null, "s", StringType.get(), true);

SchemaUtils.applySchemaUpdates(table, consumer);
verify(table).refresh();
Expand All @@ -133,11 +133,11 @@ public void testApplyNestedSchemaUpdates() {

// the updates to "st.i" should be ignored as it already exists and is the same type
SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
consumer.addColumn("st", "i", IntegerType.get());
consumer.addColumn("st", "i", IntegerType.get(), true);
consumer.updateType("st.i", IntegerType.get());
consumer.makeOptional("st.i");
consumer.updateType("st.f", DoubleType.get());
consumer.addColumn("st", "s", StringType.get());
consumer.addColumn("st", "s", StringType.get(), true);

SchemaUtils.applySchemaUpdates(table, consumer);
verify(table).refresh();
Expand Down Expand Up @@ -168,6 +168,46 @@ public void testApplySchemaUpdatesNoUpdates() {
verify(table, times(0)).updateSchema();
}

@Test
public void testApplySchemaUpdatesWithRequiredColumns() {
UpdateSchema updateSchema = mock(UpdateSchema.class);
Table table = mock(Table.class);
when(table.schema()).thenReturn(SIMPLE_SCHEMA);
when(table.updateSchema()).thenReturn(updateSchema);

SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
consumer.addColumn(null, "s1", StringType.get(), true);
consumer.addColumn(null, "s2", StringType.get(), false);

SchemaUtils.applySchemaUpdates(table, consumer);
verify(table).refresh();
verify(table).updateSchema();

verify(updateSchema).addColumn(isNull(), eq("s1"), isA(StringType.class));
verify(updateSchema).addRequiredColumn(isNull(), eq("s2"), isA(StringType.class));
verify(updateSchema).commit();
}

@Test
public void testApplyNestedSchemaUpdatesWithRequiredColumns() {
UpdateSchema updateSchema = mock(UpdateSchema.class);
Table table = mock(Table.class);
when(table.schema()).thenReturn(NESTED_SCHEMA);
when(table.updateSchema()).thenReturn(updateSchema);

SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
consumer.addColumn("st", "s1", StringType.get(), true);
consumer.addColumn("st", "s2", StringType.get(), false);

SchemaUtils.applySchemaUpdates(table, consumer);
verify(table).refresh();
verify(table).updateSchema();

verify(updateSchema).addColumn(eq("st"), eq("s1"), isA(StringType.class));
verify(updateSchema).addRequiredColumn(eq("st"), eq("s2"), isA(StringType.class));
verify(updateSchema).commit();
}

@Test
public void testNeedsDataTypeUpdate() {
// valid updates
Expand Down
Loading