Skip to content

Commit 0504401

Browse files
committed
get_removed_table_columns in schema
1 parent 0209322 commit 0504401

File tree

2 files changed

+81
-78
lines changed

2 files changed

+81
-78
lines changed

dlt/common/schema/schema.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
Set,
1616
)
1717

18+
from dlt.common import logger
1819
from dlt.common.schema.migrations import migrate_schema
1920
from dlt.common.utils import extend_list_deduplicated, simple_repr, without_none
2021
from dlt.common.typing import (
@@ -521,6 +522,74 @@ def get_new_table_columns(
521522
diff_c.append(c)
522523
return diff_c
523524

525+
def get_removed_table_columns(
    self,
    table_name: str,
    existing_columns: TTableSchemaColumns,
    escape_col_f: Callable[[str, bool, bool], str],
    disregard_dlt_columns: bool = True,
) -> List[TColumnSchema]:
    """Gets columns to be removed from schema to match `existing_columns`.

    This function identifies columns that exist in the dlt schema but are missing from the
    destination table. It's used during schema synchronization to detect when columns have
    been dropped from the destination and need to be removed from the dlt schema as well.

    Column names are compared by transforming dlt schema names to destination format using
    `escape_col_f`. `existing_columns` are expected to be in destination format (as they
    appear in the destination's INFORMATION_SCHEMA).

    dlt internal columns (_dlt_id, _dlt_load_id) can be optionally disregarded because
    users rarely drop these columns manually, and if they did, dlt cannot recover from
    this situation anyway.

    Args:
        table_name (str): Name of the table to analyze.
        existing_columns (TTableSchemaColumns): Column schemas that actually exist in the
            destination table, typically obtained from INFORMATION_SCHEMA queries. Column
            names should be in destination format.
        escape_col_f (Callable[[str, bool, bool], str]): Function to transform dlt column
            names to destination format (e.g., 'id' -> 'ID' in Snowflake).
        disregard_dlt_columns (bool): Whether to ignore apparent mismatches for dlt internal
            columns (_dlt_id, _dlt_load_id). Defaults to True.

    Returns:
        List[TColumnSchema]: List of column schemas that exist in the dlt schema but are
            missing from the destination table.
    """
    col_schemas = self.get_table_columns(table_name)

    # Transform dlt schema column names to destination format (e.g., 'id' -> 'ID' in Snowflake)
    # to match against the column names coming from INFORMATION_SCHEMA.
    # Keys: destination format, Values: original dlt schema names
    escaped_to_dlt = {escape_col_f(col, False, True): col for col in col_schemas.keys()}

    possibly_dropped_col_names = set(escaped_to_dlt.keys()) - set(existing_columns.keys())

    if not possibly_dropped_col_names:
        return []

    removed_cols: List[TColumnSchema] = []

    for esc_name in possibly_dropped_col_names:
        name_in_dlt = escaped_to_dlt[esc_name]

        # dlt internal columns are rarely dropped on purpose; skip them by default
        if disregard_dlt_columns and self.is_dlt_entity(name_in_dlt):
            continue

        col_schema = col_schemas[name_in_dlt]
        if col_schema.get("incremental"):
            # We can warn within the for loop,
            # since there's only one incremental field per table.
            # BUGFIX: a space was missing between the concatenated string literals,
            # which produced "...from schema.You should unset...".
            logger.warning(
                f"An incremental field {name_in_dlt} is being removed from schema."
                " You should unset the"
                " incremental with `incremental=dlt.sources.incremental.EMPTY`"
            )
        removed_cols.append(col_schema)

    return removed_cols
592+
524593
def get_table(self, table_name: str) -> TTableSchema:
525594
try:
526595
return self._schema_tables[table_name]

dlt/destinations/utils.py

Lines changed: 12 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
from typing import Any, List, Dict, Type, Optional, Sequence, Tuple, cast, Iterable, Callable
44

5+
from sqlglot import column
6+
57
from dlt.common import logger
68
from dlt.common.destination.capabilities import DestinationCapabilitiesContext
79
from dlt.common.destination.typing import PreparedTableSchema
@@ -301,78 +303,6 @@ class WithTableReflectionAndSql(WithTableReflection, WithSqlClient):
301303
pass
302304

303305

304-
def get_removed_table_columns(
    escape_col_f: Callable[[str, bool, bool], str],
    schema: Schema,
    table_name: str,
    actual_col_names: set[str],
    disregard_dlt_columns: bool = True,
) -> TPartialTableSchema:
    """Compares dlt schema with destination table schema and returns columns that appear to be missing.

    This function identifies columns that exist in the dlt schema but are missing from the actual
    destination table. It's used during schema synchronization to detect when columns may have
    been dropped from the destination and need to be removed from the dlt schema as well.

    However, dlt internal columns (_dlt_id, _dlt_load_id) are treated specially because
    users rarely drop dlt internal columns manually, and if they did,
    dlt cannot recover from this situation anyway.

    Args:
        escape_col_f (Callable[[str, bool, bool], str]): Function to transform dlt column
            names to destination format (e.g., 'id' -> 'ID' in Snowflake).
        schema (Schema): The dlt schema to compare against the destination.
        table_name (str): Name of the table to analyze.
        actual_col_names (set[str]): Column names that actually exist in the destination table,
            typically obtained from INFORMATION_SCHEMA queries. For Athena,
            this may not include dlt columns present in the underlying data files.
        disregard_dlt_columns: Whether to ignore apparent mismatches for dlt internal
            columns (_dlt_id, _dlt_load_id). Defaults to True to prevent incorrect
            removal of essential dlt columns from the schema.

    Returns:
        TPartialTableSchema: Returns a partial table schema containing columns that exist in the dlt schema
            but are missing from the actual table.

    Example:
        If dlt schema has [user_id, name, _dlt_id, _dlt_load_id] but destination
        INFORMATION_SCHEMA only shows [user_id, name], this function would return
        an empty dict (assuming disregard_dlt_columns=True) rather than suggesting
        the dlt columns should be dropped.
    """
    col_schemas = schema.get_table_columns(table_name)

    # Transform dlt schema column names to destination format (e.g., 'id' -> 'ID' in Snowflake)
    # to match against actual_col_names from INFORMATION_SCHEMA
    # Keys: destination format, Values: original dlt schema names
    escaped_to_dlt = {escape_col_f(col, False, True): col for col in col_schemas.keys()}

    possibly_dropped_col_names = set(escaped_to_dlt.keys()) - actual_col_names

    if not possibly_dropped_col_names:
        return {}

    partial_table: TPartialTableSchema = {"name": table_name, "columns": {}}

    for esc_name in possibly_dropped_col_names:
        name_in_dlt = escaped_to_dlt[esc_name]

        if disregard_dlt_columns and schema.is_dlt_entity(name_in_dlt):
            continue

        col_schema = col_schemas[name_in_dlt]
        # BUGFIX: the schema hint key is "incremental", not "increment" — with the old
        # key this warning could never fire.
        if col_schema.get("incremental"):
            # We can warn within the for loop,
            # since there's only one incremental field per table.
            # BUGFIX: a space was missing between the concatenated literals
            # ("...from schema.You should unset...").
            logger.warning(
                f"An incremental field {name_in_dlt} is being removed from schema."
                " You should unset the"
                " incremental with `incremental=dlt.sources.incremental.EMPTY`"
            )
        partial_table["columns"][name_in_dlt] = col_schema

    return partial_table if partial_table["columns"] else {}
374-
375-
376306
def sync_schema_from_storage_schema(
377307
get_storage_tables_f: Callable[[Iterable[str]], Iterable[tuple[str, dict[str, TColumnSchema]]]],
378308
escape_col_f: Callable[[str, bool, bool], str],
@@ -416,13 +346,17 @@ def sync_schema_from_storage_schema(
416346
# we compare actual column schemas with dlt ones ->
417347
# we take the difference as a partial table
418348
else:
419-
partial_table = get_removed_table_columns(
420-
escape_col_f,
421-
schema,
422-
table_name,
423-
set(actual_col_schemas.keys()),
349+
removed_columns = schema.get_removed_table_columns(
350+
table_name=table_name,
351+
existing_columns=actual_col_schemas,
352+
escape_col_f=escape_col_f,
353+
disregard_dlt_columns=True,
424354
)
425-
if partial_table:
355+
if removed_columns:
356+
partial_table: TPartialTableSchema = {
357+
"name": table_name,
358+
"columns": {col["name"]: col for col in removed_columns},
359+
}
426360
column_drops[table_name] = partial_table
427361

428362
# 2. For entire table drops, we make sure no orphaned tables remain

0 commit comments

Comments
 (0)