Feature request: allow a custom next_snapshot_and_version lambda to override the built-in snapshot_format: "delta" view path #268

@rsleedbx

Description


Environment

  • databricks-labs-sdp-meta version: 0.0.11 / 0.0.12
  • Databricks Runtime: 16.x
  • Unity Catalog: enabled

Summary

When source_format: "snapshot" + source_details.snapshot_format: "delta" are configured,
apply_changes_from_snapshot always reads via a DLT view (full table scan on every trigger).
There is no way to supply a custom next_snapshot_and_version lambda to control what is
returned as the snapshot — even if the caller passes one to invoke_dlt_pipeline, it is
silently ignored.

Current behaviour (before fix)

is_create_view() treats snapshot_format: "delta" as unconditional:

# dataflow_pipeline.py — before fix
def is_create_view(self):
    if self.dataflowSpec.sourceDetails.get("snapshot_format") == "delta":
        self.next_snapshot_and_version_from_source_view = True
        return True          # always create view; lambda is never reached
    elif self.next_snapshot_and_version:
        return False
    return True

And apply_changes_from_snapshot() gates on next_snapshot_and_version_from_source_view:

# dataflow_pipeline.py — before fix
source = (
    (lambda latest_snapshot_version: self.next_snapshot_and_version(...))
    if self.next_snapshot_and_version and not self.next_snapshot_and_version_from_source_view
    else self.view_name   # lambda is ignored when snapshot_format="delta" is set
)

Result: callers that configure snapshot_format: "delta" and pass a custom lambda to
invoke_dlt_pipeline always get the view-based path; the lambda is dead code.
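The dead-code behaviour can be reduced to a small standalone sketch (names are illustrative, not the actual DataflowPipeline API) that mirrors the pre-fix decision flow:

```python
def snapshot_source_before_fix(snapshot_format_is_delta: bool,
                               has_custom_lambda: bool):
    """Pre-fix logic: snapshot_format='delta' wins unconditionally.

    Returns (view_created, source_kind) as decided by is_create_view()
    plus the source gating in apply_changes_from_snapshot().
    """
    if snapshot_format_is_delta:
        return (True, "DLT view")   # lambda never consulted
    if has_custom_lambda:
        return (False, "custom lambda")
    return (True, "DLT view")

# The reported bug: a lambda is supplied but silently ignored
# whenever snapshot_format="delta" is configured.
assert snapshot_source_before_fix(True, True) == (True, "DLT view")
```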

Desired behaviour

A custom next_snapshot_and_version lambda passed to invoke_dlt_pipeline should take
full priority over the built-in view path for snapshot specs. The built-in view path
should remain the default when no lambda is supplied.

This enables callers to inject efficient, version-aware snapshot logic — for example:

  • O(1) fast skip: check the source Delta table version (one metadata read) and return
    None immediately when nothing has changed, avoiding a full table scan entirely.
  • Partial snapshots: return only rows relevant to a specific version range.
  • Column renaming: rename reserved system columns (__START_AT, __END_AT) inside
    the lambda so they never reach DLT's schema analyser, which otherwise strips or rejects them.

Fix

Two changes in dataflow_pipeline.py:

1. is_create_view() — check custom lambda first, before checking snapshot_format

# dataflow_pipeline.py — after fix
def is_create_view(self):
    # applyChangesFromSnapshot may not be set for non-snapshot specs (e.g. CDF streaming).
    _is_snapshot_spec = (
        getattr(self, "applyChangesFromSnapshot", None) is not None
        or (
            self.dataflowSpec.sourceDetails
            and self.dataflowSpec.sourceDetails.get("snapshot_format") == "delta"
        )
    )
    # Custom lambda takes priority for snapshot specs: skip view creation so that
    # apply_changes_from_snapshot() uses the lambda as the DLT source directly.
    # For non-snapshot specs (e.g. CDF streaming tables), always create the view.
    if self.next_snapshot_and_version and _is_snapshot_spec:
        return False
    if self.dataflowSpec.sourceDetails and \
            self.dataflowSpec.sourceDetails.get("snapshot_format") == "delta":
        self.next_snapshot_and_version_from_source_view = True
        return True
    return True

2. apply_changes_from_snapshot() — remove the not next_snapshot_and_version_from_source_view guard

# dataflow_pipeline.py — after fix
source = (
    (lambda latest_snapshot_version: self.next_snapshot_and_version(
        latest_snapshot_version, self.dataflowSpec
    ))
    if self.next_snapshot_and_version   # custom lambda takes priority over view
    else self.view_name
)

Behaviour matrix after fix

| `snapshot_format: "delta"` in config | custom lambda passed | View created? | Snapshot source |
|---|---|---|---|
| yes | no | yes | DLT view (full scan, original behaviour) |
| yes | yes | no | custom lambda |
| no | yes | no | custom lambda |
| no | no | yes | DLT view (original behaviour) |
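The matrix can be encoded as a small decision function; this is a hedged sketch of the post-fix logic (function and return values are illustrative, not the real DataflowPipeline API):

```python
def snapshot_source_after_fix(snapshot_format_is_delta: bool,
                              has_custom_lambda: bool):
    """Post-fix logic: a custom lambda always takes priority.

    Returns (view_created, source_kind) per the behaviour matrix.
    """
    if has_custom_lambda:
        return (False, "custom lambda")
    # No lambda supplied: both delta-snapshot and non-snapshot specs
    # fall back to the built-in view path, exactly as before the fix.
    return (True, "DLT view")

# One assertion per row of the behaviour matrix:
assert snapshot_source_after_fix(True,  False) == (True,  "DLT view")
assert snapshot_source_after_fix(True,  True)  == (False, "custom lambda")
assert snapshot_source_after_fix(False, True)  == (False, "custom lambda")
assert snapshot_source_after_fix(False, False) == (True,  "DLT view")
```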

Why the _is_snapshot_spec guard matters

next_snapshot_and_version is a single callable passed to invoke_dlt_pipeline and then
forwarded to every dataflow spec in the pipeline — including non-snapshot specs such as
CDF streaming tables (intpk). Without the _is_snapshot_spec check, is_create_view()
would return False for all specs whenever a lambda is present, preventing the CDF view
from being registered for streaming tables and breaking them.

The guard uses getattr(self, "applyChangesFromSnapshot", None) because
self.applyChangesFromSnapshot is only set when the spec has applyChangesFromSnapshot
configured; accessing it directly on non-snapshot specs raises AttributeError.
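A minimal illustration of why `getattr` with a default is needed here (plain Python stand-ins, not the real spec classes):

```python
class SnapshotSpec:
    def __init__(self):
        # Set only when the spec has applyChangesFromSnapshot configured.
        self.applyChangesFromSnapshot = {"keys": ["id"]}

class CdfStreamingSpec:
    pass  # attribute is never set on non-snapshot specs

snap, cdf = SnapshotSpec(), CdfStreamingSpec()

# Direct attribute access raises AttributeError on the non-snapshot spec...
try:
    cdf.applyChangesFromSnapshot
    raised = False
except AttributeError:
    raised = True
assert raised

# ...while getattr with a None default is safe for both spec kinds.
assert getattr(snap, "applyChangesFromSnapshot", None) is not None
assert getattr(cdf, "applyChangesFromSnapshot", None) is None
```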

Example: version-aware snapshot lambda (O(1) fast skip)

def dtix_next_snapshot_and_version(latest_snapshot_version, dataflowSpec):
    """
    Check the source Delta table version before doing any data scan.
    If the version has not advanced since the last pipeline run, return None
    immediately (DLT marks the run complete without touching any data).
    If the version has advanced, read the full current table and rename reserved
    columns __START_AT / __END_AT before DLT analyses the schema.
    """
    catalog = dataflowSpec.sourceDetails.get("catalog")          # "main"
    db      = dataflowSpec.sourceDetails.get("source_database")  # LFC schema
    table   = dataflowSpec.sourceDetails.get("source_table")     # "dtix"
    catalog_prefix = f"{catalog}." if catalog else ""
    full_table = f"{catalog_prefix}{db}.{table}"

    current_version = (
        spark.sql(f"DESCRIBE HISTORY {full_table} LIMIT 1").first()["version"]
    )
    if latest_snapshot_version is not None and latest_snapshot_version >= current_version:
        return None   # O(1) fast skip — no data scan

    df = spark.read.table(full_table)
    if "__START_AT" in df.columns:
        df = df.withColumnRenamed("__START_AT", "lfc_start_at")
    if "__END_AT" in df.columns:
        df = df.withColumnRenamed("__END_AT", "lfc_end_at")
    df = df.dropDuplicates()
    return (df, current_version)

DataflowPipeline.invoke_dlt_pipeline(
    spark,
    layer,
    bronze_next_snapshot_and_version=dtix_next_snapshot_and_version,
)

Note on sourceDetails key names: after the onboarding_job processes the onboarding
JSON, source_catalog_prod is stored as sourceDetails["catalog"], while source_database
and source_table keep their original key names. Inside the lambda, use
dataflowSpec.sourceDetails.get("catalog"), not "source_catalog_prod".

Notes

  • No behaviour change when no custom lambda is supplied (snapshot_format: "delta" continues
    to use the built-in view path exactly as before).
  • No behaviour change for non-snapshot specs (CDF streaming tables always get a view).
  • The lambda receives (latest_snapshot_version, dataflowSpec) where
    latest_snapshot_version is None on the first pipeline run and the version returned by
    the previous lambda call on subsequent runs.
  • Returning None from the lambda signals DLT to stop processing snapshots for this trigger
    (run completes successfully with no data changes applied).
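The lambda contract described in the notes can be simulated without Spark. This hedged sketch models how DLT repeatedly invokes next_snapshot_and_version across triggers, feeding back the previously returned version; the driver loop is illustrative, not DLT's actual implementation:

```python
def make_lambda(versions):
    """Build a fake next_snapshot_and_version over a list of table versions."""
    def next_snapshot_and_version(latest_snapshot_version, dataflow_spec=None):
        current_version = versions[-1]  # stands in for one metadata read
        if latest_snapshot_version is not None and latest_snapshot_version >= current_version:
            return None  # fast skip: nothing new since the last run
        snapshot_df = f"rows@v{current_version}"  # stand-in for a DataFrame
        return (snapshot_df, current_version)
    return next_snapshot_and_version

def run_trigger(fn, last_version):
    """One pipeline trigger: call the lambda once, as DLT would."""
    result = fn(last_version)
    if result is None:
        return last_version, "no-op"  # run completes, no data touched
    df, version = result
    return version, df                # apply the snapshot, remember its version

fn = make_lambda(versions=[0, 1, 2])
last, out = run_trigger(fn, None)     # first run: latest_snapshot_version is None
assert (last, out) == (2, "rows@v2")
last, out = run_trigger(fn, last)     # second run: version unchanged -> no-op
assert (last, out) == (2, "no-op")
```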

Labels: bug