Skip to content

Commit 17e9288

Browse files
committed
fix(engine): restore skip.columns edges in topologically_sort_column_configs
The sync builder executes generators in compile-time sort order (from _column_configs, populated via this function), not ExecutionGraph order. Dropping skip.columns edges caused evaluate_skip_when to hit UndefinedError when the referenced column hadn't been generated yet, silently skipping rows. Also: - Refactor edge building into _add_edge() helper with a label parameter to distinguish "required" from "skip.when" edges in debug output - Rename test_dag.py -> test_topological_sort.py to match the new module location - Add from __future__ import annotations (required by AGENTS.md) - Add test_side_effect_column_ordering covering the side_effect_map.get() path - Add test_skip_when_column_ordering covering the skip.columns edge path
1 parent 6b2a8c8 commit 17e9288

2 files changed

Lines changed: 50 additions & 10 deletions

File tree

packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -363,19 +363,21 @@ def resolve(col_name: str) -> str | None:
363363
upstream: dict[str, set[str]] = {name: set() for name in dag_col_dict}
364364
downstream: dict[str, set[str]] = {name: set() for name in dag_col_dict}
365365

366-
# Only required_columns edges are added here. skip.columns edges are intentionally
367-
# omitted: ExecutionGraph.create handles them in its own two-pass build, which is
368-
# the authoritative execution-order graph. This function only needs to produce a
369-
# valid topological ordering of ColumnConfigT objects for config compilation.
366+
def _add_edge(name: str, dep: str, label: str) -> None:
367+
resolved = resolve(dep)
368+
if resolved is None:
369+
return
370+
logger.debug(f"{LOG_INDENT}🔗 `{name}` depends on `{resolved}` [{label}]")
371+
upstream[name].add(resolved)
372+
downstream[resolved].add(name)
373+
370374
logger.info("⛓️ Sorting column configs into a Directed Acyclic Graph")
371375
for name, col in dag_col_dict.items():
372376
for req in col.required_columns:
373-
resolved = resolve(req)
374-
if resolved is None or resolved == name:
375-
continue
376-
logger.debug(f"{LOG_INDENT}🔗 `{name}` depends on `{resolved}`")
377-
upstream[name].add(resolved)
378-
downstream[resolved].add(name)
377+
_add_edge(name, req, "required")
378+
if col.skip is not None:
379+
for skip_col in col.skip.columns:
380+
_add_edge(name, skip_col, "skip.when")
379381

380382
in_degree = {name: len(ups) for name, ups in upstream.items()}
381383
queue: deque[str] = deque(name for name, deg in in_degree.items() if deg == 0)

packages/data-designer-engine/tests/engine/dataset_builders/utils/test_dag.py renamed to packages/data-designer-engine/tests/engine/dataset_builders/utils/test_topological_sort.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
22
# SPDX-License-Identifier: Apache-2.0
33

4+
from __future__ import annotations
5+
46
from typing import Any
57

68
import pytest
@@ -135,3 +137,39 @@ def gen_b(row: dict[str, Any]) -> dict[str, Any]:
135137
]
136138
with pytest.raises(ConfigCompilationError, match="already produced by"):
137139
topologically_sort_column_configs(column_configs)
140+
141+
142+
def test_side_effect_column_ordering() -> None:
143+
"""A column that depends on a side-effect column is sorted after its producer."""
144+
145+
@custom_column_generator(required_columns=["seed"], side_effect_columns=["seed_trace"])
146+
def gen_with_trace(row: dict[str, Any]) -> dict[str, Any]:
147+
return row
148+
149+
column_configs = [
150+
LLMTextColumnConfig(name="seed", prompt="generate seed", model_alias=MODEL_ALIAS),
151+
ExpressionColumnConfig(name="consumer", expr="{{ seed_trace }}"),
152+
CustomColumnConfig(name="producer", generator_function=gen_with_trace),
153+
]
154+
sorted_configs = topologically_sort_column_configs(column_configs)
155+
names = [c.name for c in sorted_configs]
156+
assert names.index("producer") < names.index("consumer")
157+
158+
159+
def test_skip_when_column_ordering() -> None:
160+
"""A column with skip.when referencing another DAG column is sorted after that column."""
161+
from data_designer.config.base import SkipConfig
162+
163+
column_configs = [
164+
LLMTextColumnConfig(name="seed", prompt="generate seed", model_alias=MODEL_ALIAS),
165+
LLMTextColumnConfig(
166+
name="gated",
167+
prompt="generate gated",
168+
model_alias=MODEL_ALIAS,
169+
skip=SkipConfig(when="{{ seed == 'bad' }}"),
170+
),
171+
]
172+
# gated has no required_columns referencing seed, only a skip.when dependency
173+
sorted_configs = topologically_sort_column_configs(column_configs)
174+
names = [c.name for c in sorted_configs]
175+
assert names.index("seed") < names.index("gated")

0 commit comments

Comments
 (0)