Commit 7e39896

remove name checks and rely only on ids when available to check compatibility
1 parent 1addf60

File tree: 2 files changed, +60 -142 lines

  pyiceberg/io/pyarrow.py
  tests/integration/test_add_files.py


pyiceberg/io/pyarrow.py

Lines changed: 1 addition & 36 deletions
@@ -2610,8 +2610,6 @@ def _check_pyarrow_schema_compatible(
     Raises:
         ValueError: If the schemas are not compatible.
     """
-    # Check if the PyArrow schema has explicit field IDs
-    has_field_ids = visit_pyarrow(provided_schema, _HasIds())
     name_mapping = requested_schema.name_mapping

     try:
@@ -2627,41 +2625,8 @@
         )
         additional_names = set(provided_schema._name_to_id.keys()) - set(requested_schema._name_to_id.keys())
         raise ValueError(
-            f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. "
-            "Update the schema first (hint, use union_by_name)."
+            f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
         ) from e
-
-    # If the file has explicit field IDs, validate they match the table schema exactly
-    if has_field_ids:
-        # Build mappings for both schemas (including nested fields)
-        requested_id_to_name = requested_schema._lazy_id_to_name
-        provided_id_to_name = provided_schema._lazy_id_to_name
-
-        # Also build reverse mapping: path -> field_id for the table
-        requested_name_to_id = {path: field_id for field_id, path in requested_id_to_name.items()}
-
-        # Check that all field paths in the file have matching field IDs in the table
-        mismatched_fields = []
-        for field_id, provided_path in provided_id_to_name.items():
-            # Check if this path exists in the table schema
-            expected_field_id = requested_name_to_id.get(provided_path)
-            if expected_field_id is None:
-                # The file has a field path that doesn't exist in the table at all
-                # This will be caught by _check_schema_compatible later, so skip it here
-                continue
-            elif expected_field_id != field_id:
-                # Same path, different field ID - this is the critical error
-                mismatched_fields.append(
-                    f"'{provided_path}': table expects field_id={expected_field_id}, file has field_id={field_id}"
-                )
-
-        if mismatched_fields:
-            raise ValueError(
-                "Field IDs in Parquet file do not match table schema. When field IDs are explicitly set in the "
-                "Parquet metadata, they must match the Iceberg table schema.\nMismatched fields:\n"
-                + "\n".join(f" - {field}" for field in mismatched_fields)
-            )
-
     _check_schema_compatible(requested_schema, provided_schema)
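Note: after this change, _check_pyarrow_schema_compatible applies the table's name mapping to the provided PyArrow schema and defers all field matching to _check_schema_compatible, which compares fields by ID rather than by name. A standalone sketch of that idea on plain PyArrow schemas (ids_by_field and simple_check are illustrative helpers, not pyiceberg's actual implementation):

import pyarrow as pa

def ids_by_field(schema: pa.Schema) -> dict[int, pa.Field]:
    # Map each top-level field's PARQUET:field_id metadata entry to the field.
    out: dict[int, pa.Field] = {}
    for field in schema:
        fid = (field.metadata or {}).get(b"PARQUET:field_id")
        if fid is not None:
            out[int(fid)] = field
    return out

def simple_check(table_schema: pa.Schema, file_schema: pa.Schema) -> None:
    # ID-based matching: names are ignored entirely; every table field ID
    # must appear in the file with the same Arrow type.
    table_ids, file_ids = ids_by_field(table_schema), ids_by_field(file_schema)
    for fid, expected in table_ids.items():
        provided = file_ids.get(fid)
        if provided is None or provided.type != expected.type:
            raise ValueError(f"field_id={fid}: missing or type mismatch")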

tests/integration/test_add_files.py

Lines changed: 59 additions & 106 deletions
@@ -47,7 +47,6 @@
     LongType,
     NestedField,
     StringType,
-    StructType,
     TimestampType,
     TimestamptzType,
 )
@@ -258,111 +257,6 @@ def test_add_files_to_unpartitioned_table_with_field_ids(
     assert all(df["qux"] == date(2024, 3, 7))


-@pytest.mark.integration
-def test_add_files_with_mismatched_field_ids(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
-    identifier = f"default.unpartitioned_mismatched_field_ids_v{format_version}"
-    tbl = _create_table(session_catalog, identifier, format_version)
-
-    # Create schema with field IDs that don't match the table schema
-    # Table has: 1=foo, 2=bar, 3=baz, 4=qux (assigned by catalog)
-    # This file has: 1=foo, 2=bar, 5=baz, 6=qux (wrong IDs for baz and qux)
-    mismatched_schema = pa.schema(
-        [
-            pa.field("foo", pa.bool_(), nullable=False, metadata={"PARQUET:field_id": "1"}),
-            pa.field("bar", pa.string(), nullable=False, metadata={"PARQUET:field_id": "2"}),
-            pa.field("baz", pa.int32(), nullable=False, metadata={"PARQUET:field_id": "5"}),  # Wrong: should be 3
-            pa.field("qux", pa.date32(), nullable=False, metadata={"PARQUET:field_id": "6"}),  # Wrong: should be 4
-        ]
-    )
-
-    file_path = f"s3://warehouse/default/unpartitioned_mismatched_field_ids/v{format_version}/test.parquet"
-    fo = tbl.io.new_output(file_path)
-    with fo.create(overwrite=True) as fos:
-        with pq.ParquetWriter(fos, schema=mismatched_schema) as writer:
-            writer.write_table(ARROW_TABLE_WITH_IDS)
-
-    # Adding files with mismatched field IDs should fail
-    with pytest.raises(ValueError, match="Field IDs in Parquet file do not match table schema"):
-        tbl.add_files(file_paths=[file_path])
-
-
-@pytest.mark.integration
-def test_add_files_with_mismatched_nested_field_ids(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
-    """Test that files with mismatched nested (struct) field IDs are rejected."""
-    identifier = f"default.nested_mismatched_field_ids_v{format_version}"
-
-    # Create a table with a nested struct field
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-
-    nested_schema = Schema(
-        NestedField(1, "id", IntegerType(), required=False),
-        NestedField(
-            2,
-            "user",
-            StructType(
-                NestedField(3, "name", StringType(), required=False),
-                NestedField(4, "age", IntegerType(), required=False),
-            ),
-            required=False,
-        ),
-        schema_id=0,
-    )
-
-    tbl = session_catalog.create_table(
-        identifier=identifier,
-        schema=nested_schema,
-        properties={"format-version": str(format_version)},
-    )
-
-    # Create PyArrow schema with MISMATCHED nested field IDs
-    # The table expects: id=1, user=2, user.name=3, user.age=4
-    # This file has: id=1, user=2, user.name=99, user.age=100 (wrong nested IDs)
-    pa_schema_mismatched = pa.schema(
-        [
-            pa.field("id", pa.int32(), nullable=True, metadata={b"PARQUET:field_id": b"1"}),
-            pa.field(
-                "user",
-                pa.struct(
-                    [
-                        pa.field("name", pa.string(), nullable=True, metadata={b"PARQUET:field_id": b"99"}),  # Wrong!
-                        pa.field("age", pa.int32(), nullable=True, metadata={b"PARQUET:field_id": b"100"}),  # Wrong!
-                    ]
-                ),
-                nullable=True,
-                metadata={b"PARQUET:field_id": b"2"},
-            ),
-        ]
-    )
-
-    pa_table = pa.table(
-        {
-            "id": pa.array([1, 2, 3], type=pa.int32()),
-            "user": pa.array(
-                [
-                    {"name": "Alice", "age": 30},
-                    {"name": "Bob", "age": 25},
-                    {"name": "Charlie", "age": 35},
-                ],
-                type=pa_schema_mismatched.field("user").type,
-            ),
-        },
-        schema=pa_schema_mismatched,
-    )
-
-    file_path = f"s3://warehouse/default/nested_mismatched_field_ids/v{format_version}/test.parquet"
-    fo = tbl.io.new_output(file_path)
-    with fo.create(overwrite=True) as fos:
-        with pq.ParquetWriter(fos, schema=pa_schema_mismatched) as writer:
-            writer.write_table(pa_table)
-
-    # Adding files with mismatched nested field IDs should fail
-    with pytest.raises(ValueError, match="Field IDs in Parquet file do not match table schema"):
-        tbl.add_files(file_paths=[file_path])
-
-
 @pytest.mark.integration
 def test_add_files_parallelized(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
     from pyiceberg.io.pyarrow import parquet_file_to_data_file
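Note: with the strict ID-equality check gone, these two tests no longer have an error to assert on, so they are removed rather than rewritten. When debugging compatibility issues it can still be useful to see which IDs a Parquet file actually carries; a minimal sketch using PyArrow's public API (the path is illustrative):

import pyarrow.parquet as pq

# Read only the schema and print each top-level field's embedded field ID.
schema = pq.read_schema("test.parquet")  # hypothetical local file
for field in schema:
    fid = (field.metadata or {}).get(b"PARQUET:field_id")
    print(field.name, fid)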
@@ -707,6 +601,65 @@ def test_add_files_fails_on_schema_mismatch(spark: SparkSession, session_catalog
         tbl.add_files(file_paths=[file_path])


+@pytest.mark.integration
+def test_add_files_with_field_ids_fails_on_schema_mismatch(
+    spark: SparkSession, session_catalog: Catalog, format_version: int
+) -> None:
+    """Test that files with mismatched field types (when field IDs match) are rejected."""
+    identifier = f"default.table_schema_mismatch_based_on_field_ids__fails_v{format_version}"
+
+    tbl = _create_table(session_catalog, identifier, format_version)
+
+    # All fields are renamed and reordered but have matching field IDs, so they should be compatible
+    # except for 'baz' which has the wrong type
+    WRONG_SCHEMA = pa.schema(
+        [
+            pa.field("qux_", pa.date32(), metadata={"PARQUET:field_id": "4"}),
+            pa.field("baz_", pa.string(), metadata={"PARQUET:field_id": "3"}),  # Wrong type: should be int32
+            pa.field("bar_", pa.string(), metadata={"PARQUET:field_id": "2"}),
+            pa.field("foo_", pa.bool_(), metadata={"PARQUET:field_id": "1"}),
+        ]
+    )
+    file_path = f"s3://warehouse/default/table_with_field_ids_schema_mismatch_fails/v{format_version}/test.parquet"
+    # write parquet files
+    fo = tbl.io.new_output(file_path)
+    with fo.create(overwrite=True) as fos:
+        with pq.ParquetWriter(fos, schema=WRONG_SCHEMA) as writer:
+            writer.write_table(
+                pa.Table.from_pylist(
+                    [
+                        {
+                            "qux_": date(2024, 3, 7),
+                            "baz_": "123",
+                            "bar_": "bar_string",
+                            "foo_": True,
+                        },
+                        {
+                            "qux_": date(2024, 3, 7),
+                            "baz_": "124",
+                            "bar_": "bar_string",
+                            "foo_": True,
+                        },
+                    ],
+                    schema=WRONG_SCHEMA,
+                )
+            )
+
+    expected = """Mismatch in fields:
+┏━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
+┃    ┃ Table field              ┃ Dataframe field           ┃
+┡━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
+│ ✅ │ 1: foo: optional boolean │ 1: foo_: optional boolean │
+│ ✅ │ 2: bar: optional string  │ 2: bar_: optional string  │
+│ ❌ │ 3: baz: optional int     │ 3: baz_: optional string  │
+│ ✅ │ 4: qux: optional date    │ 4: qux_: optional date    │
+└────┴──────────────────────────┴───────────────────────────┘
+"""
+
+    with pytest.raises(ValueError, match=expected):
+        tbl.add_files(file_paths=[file_path])
+
+
 @pytest.mark.integration
 def test_add_files_with_large_and_regular_schema(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
     identifier = f"default.unpartitioned_with_large_types{format_version}"
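Note: the new test pins down the failure side; the success side is implied by its comment: because matching is ID-based, renaming and reordering alone no longer causes a rejection. A minimal sketch of a schema the same table would now accept, assuming the same four-field fixture ('foo'/'bar'/'baz'/'qux' with IDs 1-4); OK_SCHEMA is an illustrative name:

import pyarrow as pa

# Same IDs as the table, names and order changed, and baz_ carries the
# int32 type that field ID 3 expects, so add_files should accept this file.
OK_SCHEMA = pa.schema(
    [
        pa.field("qux_", pa.date32(), metadata={"PARQUET:field_id": "4"}),
        pa.field("baz_", pa.int32(), metadata={"PARQUET:field_id": "3"}),
        pa.field("bar_", pa.string(), metadata={"PARQUET:field_id": "2"}),
        pa.field("foo_", pa.bool_(), metadata={"PARQUET:field_id": "1"}),
    ]
)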
