Description
Feature request: allow a custom `next_snapshot_and_version` lambda to override the built-in `snapshot_format: "delta"` view path
Environment
- databricks-labs-sdp-meta version: 0.0.11 / 0.0.12
- Databricks Runtime: 16.x
- Unity Catalog: enabled
Summary
When `source_format: "snapshot"` + `source_details.snapshot_format: "delta"` are configured,
`apply_changes_from_snapshot` always reads via a DLT view (a full table scan on every trigger).
There is no way to supply a custom `next_snapshot_and_version` lambda to control what is
returned as the snapshot — even if the caller passes one to `invoke_dlt_pipeline`, it is
silently ignored.
Current behaviour (before fix)
`is_create_view()` treats `snapshot_format: "delta"` as unconditional:
```python
# dataflow_pipeline.py — before fix
def is_create_view(self):
    if self.dataflowSpec.sourceDetails.get("snapshot_format") == "delta":
        self.next_snapshot_and_version_from_source_view = True
        return True  # always create view; lambda is never reached
    elif self.next_snapshot_and_version:
        return False
    return True
```

And `apply_changes_from_snapshot()` gates on `next_snapshot_and_version_from_source_view`:
```python
# dataflow_pipeline.py — before fix
source = (
    (lambda latest_snapshot_version: self.next_snapshot_and_version(...))
    if self.next_snapshot_and_version and not self.next_snapshot_and_version_from_source_view
    else self.view_name  # lambda is ignored when snapshot_format="delta" is set
)
```

Result: callers that configure `snapshot_format: "delta"` and pass a custom lambda to
`invoke_dlt_pipeline` always get the view-based path; the lambda is dead code.
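The dead-code path can be reproduced without Spark. This is a minimal pure-Python model of the before-fix selection logic (the `OldPipeline` stub and `bronze_dtix_view` name are illustrative; only the attributes the expression reads are modeled):

```python
class OldPipeline:
    """Stand-in for DataflowPipeline after is_create_view() has run (before fix)."""
    def __init__(self):
        self.view_name = "bronze_dtix_view"
        # Caller-supplied lambda, forwarded from invoke_dlt_pipeline.
        self.next_snapshot_and_version = lambda v, spec: None
        # Set to True by is_create_view() whenever snapshot_format="delta".
        self.next_snapshot_and_version_from_source_view = True

    def pick_source(self):
        # Before-fix selection logic from apply_changes_from_snapshot().
        return (
            (lambda latest: self.next_snapshot_and_version(latest, None))
            if self.next_snapshot_and_version
            and not self.next_snapshot_and_version_from_source_view
            else self.view_name
        )

p = OldPipeline()
# Even though a custom lambda was supplied, the view wins: the lambda is dead code.
assert p.pick_source() == "bronze_dtix_view"
```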
Desired behaviour
A custom `next_snapshot_and_version` lambda passed to `invoke_dlt_pipeline` should take
full priority over the built-in view path for snapshot specs. The built-in view path
should remain the default when no lambda is supplied.
This enables callers to inject efficient, version-aware snapshot logic — for example:
- O(1) fast skip: check the source Delta table version (one metadata read) and return
  `None` immediately when nothing has changed, avoiding a full table scan entirely.
- Partial snapshots: return only rows relevant to a specific version range.
- Column renaming: rename reserved system columns (`__START_AT`, `__END_AT`) inside
  the lambda so they never reach DLT's schema analyser, which otherwise strips or rejects them.
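The fast-skip contract above can be sketched independently of Spark. In this pure-Python model, `TABLE_VERSION` and `TABLE_ROWS` are hypothetical stand-ins for the source Delta table's state, and a list of dicts stands in for the DataFrame the real lambda would return:

```python
# Hypothetical stand-ins for the source Delta table's current state.
TABLE_VERSION = 7
TABLE_ROWS = [{"id": 1}, {"id": 2}]

def next_snapshot_and_version(latest_snapshot_version, dataflow_spec=None):
    """Model of the DLT snapshot-lambda contract:
    return None to skip the trigger, or (snapshot, version) to process it."""
    if latest_snapshot_version is not None and latest_snapshot_version >= TABLE_VERSION:
        return None  # O(1) fast skip: nothing new to read
    return (TABLE_ROWS, TABLE_VERSION)

# First run: no previous version, so the full snapshot is returned.
assert next_snapshot_and_version(None) == (TABLE_ROWS, 7)
# Subsequent run with no new table version: fast skip.
assert next_snapshot_and_version(7) is None
```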
Fix
Two changes in dataflow_pipeline.py:
1. `is_create_view()` — check the custom lambda first, before checking `snapshot_format`

```python
# dataflow_pipeline.py — after fix
def is_create_view(self):
    # applyChangesFromSnapshot may not be set for non-snapshot specs (e.g. CDF streaming).
    _is_snapshot_spec = (
        getattr(self, "applyChangesFromSnapshot", None) is not None
        or (
            self.dataflowSpec.sourceDetails
            and self.dataflowSpec.sourceDetails.get("snapshot_format") == "delta"
        )
    )
    # Custom lambda takes priority for snapshot specs: skip view creation so that
    # apply_changes_from_snapshot() uses the lambda as the DLT source directly.
    # For non-snapshot specs (e.g. CDF streaming tables), always create the view.
    if self.next_snapshot_and_version and _is_snapshot_spec:
        return False
    if self.dataflowSpec.sourceDetails and \
            self.dataflowSpec.sourceDetails.get("snapshot_format") == "delta":
        self.next_snapshot_and_version_from_source_view = True
        return True
    return True
```

2. `apply_changes_from_snapshot()` — remove the `not next_snapshot_and_version_from_source_view` guard
```python
# dataflow_pipeline.py — after fix
source = (
    (lambda latest_snapshot_version: self.next_snapshot_and_version(
        latest_snapshot_version, self.dataflowSpec
    ))
    if self.next_snapshot_and_version  # custom lambda takes priority over view
    else self.view_name
)
```

Behaviour matrix after fix
| `snapshot_format: "delta"` in config | custom lambda passed | View created? | Snapshot source |
|---|---|---|---|
| yes | no | yes | DLT view (full scan, original behaviour) |
| yes | yes | no | custom lambda |
| no | yes | no | custom lambda |
| no | no | yes | DLT view (original behaviour) |
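The matrix can be checked directly against the after-fix `is_create_view()` logic with lightweight stubs. The `Spec` and `Flow` class names here are illustrative; only the attributes the method reads are modeled, and `is_snapshot=True` stands in for a spec with `applyChangesFromSnapshot` configured:

```python
class Spec:
    """Stand-in for a dataflow spec; sourceDetails is empty unless delta is configured."""
    def __init__(self, snapshot_format=None):
        self.sourceDetails = (
            {"snapshot_format": snapshot_format} if snapshot_format else {}
        )

class Flow:
    """Stand-in for DataflowPipeline carrying the after-fix is_create_view()."""
    def __init__(self, spec, next_snapshot_and_version=None, is_snapshot=False):
        self.dataflowSpec = spec
        self.next_snapshot_and_version = next_snapshot_and_version
        if is_snapshot:
            self.applyChangesFromSnapshot = object()  # only set on snapshot specs
        self.next_snapshot_and_version_from_source_view = False

    def is_create_view(self):
        # After-fix logic, copied from the patch above.
        _is_snapshot_spec = (
            getattr(self, "applyChangesFromSnapshot", None) is not None
            or (
                self.dataflowSpec.sourceDetails
                and self.dataflowSpec.sourceDetails.get("snapshot_format") == "delta"
            )
        )
        if self.next_snapshot_and_version and _is_snapshot_spec:
            return False
        if self.dataflowSpec.sourceDetails and \
                self.dataflowSpec.sourceDetails.get("snapshot_format") == "delta":
            self.next_snapshot_and_version_from_source_view = True
            return True
        return True

lam = lambda v, spec: None

# Row 1: delta config, no lambda -> view created.
assert Flow(Spec("delta")).is_create_view() is True
# Row 2: delta config, lambda -> no view; lambda is the source.
assert Flow(Spec("delta"), lam).is_create_view() is False
# Row 3: no delta config, lambda on a snapshot spec -> no view.
assert Flow(Spec(), lam, is_snapshot=True).is_create_view() is False
# Row 4: no delta config, no lambda -> view created.
assert Flow(Spec()).is_create_view() is True
# Non-snapshot spec (e.g. CDF streaming) with a lambda still gets its view.
assert Flow(Spec(), lam).is_create_view() is True
```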
Why the `_is_snapshot_spec` guard matters
`next_snapshot_and_version` is a single callable passed to `invoke_dlt_pipeline` and then
forwarded to every dataflow spec in the pipeline — including non-snapshot specs such as
CDF streaming tables (intpk). Without the `_is_snapshot_spec` check, `is_create_view()`
would return `False` for all specs whenever a lambda is present, preventing the CDF view
from being registered for streaming tables and breaking them.
The guard uses `getattr(self, "applyChangesFromSnapshot", None)` because
`self.applyChangesFromSnapshot` is only set when the spec has `applyChangesFromSnapshot`
configured; accessing it directly on non-snapshot specs raises `AttributeError`.
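The `getattr` point is easy to demonstrate in isolation. The `NonSnapshotSpecPipeline` class below is a hypothetical stand-in for a pipeline object where `applyChangesFromSnapshot` was never assigned:

```python
class NonSnapshotSpecPipeline:
    """Stand-in: a pipeline object where applyChangesFromSnapshot was never set."""
    pass

p = NonSnapshotSpecPipeline()

# Direct attribute access raises AttributeError on non-snapshot specs...
try:
    _ = p.applyChangesFromSnapshot
    raised = False
except AttributeError:
    raised = True
assert raised

# ...while getattr with a default degrades safely to None.
assert getattr(p, "applyChangesFromSnapshot", None) is None
```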
Example: version-aware snapshot lambda (O(1) fast skip)
```python
def dtix_next_snapshot_and_version(latest_snapshot_version, dataflowSpec):
    """
    Check the source Delta table version before doing any data scan.

    If the version has not advanced since the last pipeline run, return None
    immediately (DLT marks the run complete without touching any data).
    If the version has advanced, read the full current table and rename reserved
    columns __START_AT / __END_AT before DLT analyses the schema.
    """
    catalog = dataflowSpec.sourceDetails.get("catalog")         # "main"
    db = dataflowSpec.sourceDetails.get("source_database")      # LFC schema
    table = dataflowSpec.sourceDetails.get("source_table")      # "dtix"
    catalog_prefix = f"{catalog}." if catalog else ""
    full_table = f"{catalog_prefix}{db}.{table}"

    current_version = (
        spark.sql(f"DESCRIBE HISTORY {full_table} LIMIT 1").first()["version"]
    )
    if latest_snapshot_version is not None and latest_snapshot_version >= current_version:
        return None  # O(1) fast skip — no data scan

    df = spark.read.table(full_table)
    if "__START_AT" in df.columns:
        df = df.withColumnRenamed("__START_AT", "lfc_start_at")
    if "__END_AT" in df.columns:
        df = df.withColumnRenamed("__END_AT", "lfc_end_at")
    df = df.dropDuplicates()
    return (df, current_version)
```
```python
DataflowPipeline.invoke_dlt_pipeline(
    spark,
    layer,
    bronze_next_snapshot_and_version=dtix_next_snapshot_and_version,
)
```

Note on `sourceDetails` key names: after the `onboarding_job` processes the onboarding
JSON, `source_catalog_prod` is stored as `sourceDetails["catalog"]`, `source_database` stays
`sourceDetails["source_database"]`, and `source_table` stays `sourceDetails["source_table"]`.
Use `dataflowSpec.sourceDetails.get("catalog")` — not `"source_catalog_prod"` — inside
the lambda.
Notes
- No behaviour change when no custom lambda is supplied (`snapshot_format: "delta"`
  continues to use the built-in view path exactly as before).
- No behaviour change for non-snapshot specs (CDF streaming tables always get a view).
- The lambda receives `(latest_snapshot_version, dataflowSpec)`, where
  `latest_snapshot_version` is `None` on the first pipeline run and the version returned by
  the previous lambda call on subsequent runs.
- Returning `None` from the lambda signals DLT to stop processing snapshots for this trigger
  (the run completes successfully with no data changes applied).
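The run-to-run contract in the notes can be simulated without DLT: the engine passes `None` on the first call and the previously returned version afterwards. The driver sequence below is a hypothetical model of that handshake, not DLT's actual scheduler, with a mutable dict standing in for the source table:

```python
def make_lambda(get_version, read_table):
    """Build a snapshot lambda from a version probe and a table reader."""
    def next_snapshot_and_version(latest_snapshot_version, dataflow_spec=None):
        current = get_version()
        if latest_snapshot_version is not None and latest_snapshot_version >= current:
            return None  # nothing new since the last trigger
        return (read_table(), current)
    return next_snapshot_and_version

version = {"v": 3}  # mutable stand-in for the source Delta table version
fn = make_lambda(lambda: version["v"], lambda: ["row"])

# Trigger 1: first run, latest_snapshot_version is None -> full snapshot at version 3.
result = fn(None)
assert result == (["row"], 3)
last = result[1]

# Trigger 2: version unchanged -> lambda fast-skips.
assert fn(last) is None

# Trigger 3: table advanced to version 4 -> snapshot is read again.
version["v"] = 4
assert fn(last) == (["row"], 4)
```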
Related issues
- #266 — Bug: `apply_changes_from_snapshot` raises "Snapshot reader function not provided!"
  when `snapshot_format: "delta"` is configured without a custom lambda.