Skip to content

Commit e192609

Browse files
authored
Fix null handling of source freshness and dbt_project vs schema spec precedence (#11698) (#11703)
* Handle explicit setting of null for source freshness config * Abstract out the creation of the target config This is useful because it makes that portion of code more re-usable/portable and makes the work we are about to do easier. * Fix bug in `merge_source_freshness` where empty freshness was preferenced over `None` The issue was that during merging of freshnesses, an "empty freshness", one where all values are `None`, was being preferenced over `None`. This was problematic because an "empty freshness" indicates that a freshness was not specified at that level. While `None` means that the freshness was _explicitly_ set to `None`. As such we should preference the thing that was specifically set. * Properly get dbt_project defined freshness and don't merge with schema defined freshness Previously we were only getting the "top level" freshness from the dbt_project.yaml. This was ignoring freshness settings for the direct, source, and table set in the dbt_project.yaml. Additionally, we were merging the dbt_project.yaml freshness into the schema freshness. Long term this merging would be desireably, however before we do that we need to ensure freshness at diffrent levels within the dbt_project.yml get properly merged (currently the different levels clobber each other). Fixing that is a larger issue though. So for the time being, the schema defintion of freshness will clobber any dbt_project.yml definition of freshness. * Add changie doc * Fix whitespace to make code quality happy * Set the parsed source freshness to an empty FreshnessThreshold if None This maintains backwards compatibility
1 parent 3b92f67 commit e192609

File tree

5 files changed

+188
-26
lines changed

5 files changed

+188
-26
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
kind: Fixes
2+
body: Fix source freshness set via config to handle explicit nulls
3+
time: 2025-05-30T00:58:04.94133-05:00
4+
custom:
5+
Author: QMalcolm
6+
Issue: "11685"

core/dbt/parser/sources.py

Lines changed: 52 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -136,30 +136,6 @@ def parse_source(self, target: UnpatchedSourceDefinition) -> SourceDefinition:
136136
else:
137137
loaded_at_field = source.loaded_at_field # may be None, that's okay
138138

139-
try:
140-
project_freshness = FreshnessThreshold.from_dict(
141-
self.root_project.sources.get("+freshness", {})
142-
)
143-
except ValueError:
144-
fire_event(
145-
FreshnessConfigProblem(
146-
msg="Could not validate `freshness` for `sources` in 'dbt_project.yml', ignoring. Please see https://docs.getdbt.com/docs/build/sources#source-data-freshness for more information.",
147-
)
148-
)
149-
project_freshness = None
150-
151-
source_freshness = source.freshness
152-
source_config_freshness = FreshnessThreshold.from_dict(source.config.get("freshness", {}))
153-
table_freshness = table.freshness
154-
table_config_freshness = FreshnessThreshold.from_dict(table.config.get("freshness", {}))
155-
freshness = merge_freshness(
156-
project_freshness,
157-
source_freshness,
158-
source_config_freshness,
159-
table_freshness,
160-
table_config_freshness,
161-
)
162-
163139
quoting = source.quoting.merged(table.quoting)
164140
# path = block.path.original_file_path
165141
table_meta = table.meta or {}
@@ -208,7 +184,8 @@ def parse_source(self, target: UnpatchedSourceDefinition) -> SourceDefinition:
208184
meta=meta,
209185
loader=source.loader,
210186
loaded_at_field=loaded_at_field,
211-
freshness=freshness,
187+
# The setting to an empty freshness object is to maintain what we were previously doing if no freshenss was specified
188+
freshness=config.freshness or FreshnessThreshold(),
212189
quoting=quoting,
213190
resource_type=NodeType.Source,
214191
fqn=target.fqn,
@@ -325,6 +302,19 @@ def _generate_source_config(self, target: UnpatchedSourceDefinition, rendered: b
325302
# it works while source configs can only include `enabled`.
326303
precedence_configs.update(target.table.config)
327304

305+
precedence_freshness = self.calculate_freshness_from_raw_target(target)
306+
if precedence_freshness:
307+
precedence_configs["freshness"] = precedence_freshness.to_dict()
308+
elif precedence_freshness is None:
309+
precedence_configs["freshness"] = None
310+
else:
311+
# this means that the user did not set a freshness threshold in the source schema file, as such
312+
# there should be no freshness precedence
313+
precedence_configs.pop("freshness", None)
314+
315+
# Because freshness is a "object" config, the freshness from the dbt_project.yml and the freshness
316+
# from the schema file _won't_ get merged by this process. The result will be that the freshness will
317+
# come from the schema file if provided, and if not, it'll fall back to the dbt_project.yml freshness.
328318
return generator.calculate_node_config(
329319
config_call_dict={},
330320
fqn=target.fqn,
@@ -377,6 +367,42 @@ def get_unused_msg(
377367
)
378368
return unused_tables_formatted
379369

370+
def calculate_freshness_from_raw_target(
371+
self,
372+
target: UnpatchedSourceDefinition,
373+
) -> Optional[FreshnessThreshold]:
374+
source: UnparsedSourceDefinition = target.source
375+
376+
source_freshness = source.freshness
377+
378+
source_config_freshness_raw: Optional[Dict] = source.config.get(
379+
"freshness", {}
380+
) # Will only be None if the user explicitly set it to null
381+
source_config_freshness: Optional[FreshnessThreshold] = (
382+
FreshnessThreshold.from_dict(source_config_freshness_raw)
383+
if source_config_freshness_raw is not None
384+
else None
385+
)
386+
387+
table: UnparsedSourceTableDefinition = target.table
388+
table_freshness = table.freshness
389+
390+
table_config_freshness_raw: Optional[Dict] = table.config.get(
391+
"freshness", {}
392+
) # Will only be None if the user explicitly set it to null
393+
table_config_freshness: Optional[FreshnessThreshold] = (
394+
FreshnessThreshold.from_dict(table_config_freshness_raw)
395+
if table_config_freshness_raw is not None
396+
else None
397+
)
398+
399+
return merge_freshness(
400+
source_freshness,
401+
source_config_freshness,
402+
table_freshness,
403+
table_config_freshness,
404+
)
405+
380406

381407
def merge_freshness_time_thresholds(
382408
base: Optional[Time], update: Optional[Time]
@@ -414,7 +440,7 @@ def merge_freshness(*thresholds: Optional[FreshnessThreshold]) -> Optional[Fresh
414440
merged_freshness_obj.error_after = merged_error_after
415441
merged_freshness_obj.warn_after = merged_warn_after
416442
current_merged_value = merged_freshness_obj
417-
elif base is None and update is not None:
443+
elif base is None and bool(update):
418444
# If current_merged_value (base) is None, the update becomes the new value
419445
current_merged_value = update
420446
else: # This covers cases where 'update' is None, or both 'base' and 'update' are None.

tests/functional/sources/fixtures.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
loader: custom
2222
freshness:
2323
warn_after: {count: 18, period: hour}
24+
error_after: {count: 24, period: hour}
2425
config:
2526
freshness: # default freshness, takes precedence over top-level key above
2627
warn_after: {count: 12, period: hour}
@@ -480,3 +481,33 @@
480481
- name: test_table
481482
identifier: source
482483
"""
484+
485+
freshness_with_explicit_null_in_table_schema_yml = """version: 2
486+
sources:
487+
- name: test_source
488+
schema: "{{ var(env_var('DBT_TEST_SCHEMA_NAME_VARIABLE')) }}"
489+
freshness:
490+
warn_after:
491+
count: 24
492+
period: hour
493+
quoting:
494+
identifier: True
495+
tables:
496+
- name: source_a
497+
loaded_at_field: "{{ var('test_loaded_at') | as_text }}"
498+
config:
499+
freshness: null
500+
"""
501+
502+
freshness_with_explicit_null_in_source_schema_yml = """version: 2
503+
sources:
504+
- name: test_source
505+
schema: "{{ var(env_var('DBT_TEST_SCHEMA_NAME_VARIABLE')) }}"
506+
config:
507+
freshness: null
508+
quoting:
509+
identifier: True
510+
tables:
511+
- name: source_a
512+
loaded_at_field: "{{ var('test_loaded_at') | as_text }}"
513+
"""

tests/functional/sources/test_source_freshness.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
error_models_schema_yml,
1919
filtered_models_schema_yml,
2020
freshness_via_metadata_schema_yml,
21+
freshness_with_explicit_null_in_source_schema_yml,
22+
freshness_with_explicit_null_in_table_schema_yml,
2123
override_freshness_models_schema_yml,
2224
)
2325

@@ -582,3 +584,23 @@ def test_hooks_do_not_run_for_source_freshness(
582584
)
583585
# default behaviour - no hooks run in source freshness
584586
self._assert_project_hooks_not_called(log_output)
587+
588+
589+
class TestSourceFreshnessExplicitNullInTable(SuccessfulSourceFreshnessTest):
590+
@pytest.fixture(scope="class")
591+
def models(self):
592+
return {"schema.yml": freshness_with_explicit_null_in_table_schema_yml}
593+
594+
def test_source_freshness_explicit_null_in_table(self, project):
595+
result = self.run_dbt_with_vars(project, ["source", "freshness"], expect_pass=True)
596+
assert {r.node.name: r.status for r in result} == {}
597+
598+
599+
class TestSourceFreshnessExplicitNullInSource(SuccessfulSourceFreshnessTest):
600+
@pytest.fixture(scope="class")
601+
def models(self):
602+
return {"schema.yml": freshness_with_explicit_null_in_source_schema_yml}
603+
604+
def test_source_freshness_explicit_null_in_source(self, project):
605+
result = self.run_dbt_with_vars(project, ["source", "freshness"], expect_pass=True)
606+
assert {r.node.name: r.status for r in result} == {}

tests/unit/parser/test_sources.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
from typing import List, Optional
2+
3+
import pytest
4+
5+
from core.dbt.artifacts.resources.v1.components import FreshnessThreshold, Time
6+
from core.dbt.parser.sources import merge_freshness
7+
8+
9+
class TestMergeSourceFreshness:
10+
@pytest.mark.parametrize(
11+
"thresholds,expected_result",
12+
[
13+
([None, None], None),
14+
(
15+
[
16+
FreshnessThreshold(
17+
warn_after=Time(count=1, period="hour"),
18+
error_after=Time(count=1, period="day"),
19+
),
20+
None,
21+
],
22+
None,
23+
),
24+
(
25+
[
26+
FreshnessThreshold(
27+
warn_after=Time(count=1, period="hour"),
28+
error_after=Time(count=1, period="day"),
29+
),
30+
None,
31+
FreshnessThreshold(),
32+
],
33+
None,
34+
),
35+
(
36+
[
37+
FreshnessThreshold(warn_after=Time(count=1, period="hour")),
38+
FreshnessThreshold(error_after=Time(count=1, period="day")),
39+
],
40+
FreshnessThreshold(
41+
warn_after=Time(count=1, period="hour"),
42+
error_after=Time(count=1, period="day"),
43+
),
44+
),
45+
(
46+
[
47+
None,
48+
FreshnessThreshold(warn_after=Time(count=1, period="hour")),
49+
FreshnessThreshold(error_after=Time(count=1, period="day")),
50+
],
51+
FreshnessThreshold(
52+
warn_after=Time(count=1, period="hour"),
53+
error_after=Time(count=1, period="day"),
54+
),
55+
),
56+
(
57+
[
58+
FreshnessThreshold(
59+
warn_after=Time(count=1, period="hour"),
60+
error_after=Time(count=1, period="day"),
61+
),
62+
FreshnessThreshold(error_after=Time(count=48, period="hour")),
63+
],
64+
FreshnessThreshold(
65+
warn_after=Time(count=1, period="hour"),
66+
error_after=Time(count=48, period="hour"),
67+
),
68+
),
69+
],
70+
)
71+
def test_merge_source_freshness(
72+
self,
73+
thresholds: List[Optional[FreshnessThreshold]],
74+
expected_result: Optional[FreshnessThreshold],
75+
):
76+
result = merge_freshness(*thresholds)
77+
assert result == expected_result

0 commit comments

Comments
 (0)