Description
dlt version
1.17.1
Describe the problem
I’d like to set a custom value for _dlt_valid_from the first time I run the pipeline with the scd2 merge strategy, and use the pipeline run timestamp for all following runs. To do this, I set boundary_timestamp for the first run and then removed it.
On the first run, I’m pulling data from a local backup:
@dlt.resource(
write_disposition={"disposition": "merge", "strategy": "scd2", "boundary_timestamp": "2024-12-12"},
)
def my_resource():
    # reading data from a local json backup
    yield {"a": 1}
For the second run, I’m removing "boundary_timestamp" from the write disposition:
@dlt.resource(
write_disposition={"disposition": "merge", "strategy": "scd2"},
)
def my_resource():
    # pulling data from an external API
    yield {"a": 2}
The table after the second run looks like this:
> select _dlt_valid_from, _dlt_valid_to from table;
┌──────────────────────────┬──────────────────────────┬───────┬────────────────────┬────────────────┐
│ _dlt_valid_from │ _dlt_valid_to │ a │ _dlt_load_id │ _dlt_id │
│ timestamp with time zone │ timestamp with time zone │ int64 │ varchar │ varchar │
├──────────────────────────┼──────────────────────────┼───────┼────────────────────┼────────────────┤
│ 2024-12-12 01:00:00+01 │ 2024-12-12 01:00:00+01 │ 1 │ 1760084979.8407435 │ Ec9HPMTWWD8wGA │
│ 2024-12-12 01:00:00+01 │ NULL │ 2 │ 1760085096.16156 │ Wb8LIv72/V4uzw │
└──────────────────────────┴──────────────────────────┴───────┴────────────────────┴────────────────┘
The first row’s _dlt_valid_to column and the second row’s _dlt_valid_from column were both set to the initial value of boundary_timestamp.
Expected behavior
I expect the first row’s _dlt_valid_to column and the second row’s _dlt_valid_from column to be set to the timestamp of the second pipeline run, since boundary_timestamp is no longer configured for that run.
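To make the expected result concrete, here is a minimal pure-Python sketch of the semantics I have in mind. It is only a model of the behavior described above, not dlt's implementation: the helper names (row_key, apply_scd2_load) and the run timestamps are hypothetical and chosen for illustration.

```python
from datetime import datetime, timezone
from typing import Optional


def row_key(row: dict) -> tuple:
    """Identify a row by its user columns only (ignore _dlt_* columns)."""
    return tuple(sorted((k, v) for k, v in row.items() if not k.startswith("_dlt_")))


def apply_scd2_load(
    table: list,
    incoming: list,
    run_ts: datetime,
    boundary_ts: Optional[datetime] = None,
) -> list:
    """Apply one scd2-style load to `table` in place (hypothetical model).

    The boundary is `boundary_ts` when given, otherwise the run timestamp.
    """
    ts = boundary_ts if boundary_ts is not None else run_ts
    incoming_keys = {row_key(r) for r in incoming}
    still_active = set()

    # Retire active rows that are missing from the incoming data.
    for row in table:
        if row["_dlt_valid_to"] is not None:
            continue  # already retired in an earlier load
        if row_key(row) in incoming_keys:
            still_active.add(row_key(row))
        else:
            row["_dlt_valid_to"] = ts

    # Insert incoming rows that are not already active.
    for r in incoming:
        if row_key(r) not in still_active:
            table.append({**r, "_dlt_valid_from": ts, "_dlt_valid_to": None})
    return table


# First run: explicit boundary, as in the first resource definition.
first_boundary = datetime(2024, 12, 12, tzinfo=timezone.utc)
first_run_ts = datetime(2025, 10, 10, 8, 29, tzinfo=timezone.utc)  # illustrative
table = apply_scd2_load([], [{"a": 1}], run_ts=first_run_ts, boundary_ts=first_boundary)

# Second run: no boundary configured, so the run timestamp should be used.
second_run_ts = datetime(2025, 10, 10, 8, 31, tzinfo=timezone.utc)  # illustrative
apply_scd2_load(table, [{"a": 2}], run_ts=second_run_ts)

for row in table:
    print(row["a"], row["_dlt_valid_from"], row["_dlt_valid_to"])
```

In this model the row with a = 1 is retired at the second run's timestamp and the row with a = 2 becomes valid from that same timestamp, which is what I expected to see in the table above instead of 2024-12-12 in both places.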
Steps to reproduce
First run:
@dlt.resource(
write_disposition={"disposition": "merge", "strategy": "scd2", "boundary_timestamp": "2024-12-12"},
)
def my_resource():
    # reading data from a local json backup
    yield {"a": 1}
Second run, without boundary_timestamp and modified data:
@dlt.resource(
write_disposition={"disposition": "merge", "strategy": "scd2"},
)
def my_resource():
    # pulling data from an external API
    yield {"a": 2}
Operating system
Linux
Runtime environment
Local
Python version
3.11
dlt data source
No response
dlt destination
DuckDB
Other deployment details
No response
Additional information
No response