Skip to content

Commit 9471bc4

Browse files
committed
get_removed_table_columns in schema
1 parent 0209322 commit 9471bc4

File tree

3 files changed

+105
-78
lines changed

3 files changed

+105
-78
lines changed

dlt/common/schema/exceptions.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,32 @@ class SchemaCorruptedException(SchemaException):
104104
pass
105105

106106

107+
class SchemaIdentifierDestinationCollision(SchemaCorruptedException):
    """Raised when multiple column names collide after transformation to destination format.

    This should not happen under normal circumstances and indicates schema corruption.
    """

    def __init__(
        self,
        schema_name: str,
        table_name: str,
        colliding_columns: List[str],
        destination_format: str,
    ) -> None:
        # Keep the collision details on the instance so callers can inspect them
        # programmatically instead of parsing the message.
        self.table_name = table_name
        self.colliding_columns = colliding_columns
        self.destination_format = destination_format

        listed_columns = ", ".join(repr(name) for name in colliding_columns)
        msg = (
            f"Multiple columns in table `{table_name}` collide when transformed to"
            f" destination format `{destination_format}`: {listed_columns}."
            " This should not happen under normal circumstances and indicates schema corruption."
        )
        super().__init__(schema_name, msg)
131+
132+
107133
class SchemaIdentifierNormalizationCollision(SchemaCorruptedException):
108134
def __init__(
109135
self,

dlt/common/schema/schema.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
Set,
1616
)
1717

18+
from dlt.common import logger
1819
from dlt.common.schema.migrations import migrate_schema
1920
from dlt.common.utils import extend_list_deduplicated, simple_repr, without_none
2021
from dlt.common.typing import (
@@ -521,6 +522,72 @@ def get_new_table_columns(
521522
diff_c.append(c)
522523
return diff_c
523524

525+
def get_removed_table_columns(
    self,
    table_name: str,
    existing_columns: TTableSchemaColumns,
    escape_col_f: Callable[[str, bool, bool], str],
    disregard_dlt_columns: bool = True,
) -> List[TColumnSchema]:
    """Gets columns to be removed from schema to match `existing_columns`.

    This function identifies columns that exist in the dlt schema but are missing from the
    destination table. It's used during schema synchronization to detect when columns have
    been dropped from the destination and need to be removed from the dlt schema as well.

    Column names are compared by transforming dlt schema names to destination format using
    `escape_col_f`. `existing_columns` are expected to be in destination format (as they
    appear in the destination's INFORMATION_SCHEMA).

    dlt internal columns (_dlt_id, _dlt_load_id) can be optionally disregarded because
    users rarely drop these columns manually, and if they did, dlt cannot recover from
    this situation anyway.

    Args:
        table_name (str): Name of the table to analyze.
        existing_columns (TTableSchemaColumns): Column schemas that actually exist in the
            destination table, typically obtained from INFORMATION_SCHEMA queries. Column
            names should be in destination format.
        escape_col_f (Callable[[str, bool, bool], str]): Function to transform dlt column
            names to destination format (e.g., 'id' -> 'ID' in Snowflake).
        disregard_dlt_columns (bool): Whether to ignore apparent mismatches for dlt internal
            columns (_dlt_id, _dlt_load_id). Defaults to True.

    Returns:
        List[TColumnSchema]: List of column schemas that exist in the dlt schema but are
            missing from the destination table.

    Raises:
        SchemaCorruptedException: If two or more dlt column names map to the same name
            after transformation to destination format.
    """
    # Transform dlt schema column names to destination format (e.g., 'id' -> 'ID' in Snowflake)
    # to match against actual_col_names from INFORMATION_SCHEMA
    # Keys: destination format, Values: original dlt schema names
    col_schemas = self.get_table_columns(table_name)
    escaped_to_dlt = {escape_col_f(col, False, True): col for col in col_schemas.keys()}

    # A shrunken mapping means at least two dlt names collapsed onto one destination name,
    # which would make the comparison below silently wrong — fail loudly instead.
    if len(escaped_to_dlt) != len(col_schemas):
        raise SchemaCorruptedException(
            self.name,
            f"Columns in table `{table_name}` have colliding names when transformed to"
            " destination format. Original dlt schema column names:"
            f" {list(col_schemas.keys())}. Destination format names:"
            f" {list(escaped_to_dlt.keys())}. This should not happen under normal circumstances"
            " and indicates schema corruption.",
        )

    diff_c: List[TColumnSchema] = []
    for dest_name, name_in_dlt in escaped_to_dlt.items():
        if disregard_dlt_columns and self.is_dlt_entity(name_in_dlt):
            continue
        if dest_name not in existing_columns:
            col_schema = col_schemas[name_in_dlt]
            if col_schema.get("incremental"):
                # Fixed message: implicit string concatenation previously produced
                # "schema.You should" with no separating space.
                logger.warning(
                    f"An incremental field {name_in_dlt} is being removed from schema."
                    " You should unset the"
                    " incremental with `incremental=dlt.sources.incremental.EMPTY`"
                )
            diff_c.append(col_schema)
    return diff_c
590+
524591
def get_table(self, table_name: str) -> TTableSchema:
525592
try:
526593
return self._schema_tables[table_name]

dlt/destinations/utils.py

Lines changed: 12 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
from typing import Any, List, Dict, Type, Optional, Sequence, Tuple, cast, Iterable, Callable
44

5+
from sqlglot import column
6+
57
from dlt.common import logger
68
from dlt.common.destination.capabilities import DestinationCapabilitiesContext
79
from dlt.common.destination.typing import PreparedTableSchema
@@ -301,78 +303,6 @@ class WithTableReflectionAndSql(WithTableReflection, WithSqlClient):
301303
pass
302304

303305

304-
def get_removed_table_columns(
    escape_col_f: Callable[[str, bool, bool], str],
    schema: Schema,
    table_name: str,
    actual_col_names: set[str],
    disregard_dlt_columns: bool = True,
) -> TPartialTableSchema:
    """Compares dlt schema with destination table schema and returns columns that appear to be missing.

    This function identifies columns that exist in the dlt schema but are missing from the actual
    destination table. It's used during schema synchronization to detect when columns may have
    been dropped from the destination and need to be removed from the dlt schema as well.

    However, dlt internal columns (_dlt_id, _dlt_load_id) are treated specially because
    users rarely drop dlt internal columns manually, and if they did,
    dlt cannot recover from this situation anyway.

    Args:
        escape_col_f (Callable[[str, bool, bool], str]): Function that transforms a dlt column
            name to destination format (e.g., 'id' -> 'ID' in Snowflake).
        schema (Schema): The dlt schema to compare against the destination.
        table_name (str): Name of the table to analyze.
        actual_col_names (set[str]): Column names that actually exist in the destination table,
            typically obtained from INFORMATION_SCHEMA queries. For Athena,
            this may not include dlt columns present in the underlying data files.
        disregard_dlt_columns: Whether to ignore apparent mismatches for dlt internal
            columns (_dlt_id, _dlt_load_id). Defaults to True to prevent incorrect
            removal of essential dlt columns from the schema.

    Returns:
        TPartialTableSchema: Returns a partial table schema containing columns that exist in the dlt schema
            but are missing from the actual table.

    Example:
        If dlt schema has [user_id, name, _dlt_id, _dlt_load_id] but destination
        INFORMATION_SCHEMA only shows [user_id, name], this function would return
        an empty dict (assuming disregard_dlt_columns=True) rather than suggesting
        the dlt columns should be dropped.
    """
    col_schemas = schema.get_table_columns(table_name)

    # Transform dlt schema column names to destination format (e.g., 'id' -> 'ID' in Snowflake)
    # to match against actual_col_names from INFORMATION_SCHEMA
    # Keys: destination format, Values: original dlt schema names
    escaped_to_dlt = {escape_col_f(col, False, True): col for col in col_schemas.keys()}

    possibly_dropped_col_names = set(escaped_to_dlt.keys()) - actual_col_names

    if not possibly_dropped_col_names:
        return {}

    partial_table: TPartialTableSchema = {"name": table_name, "columns": {}}

    for esc_name in possibly_dropped_col_names:
        name_in_dlt = escaped_to_dlt[esc_name]

        if disregard_dlt_columns and schema.is_dlt_entity(name_in_dlt):
            continue

        col_schema = col_schemas[name_in_dlt]
        # BUG FIX: the column-schema key is "incremental", not "increment" — the old
        # lookup never matched, so the warning below was never emitted.
        if col_schema.get("incremental"):
            # We can warn within the for loop,
            # since there's only one incremental field per table
            logger.warning(
                f"An incremental field {name_in_dlt} is being removed from schema."
                " You should unset the"
                " incremental with `incremental=dlt.sources.incremental.EMPTY`"
            )
        partial_table["columns"][name_in_dlt] = col_schema

    return partial_table if partial_table["columns"] else {}
374-
375-
376306
def sync_schema_from_storage_schema(
377307
get_storage_tables_f: Callable[[Iterable[str]], Iterable[tuple[str, dict[str, TColumnSchema]]]],
378308
escape_col_f: Callable[[str, bool, bool], str],
@@ -416,13 +346,17 @@ def sync_schema_from_storage_schema(
416346
# we compare actual column schemas with dlt ones ->
417347
# we take the difference as a partial table
418348
else:
419-
partial_table = get_removed_table_columns(
420-
escape_col_f,
421-
schema,
422-
table_name,
423-
set(actual_col_schemas.keys()),
349+
removed_columns = schema.get_removed_table_columns(
350+
table_name=table_name,
351+
existing_columns=actual_col_schemas,
352+
escape_col_f=escape_col_f,
353+
disregard_dlt_columns=True,
424354
)
425-
if partial_table:
355+
if removed_columns:
356+
partial_table: TPartialTableSchema = {
357+
"name": table_name,
358+
"columns": {col["name"]: col for col in removed_columns},
359+
}
426360
column_drops[table_name] = partial_table
427361

428362
# 2. For entire table drops, we make sure no orphaned tables remain

0 commit comments

Comments
 (0)