Skip to content

Commit f9ef3eb

Browse files
authored
feat: P4ADEV-4174 add stg to dwh layer models (#4)
1 parent fd06122 commit f9ef3eb

File tree

4 files changed

+214
-0
lines changed

4 files changed

+214
-0
lines changed
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
{{ config(
2+
materialized = "incremental",
3+
unique_key = "assessment_pk",
4+
incremental_strategy = "merge",
5+
) }}
6+
7+
with base as (
8+
select
9+
assessment_code,
10+
debt_position_type_org_id,
11+
CAST(TO_CHAR(date_receipt, 'YYYY') AS INT) AS operating_year,
12+
assessment_description,
13+
md5(coalesce(assessment_description, '')) as hash_checksum,
14+
processed_time
15+
from {{ ref('payment_assessment_detail') }}
16+
),
17+
18+
source as (
19+
select distinct
20+
{{ dbt_utils.generate_surrogate_key(['assessment_code', 'debt_position_type_org_id', 'operating_year']) }} as assessment_pk,
21+
assessment_code,
22+
debt_position_type_org_id,
23+
operating_year,
24+
assessment_description,
25+
hash_checksum
26+
from base as b
27+
28+
{% if is_incremental() %}
29+
where b.processed_time >= (select coalesce(max(processed_time), '1900-01-01') from {{ this }} )
30+
{% endif %}
31+
),
32+
33+
new_data as (
34+
select s.*
35+
from source s
36+
left join {{ this }} t on
37+
s.assessment_pk = t.assessment_pk
38+
where s.hash_checksum <> coalesce(t.hash_checksum, '')
39+
)
40+
41+
select *, current_timestamp as processed_time
42+
from new_data
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
{{ config(
2+
materialized = "incremental",
3+
unique_key = "office_pk",
4+
incremental_strategy = "merge",
5+
) }}
6+
7+
with base as (
8+
select
9+
office_code,
10+
debt_position_type_org_id,
11+
CAST(TO_CHAR(date_receipt, 'YYYY') AS INT) AS operating_year,
12+
office_description,
13+
md5(coalesce(office_description, '')) as hash_checksum,
14+
processed_time
15+
from {{ ref('payment_assessment_detail') }}
16+
),
17+
18+
source as (
19+
select distinct
20+
{{ dbt_utils.generate_surrogate_key(['office_code', 'debt_position_type_org_id', 'operating_year']) }} as office_pk,
21+
office_code,
22+
debt_position_type_org_id,
23+
operating_year,
24+
office_description,
25+
hash_checksum
26+
from base as b
27+
28+
{% if is_incremental() %}
29+
where b.processed_time >= (select coalesce(max(processed_time), '1900-01-01') from {{ this }} )
30+
{% endif %}
31+
),
32+
33+
new_data as (
34+
select s.*
35+
from source s
36+
left join {{ this }} t on
37+
s.office_pk = t.office_pk
38+
where s.hash_checksum <> coalesce(t.hash_checksum, '')
39+
)
40+
41+
select *, current_timestamp as processed_time
42+
from new_data
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
{{ config(
2+
materialized = "incremental",
3+
unique_key = "section_pk",
4+
incremental_strategy = "merge",
5+
) }}
6+
7+
with base as (
8+
select
9+
section_code,
10+
debt_position_type_org_id,
11+
CAST(TO_CHAR(date_receipt, 'YYYY') AS INT) AS operating_year,
12+
section_description,
13+
md5(coalesce(section_description, '')) as hash_checksum,
14+
processed_time
15+
from {{ ref('payment_assessment_detail') }}
16+
),
17+
18+
source as (
19+
select distinct
20+
{{ dbt_utils.generate_surrogate_key(['section_code', 'debt_position_type_org_id', 'operating_year']) }} as section_pk,
21+
section_code,
22+
debt_position_type_org_id,
23+
operating_year,
24+
section_description,
25+
hash_checksum
26+
from base as b
27+
28+
{% if is_incremental() %}
29+
where b.processed_time >= (select coalesce(max(processed_time), '1900-01-01') from {{ this }} )
30+
{% endif %}
31+
),
32+
33+
new_data as (
34+
select s.*
35+
from source s
36+
left join {{ this }} t on
37+
s.section_pk = t.section_pk
38+
where s.hash_checksum <> coalesce(t.hash_checksum, '')
39+
)
40+
41+
select *, current_timestamp as processed_time
42+
from new_data
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
{{ config(
2+
materialized = "incremental",
3+
unique_key = "assessment_detail_pk",
4+
incremental_strategy = "merge",
5+
pre_hook="
6+
UPDATE {{ this }} as t
7+
SET is_active = false
8+
WHERE (organization_id, iud) IN (
9+
SELECT organization_id, iud
10+
FROM {{ ref('payment_assessment_detail_tmp') }} as tmp
11+
WHERE tmp.processed_time >= (select coalesce(max(processed_time), '1900-01-01') from {{ this }} )
12+
)
13+
"
14+
)
15+
}}
16+
17+
18+
with base as (
19+
select *, CAST(TO_CHAR(date_receipt, 'YYYY') AS INT) AS operating_year,
20+
DATE(payment_date_time) as payment_date
21+
from {{ ref('payment_assessment_detail') }}
22+
),
23+
24+
source as (
25+
select
26+
assessment_detail_pk,
27+
assessment_detail_id,
28+
assessment_id,
29+
organization_id,
30+
{{ dbt_utils.generate_surrogate_key(['debt_position_type_org_id']) }} as debt_position_type_org_pk,
31+
{{ dbt_utils.generate_surrogate_key(['office_code', 'debt_position_type_org_id', 'operating_year']) }} as office_pk,
32+
{{ dbt_utils.generate_surrogate_key(['section_code', 'debt_position_type_org_id', 'operating_year']) }} as section_pk,
33+
{{ dbt_utils.generate_surrogate_key(['assessment_code', 'debt_position_type_org_id', 'operating_year']) }} as assessment_pk,
34+
{{ dbt_utils.generate_surrogate_key(['payment_date']) }} as payment_date_pk,
35+
CASE WHEN
36+
date_receipt IS NULL THEN NULL
37+
ELSE {{ dbt_utils.generate_surrogate_key(['date_receipt']) }} END AS date_receipt_pk,
38+
CASE WHEN
39+
date_reporting IS NULL THEN NULL
40+
ELSE {{ dbt_utils.generate_surrogate_key(['date_reporting']) }} END as date_reporting_pk,
41+
CASE WHEN
42+
date_treasury IS NULL THEN NULL
43+
ELSE {{ dbt_utils.generate_surrogate_key(['date_treasury']) }} END as date_treasury_pk,
44+
iuv,
45+
iud,
46+
iur,
47+
debtor_fiscal_code_hash,
48+
payment_date_time,
49+
assessment_name,
50+
amount_cents,
51+
amount_submitted,
52+
receipt_id,
53+
classification_label,
54+
processed_time as src_processed_time,
55+
current_timestamp as target_processed_time
56+
57+
from base as b
58+
59+
{% if is_incremental() %}
60+
where b.processed_time >= (select coalesce(max(processed_time), '1900-01-01') from {{ this }} )
61+
{% endif %}
62+
)
63+
64+
select
65+
assessment_detail_pk,
66+
assessment_detail_id,
67+
assessment_id,
68+
organization_id,
69+
debt_position_type_org_pk,
70+
office_pk,
71+
section_pk,
72+
assessment_pk,
73+
payment_date_pk,
74+
date_receipt_pk,
75+
date_reporting_pk,
76+
date_treasury_pk,
77+
iuv,
78+
iud,
79+
iur,
80+
debtor_fiscal_code_hash,
81+
assessment_name,
82+
amount_cents,
83+
amount_submitted,
84+
receipt_id,
85+
classification_label,
86+
target_processed_time as processed_time,
87+
TRUE as is_active
88+
from source

0 commit comments

Comments
 (0)