Skip to content

Commit aa93c6b

Browse files
committed
Lifecycle test
1 parent 95675ad commit aa93c6b

File tree

2 files changed

+81
-2
lines changed

2 files changed

+81
-2
lines changed

docs/website/docs/hub/features/transformations/index.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,10 @@ In the load stage, the normalized queries from `.model` files are wrapped in INS
212212
For example, given this query from the extract stage:
213213

214214
```sql
215-
SELECT id, value FROM table
215+
SELECT
216+
"my_table"."id" AS "id",
217+
"my_table"."value" AS "value"
218+
FROM "my_pipeline_dataset"."my_table" AS "my_table"
216219
```
217220

218221
After the normalize stage processes it (adding dlt columns, wrapping it in a subquery, etc.), the result is:

tests/load/test_model_item_format.py

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import io
33

44
from typing import Any, List
5-
from unittest.mock import MagicMock
5+
from unittest.mock import MagicMock, patch
66
import pytest
77
import dlt
88

@@ -925,3 +925,79 @@ def copied_table() -> Any:
925925
)
926926
assert py_exc.value.step == "load"
927927
assert isinstance(py_exc.value.__context__, LoadClientJobException)
928+
929+
930+
def test_relation_lifecycle() -> None:
    """Test showing the lifecycle of a model: what it looks like after each pipeline step.

    Walks a relation-based model through the extract, normalize and load stages,
    asserting the exact model-file content after extract and normalize, and the
    exact INSERT statement issued during load.
    """
    # Seed a duckdb pipeline with a small source table to select from later.
    pipeline = dlt.pipeline(pipeline_name=f"rel_lifecycle_{uniq_id()}", destination="duckdb")
    pipeline.run(
        [{"a": string, "b": i} for i, string in enumerate(["I", "love", "dlt"])],
        table_name="example_table",
    )

    # Build a relation that selects only a subset of columns — note that
    # column "b" is intentionally excluded; normalize is expected to fill it
    # with NULL later because the full column schema is passed as hints below.
    dataset = pipeline.dataset()
    rel = dataset["example_table"][["a", "_dlt_load_id", "_dlt_id"]]

    @dlt.resource
    def my_resource() -> Any:
        # Attach the complete column schema of "example_table" as hints so the
        # normalize stage knows about all columns, including the unselected "b".
        yield dlt.mark.with_hints(
            rel,
            hints=make_hints(
                columns={
                    k: v
                    for k, v in pipeline.default_schema.tables["example_table"]["columns"].items()
                }
            ),
        )

    load_info = pipeline.extract(my_resource())

    # Get the extracted model file
    load_id = load_info.loads_ids[0]
    normalize_storage = pipeline._get_normalize_storage()
    model_job_file = normalize_storage.extracted_packages.list_new_jobs(load_id)[0]
    assert model_job_file.endswith(".model")

    # Ensure it contains the dialect and the query string
    with normalize_storage.extracted_packages.storage.open_file(model_job_file, "r") as f:
        content = f.read()
    extracted_query = rel.to_sql()
    assert f"dialect: duckdb\n{extracted_query}\n" == content

    pipeline.normalize()

    # Get the normalized model file
    load_storage = pipeline._get_load_storage()
    model_job_file = load_storage.normalized_packages.list_new_jobs(load_id)[0]
    assert model_job_file.endswith(".model")

    # Ensure it contains the normalized query that, for example:
    # - has the _dlt_load_id column
    # - has NULL set as column b since we selected only a
    # - is wrapped in a subquery
    with load_storage.normalized_packages.storage.open_file(model_job_file, "r") as f:
        content = f.read()

    normalized_query = f"""SELECT _dlt_subquery."a" AS "a", NULL AS "b", \'{load_id}\' AS "_dlt_load_id", _dlt_subquery."_dlt_id" AS "_dlt_id" FROM ({extracted_query}) AS _dlt_subquery"""
    assert f"dialect: duckdb\n{normalized_query}\n" == content

    # Imported locally to keep the destination-specific client out of module scope.
    from dlt.destinations.impl.duckdb.sql_client import DuckDbSqlClient

    # Get the insert query that is executed during load
    # NOTE: not every captured statement is necessarily a str (see the
    # isinstance filter below), hence the Any element type.
    captured_sqls: List[Any] = []
    _original_execute = DuckDbSqlClient.execute_sql

    def spy_execute_sql(self, sql, *args, **kwargs):
        # Record every executed statement while delegating to the real
        # implementation so the load step still runs normally.
        captured_sqls.append(sql)
        return _original_execute(self, sql, *args, **kwargs)

    with patch.object(DuckDbSqlClient, "execute_sql", spy_execute_sql):
        pipeline.load()

    # Assert exact match with the actual INSERT statement
    loaded_query = f"""INSERT INTO "{pipeline.dataset_name}"."my_resource" ("a", "b", "_dlt_load_id", "_dlt_id") {normalized_query}"""
    all_insert_stmts = [
        sql for sql in captured_sqls if isinstance(sql, str) and sql.startswith("INSERT INTO")
    ]
    my_resource_insert = [stmt for stmt in all_insert_stmts if '"my_resource"' in stmt][0]
    assert loaded_query == my_resource_insert

0 commit comments

Comments
 (0)