Commit b1c3f3f

Better docstrings, var names

1 parent 327c25e commit b1c3f3f

4 files changed, +156 -55 lines changed

dlt/destinations/impl/filesystem/filesystem.py

Lines changed: 11 additions & 3 deletions

@@ -82,7 +82,6 @@
 from dlt.destinations.utils import (
     verify_schema_merge_disposition,
     verify_schema_replace_disposition,
-    update_dlt_schema,
 )

 CURRENT_VERSION: int = 2
@@ -474,8 +473,12 @@ def drop_tables(self, *tables: str, delete_schema: bool = True) -> None:
     def get_storage_tables(
         self, table_names: Iterable[str]
     ) -> Iterable[Tuple[str, TTableSchemaColumns]]:
-        """Yields tables that have files in storage, returns columns from files in storage for regular delta/iceberg tables,
-        or from schema for regular tables without table format"""
+        """Yield (table_name, column_schemas) pairs for tables that have files in storage.
+
+        For Delta and Iceberg tables, the columns present in the actual table metadata
+        are returned. For tables using regular file formats, the column schemas come from the
+        dlt schema instead, since their real schema cannot be reflected directly.
+        """
         for table_name in table_names:
             table_dir = self.get_table_dir(table_name)
             if (
@@ -517,6 +520,11 @@ def get_storage_tables(
                 yield (table_name, col_schemas)

             else:
+                logger.warning(
+                    f"Table '{table_name}' uses a regular file format and does not"
+                    " support true schema reflection. Returning column schemas from"
+                    " the dlt schema."
+                )
                 yield (table_name, self.schema.get_table_columns(table_name))

         else:
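
For orientation, a minimal sketch of how the revised get_storage_tables() contract might be consumed against a local filesystem destination; the pipeline and table names are illustrative, not part of the commit, and destination_client() is assumed to return the filesystem client:

    import dlt

    pipeline = dlt.pipeline("storage_tables_demo", destination="filesystem")
    pipeline.run([{"id": 1, "name": "a"}], table_name="my_table")

    # get_storage_tables() yields (table_name, column_schemas) pairs; for
    # tables without an open table format the columns come from the dlt
    # schema and a warning is logged, per the change above
    with pipeline.destination_client() as client:
        for table_name, columns in client.get_storage_tables(["my_table"]):
            print(table_name, list(columns.keys()))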

dlt/destinations/utils.py

Lines changed: 93 additions & 49 deletions

@@ -28,6 +28,7 @@
 )

 from dlt.destinations.exceptions import DatabaseTransientException
+from dlt.destinations.sql_client import WithSqlClient
 from dlt.extract import DltResource, resource as make_resource, DltSource

 RE_DATA_TYPE = re.compile(r"([A-Z]+)\((\d+)(?:,\s?(\d+))?\)")
@@ -299,68 +300,109 @@ def get_deterministic_temp_table_name(table_name: str, op: str) -> str:
     return f"{op_name}_{NamingConvention._compute_tag(op_name, 0.001)}"


+class WithTableReflectionAndSql(WithTableReflection, WithSqlClient):
+    pass
+
+
+def _diff_between_actual_and_dlt_schema(
+    client: WithTableReflectionAndSql,
+    schema: Schema,
+    table_name: str,
+    actual_col_names: set[str],
+    disregard_dlt_columns: bool = True,
+) -> TPartialTableSchema:
+    """Compares the dlt schema with the destination table schema and returns columns that appear to be missing.
+
+    This function identifies columns that exist in the dlt schema but are missing from the actual
+    destination table. It's used during schema synchronization to detect when columns may have
+    been dropped from the destination and need to be removed from the dlt schema as well.
+
+    However, dlt internal columns (_dlt_id, _dlt_load_id) are treated specially because:
+
+    1. Users rarely drop dlt internal columns manually, and if they did,
+       dlt cannot recover from this situation anyway.
+
+    2. Athena has a constraint where dlt columns exist in the data but not in the table metadata:
+
+       - Athena external tables have fixed schemas defined at CREATE TABLE time
+       - These columns exist in the actual data files but don't appear in INFORMATION_SCHEMA
+       - This causes false positives where dlt columns appear "missing" when they're not
+
+    Args:
+        client (WithTableReflectionAndSql): The destination client with table reflection capabilities.
+        schema (Schema): The dlt schema to compare against the destination.
+        table_name (str): Name of the table to analyze.
+        actual_col_names (set[str]): Column names that actually exist in the destination table,
+            typically obtained from INFORMATION_SCHEMA queries. For Athena,
+            this may not include dlt columns present in the underlying data files.
+        disregard_dlt_columns: Whether to ignore apparent mismatches for dlt internal
+            columns (_dlt_id, _dlt_load_id). Defaults to True to prevent incorrect
+            removal of essential dlt columns from the schema.
+
+    Returns:
+        TPartialTableSchema: A partial table schema containing columns that exist in the dlt schema
+            but are missing from the actual table.
+
+    Example:
+        If the dlt schema has [user_id, name, _dlt_id, _dlt_load_id] but the destination
+        INFORMATION_SCHEMA only shows [user_id, name], this function would return
+        an empty dict (assuming disregard_dlt_columns=True) rather than suggesting
+        the dlt columns should be dropped.
+    """
+    col_schemas = schema.get_table_columns(table_name)
+
+    # Map escaped (like actual_col_names) -> original names (what appears in the dlt schema)
+    escaped_to_dlt = {
+        client.sql_client.escape_column_name(col, quote=False): col for col in col_schemas.keys()
+    }
+
+    possibly_dropped_col_names = set(escaped_to_dlt.keys()) - actual_col_names
+
+    if not possibly_dropped_col_names:
+        return {}
+
+    partial_table: TPartialTableSchema = {"name": table_name, "columns": {}}
+
+    for esc_name in possibly_dropped_col_names:
+        name_in_dlt = escaped_to_dlt[esc_name]
+
+        if disregard_dlt_columns and name_in_dlt in [C_DLT_ID, C_DLT_LOAD_ID]:
+            continue
+
+        col_schema = col_schemas[name_in_dlt]
+        if col_schema.get("increment"):
+            # We can warn within the for loop,
+            # since there's only one incremental field per table
+            logger.warning(
+                f"An incremental field {name_in_dlt} is being removed from the schema."
+                " You should unset the incremental with"
+                " `incremental=dlt.sources.incremental.EMPTY`"
+            )
+        partial_table["columns"][name_in_dlt] = col_schema
+
+    return partial_table if partial_table["columns"] else {}
+
+
 def update_dlt_schema(
-    client: WithTableReflection,
+    client: WithTableReflectionAndSql,
     schema: Schema,
     table_names: Iterable[str] = None,
     dry_run: bool = False,
 ) -> Optional[TSchemaDrop]:
-    """Updates schema to the storage.
+    """Updates the dlt schema from the destination.

     Compare the schema we think we should have with what actually exists in the destination,
     and drop any tables and/or columns that disappeared.

     Args:
+        client (WithTableReflectionAndSql): The destination client with table reflection capabilities.
+        schema (Schema): The dlt schema to compare against the destination.
         table_names (Iterable[str], optional): Check only listed tables. Defaults to None and checks all tables.
+        dry_run (bool, optional): If True, compute the update without applying it to the dlt schema. Defaults to False.

     Returns:
-        Optional[TSchemaTables]: Returns an update that was applied to the schema.
+        Optional[TSchemaDrop]: Returns the update that was applied to the schema.
     """
-    from dlt.destinations.sql_client import WithSqlClient
-
-    if not isinstance(client, WithSqlClient):
-        raise NotImplementedError
-
-    def _diff_between_actual_and_dlt_schema(
-        table_name: str, actual_col_names: set[str], disregard_dlt_columns: bool = True
-    ) -> TPartialTableSchema:
-        """Returns a partial table schema containing columns that exist in the dlt schema
-        but are missing from the actual table. Skips dlt internal columns by default.
-        """
-        col_schemas = schema.get_table_columns(table_name)
-
-        # Map escaped -> original names (actual_col_names are escaped)
-        escaped_to_original = {
-            client.sql_client.escape_column_name(col, quote=False): col
-            for col in col_schemas.keys()
-        }
-        dropped_col_names = set(escaped_to_original.keys()) - actual_col_names
-
-        if not dropped_col_names:
-            return {}
-
-        partial_table: TPartialTableSchema = {"name": table_name, "columns": {}}
-
-        for esc_name in dropped_col_names:
-            orig_name = escaped_to_original[esc_name]
-
-            # Athena doesn't have dlt columns in actual columns. Don't drop them anyway.
-            if disregard_dlt_columns and orig_name in [C_DLT_ID, C_DLT_LOAD_ID]:
-                continue
-
-            col_schema = col_schemas[orig_name]
-            if col_schema.get("increment"):
-                # We can warn within the for loop,
-                # since there's only one incremental field per table
-                logger.warning(
-                    f"An incremental field {orig_name} is being removed from schema."
-                    "You should unset the"
-                    " incremental with `incremental=dlt.sources.incremental.EMPTY`"
-                )
-            partial_table["columns"][orig_name] = col_schema
-
-        return partial_table if partial_table["columns"] else {}
-
     tables = table_names if table_names else schema.data_table_names()

     table_drops: TSchemaDrop = {}  # includes entire tables to drop
@@ -379,10 +421,12 @@ def _diff_between_actual_and_dlt_schema(
             continue

         # actual column schemas present ->
-        # we compare actual schemas with dlt ones ->
+        # we compare actual column schemas with dlt ones ->
         # we take the difference as a partial table
         else:
             partial_table = _diff_between_actual_and_dlt_schema(
+                client,
+                schema,
                 table_name,
                 set(actual_col_schemas.keys()),
             )
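
The core of the new top-level _diff_between_actual_and_dlt_schema() is the escaped-name mapping: reflected column names come back escaped, so they must be mapped back to dlt schema names before taking the set difference. A self-contained sketch of that idea, with a toy lowercasing escaper standing in for sql_client.escape_column_name():

    def find_missing_columns(
        dlt_columns: dict, actual_col_names: set, escape=str.lower
    ) -> dict:
        protected = {"_dlt_id", "_dlt_load_id"}
        # map escaped -> dlt name, since actual_col_names arrive escaped
        escaped_to_dlt = {escape(c): c for c in dlt_columns}
        missing = set(escaped_to_dlt) - actual_col_names
        return {
            escaped_to_dlt[e]: dlt_columns[escaped_to_dlt[e]]
            for e in missing
            if escaped_to_dlt[e] not in protected  # never drop dlt internals
        }

    cols = {"UserId": {}, "name": {}, "_dlt_id": {}, "_dlt_load_id": {}}
    # the destination reflects lowercased names and only shows "name"
    print(find_missing_columns(cols, {"name"}))
    # {'UserId': {}} -- dlt internals are skipped even though they look missing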

dlt/pipeline/pipeline.py

Lines changed: 6 additions & 3 deletions

@@ -112,7 +112,7 @@
 from dlt.destinations.sql_client import SqlClientBase, WithSqlClient
 from dlt.destinations.fs_client import FSClientBase
 from dlt.destinations.dataset import get_destination_clients
-from dlt.destinations.utils import update_dlt_schema
+from dlt.destinations.utils import update_dlt_schema, WithTableReflectionAndSql

 from dlt.load.configuration import LoaderConfiguration
 from dlt.load import Load
@@ -1066,9 +1066,12 @@ def sync_schema_from_destination(
         with self._get_destination_clients(schema)[0] as client:
             if not client.is_storage_initialized():
                 raise DestinationUndefinedEntity()
-            if isinstance(client, WithTableReflection):
+            if isinstance(client, WithTableReflection) and isinstance(client, WithSqlClient):
                 return update_dlt_schema(
-                    client=client, schema=schema, table_names=table_names, dry_run=dry_run
+                    client=cast(WithTableReflectionAndSql, client),
+                    schema=schema,
+                    table_names=table_names,
+                    dry_run=dry_run,
                 )
             else:
                 raise DestinationTableReflectionNotSupported(self._destination.destination_name)
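
A sketch of the user-facing call path changed above, assuming this branch and a destination that supports both table reflection and SQL (duckdb here; the pipeline and table names are illustrative):

    import dlt

    pipeline = dlt.pipeline("sync_demo", destination="duckdb")
    pipeline.run([{"id": 1, "age": 30}], table_name="my_table")

    # dry_run=True reports tables/columns that disappeared from the
    # destination without mutating the dlt schema; dry_run=False applies them
    drops = pipeline.sync_schema_from_destination(dry_run=True)
    print(drops)  # {} while the destination still matches the dlt schema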

tests/load/pipeline/test_sync_dlt_schema.py

Lines changed: 46 additions & 0 deletions

@@ -1,4 +1,5 @@
 import json
+import os
 from typing import cast

 import pytest
@@ -11,7 +12,9 @@

 from dlt.common import logger
 from tests.pipeline.utils import assert_load_info
+from tests.utils import TEST_STORAGE_ROOT
 from tests.load.utils import (
+    FILE_BUCKET,
     destinations_configs,
     DestinationTestConfiguration,
 )
@@ -186,6 +189,12 @@ def test_sync_dlt_schema(
     _drop_column_in_sql(pipeline, destination_config, "my_table", "age")
     _drop_table_in_sql(pipeline, destination_config, "my_last_table")

+    # Sanity check that the tables are still all in the dlt schema
+    assert all(
+        table_name in pipeline.default_schema.tables
+        for table_name in ["my_table", "my_other_table", "my_last_table"]
+    )
+
     # Make sure the warning about orphaned tables is emitted
     logger_spy = mocker.spy(logger, "warning")

@@ -227,3 +236,40 @@

     assert "my_last_table" not in pipeline.default_schema.tables
     assert "my_last_table__children" not in pipeline.default_schema.tables
+
+
+@pytest.mark.parametrize(
+    "destination_config",
+    destinations_configs(
+        local_filesystem_configs=True,
+    ),
+    ids=lambda x: x.name,
+)
+def test_regular_filesystem_tables(
+    destination_config: DestinationTestConfiguration, mocker: MockerFixture
+) -> None:
+    pipeline = destination_config.setup_pipeline(pipeline_name=f"pipe_{uniq_id()}")
+
+    # 1. Check whether dropping an entire table without a table format is handled correctly
+    assert_load_info(pipeline.run(my_resource(), **destination_config.run_kwargs))
+
+    _drop_table_in_filesystem(pipeline, destination_config, "my_table")
+
+    # Sanity check that the table is still in the dlt schema
+    assert "my_table" in pipeline.default_schema.tables
+
+    # An entire table drop should not emit warnings regardless of table format
+    logger_spy = mocker.spy(logger, "warning")
+    schema_drops = pipeline.sync_schema_from_destination()
+    logger_spy.assert_not_called()
+
+    # The table should be in the schema drops and removed from the schema itself
+    assert len(schema_drops) == 1
+    assert "my_table" in schema_drops
+    assert "my_table" not in pipeline.default_schema.tables
+
+    # 2. When a column is dropped from a table without an open table format,
+    # we emit a warning, but don't update the dlt schema
+    assert_load_info(pipeline.run(my_resource(), **destination_config.run_kwargs))
+
+    # TODO: Finish this.
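
The _drop_table_in_filesystem() helper the new test relies on is not part of this diff. Given the new os, TEST_STORAGE_ROOT, and FILE_BUCKET imports, a plausible (hypothetical) sketch of such a helper:

    import os
    import shutil

    def _drop_table_in_filesystem(pipeline, destination_config, table_name: str) -> None:
        # hypothetical: remove the table's directory from the local bucket so
        # the destination no longer has any files for it; FILE_BUCKET may
        # carry a file:// scheme prefix on some setups
        bucket = FILE_BUCKET.replace("file://", "")
        table_dir = os.path.join(bucket, pipeline.dataset_name, table_name)
        if os.path.isdir(table_dir):
            shutil.rmtree(table_dir)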
