
Commit d2ad80e

Better docstrings, var names
1 parent 327c25e commit d2ad80e

File tree: 6 files changed, +248 -86 lines


dlt/common/libs/deltalake.py

Lines changed: 5 additions & 6 deletions
@@ -6,9 +6,9 @@
 from dlt import version, Pipeline
 from dlt.common import logger
 from dlt.common.libs.pyarrow import pyarrow as pa
-from dlt.common.libs.pyarrow import cast_arrow_schema_types
+from dlt.common.libs.pyarrow import cast_arrow_schema_types, py_arrow_to_table_schema_columns
 from dlt.common.libs.utils import load_open_tables
-from dlt.common.schema.typing import TWriteDisposition, TTableSchema
+from dlt.common.schema.typing import TWriteDisposition, TTableSchema, TTableSchemaColumns
 from dlt.common.schema.utils import get_first_column_name_with_prop, get_columns_names_with_prop
 from dlt.common.exceptions import MissingDependencyException, ValueErrorWithKnownValues
 from dlt.common.storages import FilesystemConfiguration

@@ -219,7 +219,6 @@ def evolve_delta_table_schema(delta_table: DeltaTable, arrow_schema: pa.Schema)
     return delta_table


-def get_table_columns(table: DeltaTable) -> List[str]:
-    fields = table.schema().fields
-    column_names = [field.name for field in fields]
-    return column_names
+def get_table_columns(table: DeltaTable) -> TTableSchemaColumns:
+    arrow_schema = table.schema().to_pyarrow()
+    return py_arrow_to_table_schema_columns(arrow_schema)
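
The helper now returns full dlt column schemas instead of bare column names. A minimal usage sketch, not part of this commit; the table URI and the printed fields are assumptions:

# Hedged usage sketch for the new return type of get_table_columns.
from deltalake import DeltaTable

from dlt.common.libs.deltalake import get_table_columns

delta_table = DeltaTable("s3://bucket/events")  # hypothetical table location
columns = get_table_columns(delta_table)  # TTableSchemaColumns: name -> column schema
for name, col_schema in columns.items():
    # each value is a dlt column schema dict with keys such as "data_type"
    print(name, col_schema.get("data_type"), col_schema.get("nullable"))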

dlt/common/libs/pyiceberg.py

Lines changed: 5 additions & 5 deletions
@@ -7,10 +7,10 @@
 from dlt.common import logger
 from dlt.common.destination.exceptions import DestinationUndefinedEntity
 from dlt.common.time import precise_time
-from dlt.common.libs.pyarrow import cast_arrow_schema_types
+from dlt.common.libs.pyarrow import cast_arrow_schema_types, py_arrow_to_table_schema_columns
 from dlt.common.libs.utils import load_open_tables
 from dlt.common.pipeline import SupportsPipeline
-from dlt.common.schema.typing import TWriteDisposition, TTableSchema
+from dlt.common.schema.typing import TWriteDisposition, TTableSchema, TTableSchemaColumns
 from dlt.common.schema.utils import get_first_column_name_with_prop, get_columns_names_with_prop
 from dlt.common.utils import assert_min_pkg_version
 from dlt.common.exceptions import MissingDependencyException

@@ -250,6 +250,6 @@ def make_location(path: str, config: FilesystemConfiguration) -> str:
     return location


-def get_table_columns(table: IcebergTable) -> List[str]:
-    column_names = table.schema().column_names
-    return column_names
+def get_table_columns(table: IcebergTable) -> TTableSchemaColumns:
+    arrow_schema = table.schema().as_arrow()
+    return py_arrow_to_table_schema_columns(arrow_schema)
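
Both the Delta and Iceberg helpers now funnel an Arrow schema through py_arrow_to_table_schema_columns. A toy sketch of that field-by-field conversion; the simplified type mapping below is an assumption, not dlt's real implementation:

import pyarrow as pa

def arrow_to_columns_sketch(schema: pa.Schema) -> dict:
    # crude Arrow -> dlt data_type mapping; the real one covers many more types
    type_map = {pa.int64(): "bigint", pa.string(): "text", pa.bool_(): "bool"}
    return {
        field.name: {
            "name": field.name,
            "data_type": type_map.get(field.type, "text"),  # fallback is an assumption
            "nullable": field.nullable,
        }
        for field in schema
    }

# arrow_to_columns_sketch(pa.schema([("id", pa.int64()), ("name", pa.string())]))
# -> {"id": {"name": "id", "data_type": "bigint", "nullable": True}, ...}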

dlt/destinations/impl/filesystem/filesystem.py

Lines changed: 14 additions & 17 deletions
@@ -82,7 +82,6 @@
 from dlt.destinations.utils import (
     verify_schema_merge_disposition,
     verify_schema_replace_disposition,
-    update_dlt_schema,
 )

 CURRENT_VERSION: int = 2

@@ -474,8 +473,12 @@ def drop_tables(self, *tables: str, delete_schema: bool = True) -> None:
     def get_storage_tables(
         self, table_names: Iterable[str]
     ) -> Iterable[Tuple[str, TTableSchemaColumns]]:
-        """Yields tables that have files in storage, returns columns from files in storage for regular delta/iceberg tables,
-        or from schema for regular tables without table format"""
+        """Yield (table_name, column_schemas) pairs for tables that have files in storage.
+
+        For Delta and Iceberg tables, the columns present in the actual table metadata
+        are returned. For tables using regular file formats, the column schemas come from the
+        dlt schema instead, since their real schema cannot be reflected directly.
+        """
         for table_name in table_names:
             table_dir = self.get_table_dir(table_name)
             if (

@@ -492,13 +495,7 @@
                 )

                 iceberg_table = self.load_open_table("iceberg", table_name)
-                actual_column_names = get_iceberg_table_columns(iceberg_table)
-
-                col_schemas = {
-                    col: schema
-                    for col, schema in self.schema.get_table_columns(table_name).items()
-                    if col in actual_column_names
-                }
+                col_schemas = get_iceberg_table_columns(iceberg_table)
                 yield (table_name, col_schemas)

             elif self.is_open_table("delta", table_name):

@@ -507,16 +504,16 @@
                 )

                 delta_table = self.load_open_table("delta", table_name)
-                actual_column_names = get_delta_table_columns(delta_table)
-
-                col_schemas = {
-                    col: schema
-                    for col, schema in self.schema.get_table_columns(table_name).items()
-                    if col in actual_column_names
-                }
+                col_schemas = get_delta_table_columns(delta_table)
                 yield (table_name, col_schemas)

             else:
+                logger.warning(
+                    f"Table '{table_name}' does not use a table format and does not support"
+                    " true schema reflection. Returning column schemas from the dlt"
+                    " schema, which may be stale if the underlying files were manually"
+                    " modified."
+                )
                 yield (table_name, self.schema.get_table_columns(table_name))

         else:
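
With this change, get_storage_tables returns reflected column schemas for Delta and Iceberg tables and falls back to the dlt schema otherwise. A hedged sketch of consuming it to spot drift; report_storage_drift is a hypothetical helper, and client/schema stand in for a filesystem destination client and its dlt Schema:

from typing import Iterable

def report_storage_drift(client, schema, table_names: Iterable[str]) -> None:
    # Print columns known to the dlt schema but absent from storage (sketch only).
    for table_name, actual_cols in client.get_storage_tables(table_names):
        if not actual_cols:
            print(f"{table_name}: no files found in storage")
            continue
        missing = set(schema.get_table_columns(table_name)) - set(actual_cols)
        if missing:
            print(f"{table_name}: columns missing in storage: {sorted(missing)}")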

dlt/destinations/utils.py

Lines changed: 95 additions & 50 deletions
@@ -28,6 +28,7 @@
 )

 from dlt.destinations.exceptions import DatabaseTransientException
+from dlt.destinations.sql_client import WithSqlClient
 from dlt.extract import DltResource, resource as make_resource, DltSource

 RE_DATA_TYPE = re.compile(r"([A-Z]+)\((\d+)(?:,\s?(\d+))?\)")

@@ -299,76 +300,118 @@ def get_deterministic_temp_table_name(table_name: str, op: str) -> str:
     return f"{op_name}_{NamingConvention._compute_tag(op_name, 0.001)}"


+class WithTableReflectionAndSql(WithTableReflection, WithSqlClient):
+    pass
+
+
+def _diff_between_actual_and_dlt_schema(
+    client: WithTableReflectionAndSql,
+    schema: Schema,
+    table_name: str,
+    actual_col_names: set[str],
+    disregard_dlt_columns: bool = True,
+) -> TPartialTableSchema:
+    """Compares the dlt schema with the destination table schema and returns columns that appear to be missing.
+
+    This function identifies columns that exist in the dlt schema but are missing from the actual
+    destination table. It is used during schema synchronization to detect when columns may have
+    been dropped from the destination and need to be removed from the dlt schema as well.
+
+    However, dlt internal columns (_dlt_id, _dlt_load_id) are treated specially because:
+
+    1. Users rarely drop dlt internal columns manually, and if they did,
+       dlt cannot recover from this situation anyway.
+
+    2. Athena has a constraint where dlt columns exist in the data but not in the table metadata:
+
+       - Athena external tables have fixed schemas defined at CREATE TABLE time
+       - These columns exist in the actual data files but don't appear in INFORMATION_SCHEMA
+       - This causes false positives where dlt columns appear "missing" when they're not
+
+    Args:
+        client (WithTableReflectionAndSql): The destination client with table reflection capabilities.
+        schema (Schema): The dlt schema to compare against the destination.
+        table_name (str): Name of the table to analyze.
+        actual_col_names (set[str]): Column names that actually exist in the destination table,
+            typically obtained from INFORMATION_SCHEMA queries. For Athena, this may not
+            include dlt columns present in the underlying data files.
+        disregard_dlt_columns (bool): Whether to ignore apparent mismatches for dlt internal
+            columns (_dlt_id, _dlt_load_id). Defaults to True to prevent incorrect
+            removal of essential dlt columns from the schema.
+
+    Returns:
+        TPartialTableSchema: A partial table schema containing columns that exist in the dlt schema
+            but are missing from the actual table.
+
+    Example:
+        If the dlt schema has [user_id, name, _dlt_id, _dlt_load_id] but the destination
+        INFORMATION_SCHEMA only shows [user_id, name], this function returns an empty dict
+        (assuming disregard_dlt_columns=True) rather than suggesting the dlt columns
+        should be dropped.
+    """
+    col_schemas = schema.get_table_columns(table_name)
+
+    # Map escaped (like actual_col_names) -> original names (what appears in the dlt schema)
+    escaped_to_dlt = {
+        client.sql_client.escape_column_name(col, quote=False): col for col in col_schemas.keys()
+    }
+
+    possibly_dropped_col_names = set(escaped_to_dlt.keys()) - actual_col_names
+
+    if not possibly_dropped_col_names:
+        return {}
+
+    partial_table: TPartialTableSchema = {"name": table_name, "columns": {}}
+
+    for esc_name in possibly_dropped_col_names:
+        name_in_dlt = escaped_to_dlt[esc_name]
+
+        if disregard_dlt_columns and name_in_dlt in [C_DLT_ID, C_DLT_LOAD_ID]:
+            continue
+
+        col_schema = col_schemas[name_in_dlt]
+        if col_schema.get("increment"):
+            # We can warn within the for loop,
+            # since there's only one incremental field per table
+            logger.warning(
+                f"An incremental field {name_in_dlt} is being removed from the schema."
+                " You should unset the incremental with"
+                " `incremental=dlt.sources.incremental.EMPTY`"
+            )
+        partial_table["columns"][name_in_dlt] = col_schema
+
+    return partial_table if partial_table["columns"] else {}
+
+
 def update_dlt_schema(
-    client: WithTableReflection,
+    client: WithTableReflectionAndSql,
     schema: Schema,
     table_names: Iterable[str] = None,
     dry_run: bool = False,
 ) -> Optional[TSchemaDrop]:
-    """Updates schema to the storage.
+    """Updates the dlt schema from the destination.

     Compare the schema we think we should have with what actually exists in the destination,
     and drop any tables and/or columns that disappeared.

     Args:
+        client (WithTableReflectionAndSql): The destination client with table reflection capabilities.
+        schema (Schema): The dlt schema to compare against the destination.
         table_names (Iterable[str], optional): Check only listed tables. Defaults to None and checks all tables.
+        dry_run (bool, optional): If True, only detect and return changes without applying them to the dlt schema. Defaults to False.

     Returns:
-        Optional[TSchemaTables]: Returns an update that was applied to the schema.
+        Optional[TSchemaDrop]: Returns the update that was applied to the schema.
     """
-    from dlt.destinations.sql_client import WithSqlClient
-
-    if not isinstance(client, WithSqlClient):
-        raise NotImplementedError
-
-    def _diff_between_actual_and_dlt_schema(
-        table_name: str, actual_col_names: set[str], disregard_dlt_columns: bool = True
-    ) -> TPartialTableSchema:
-        """Returns a partial table schema containing columns that exist in the dlt schema
-        but are missing from the actual table. Skips dlt internal columns by default.
-        """
-        col_schemas = schema.get_table_columns(table_name)
-
-        # Map escaped -> original names (actual_col_names are escaped)
-        escaped_to_original = {
-            client.sql_client.escape_column_name(col, quote=False): col
-            for col in col_schemas.keys()
-        }
-        dropped_col_names = set(escaped_to_original.keys()) - actual_col_names
-
-        if not dropped_col_names:
-            return {}
-
-        partial_table: TPartialTableSchema = {"name": table_name, "columns": {}}
-
-        for esc_name in dropped_col_names:
-            orig_name = escaped_to_original[esc_name]
-
-            # Athena doesn't have dlt columns in actual columns. Don't drop them anyway.
-            if disregard_dlt_columns and orig_name in [C_DLT_ID, C_DLT_LOAD_ID]:
-                continue
-
-            col_schema = col_schemas[orig_name]
-            if col_schema.get("increment"):
-                # We can warn within the for loop,
-                # since there's only one incremental field per table
-                logger.warning(
-                    f"An incremental field {orig_name} is being removed from schema."
-                    "You should unset the"
-                    " incremental with `incremental=dlt.sources.incremental.EMPTY`"
-                )
-            partial_table["columns"][orig_name] = col_schema
-
-        return partial_table if partial_table["columns"] else {}
-
     tables = table_names if table_names else schema.data_table_names()

     table_drops: TSchemaDrop = {}  # includes entire tables to drop
     column_drops: TSchemaDrop = {}  # includes parts of tables to drop as partial tables

     # 1. Detect what needs to be dropped
+    actual_table_col_schemas = dict(client.get_storage_tables(tables))
     for table_name in tables:
-        _, actual_col_schemas = list(client.get_storage_tables([table_name]))[0]
+        actual_col_schemas = actual_table_col_schemas[table_name]

         # no actual column schemas ->
         # table doesn't exist ->

@@ -379,10 +422,12 @@ def _diff_between_actual_and_dlt_schema(
             continue

         # actual column schemas present ->
-        # we compare actual schemas with dlt ones ->
+        # we compare actual column schemas with dlt ones ->
         # we take the difference as a partial table
         else:
             partial_table = _diff_between_actual_and_dlt_schema(
+                client,
+                schema,
                 table_name,
                 set(actual_col_schemas.keys()),
             )
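
The escaped-name mapping is the subtle part of _diff_between_actual_and_dlt_schema: dlt column names are escaped the same way the destination reports them before the two sides are compared. A toy illustration with an assumed lower-casing escape rule, not dlt's real escaping:

def escape_column_name(name: str) -> str:
    return name.lower()  # stand-in for a destination-specific escape rule

dlt_columns = ["UserId", "Name", "_dlt_id", "_dlt_load_id"]
actual_col_names = {"userid", "name"}  # as reported by INFORMATION_SCHEMA

escaped_to_dlt = {escape_column_name(c): c for c in dlt_columns}
possibly_dropped = set(escaped_to_dlt) - actual_col_names
# {"_dlt_id", "_dlt_load_id"} -> skipped when disregard_dlt_columns=True,
# so nothing would be reported as dropped in this example.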

dlt/pipeline/pipeline.py

Lines changed: 6 additions & 3 deletions
@@ -112,7 +112,7 @@
 from dlt.destinations.sql_client import SqlClientBase, WithSqlClient
 from dlt.destinations.fs_client import FSClientBase
 from dlt.destinations.dataset import get_destination_clients
-from dlt.destinations.utils import update_dlt_schema
+from dlt.destinations.utils import update_dlt_schema, WithTableReflectionAndSql

 from dlt.load.configuration import LoaderConfiguration
 from dlt.load import Load

@@ -1066,9 +1066,12 @@ def sync_schema_from_destination(
         with self._get_destination_clients(schema)[0] as client:
             if not client.is_storage_initialized():
                 raise DestinationUndefinedEntity()
-            if isinstance(client, WithTableReflection):
+            if isinstance(client, WithTableReflection) and isinstance(client, WithSqlClient):
                 return update_dlt_schema(
-                    client=client, schema=schema, table_names=table_names, dry_run=dry_run
+                    client=cast(WithTableReflectionAndSql, client),
+                    schema=schema,
+                    table_names=table_names,
+                    dry_run=dry_run,
                 )
             else:
                 raise DestinationTableReflectionNotSupported(self._destination.destination_name)
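
End to end, the commit routes sync_schema_from_destination through the stricter client type. A hedged usage sketch; the pipeline name, the duckdb destination, and the assumption that it supports both reflection and SQL are illustrative only:

import dlt

pipeline = dlt.pipeline(pipeline_name="events", destination="duckdb")
# dry_run=True only reports tables/columns that disappeared from the
# destination, without modifying the dlt schema (per the diff above)
drops = pipeline.sync_schema_from_destination(dry_run=True)
print(drops)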
