Skip to content

fix: async engine side-effect column propagation and collision resolution#509

Merged
andreatgretel merged 7 commits intomainfrom
andreatgretel/fix/async-side-effect-columns
Apr 13, 2026
Merged

fix: async engine side-effect column propagation and collision resolution#509
andreatgretel merged 7 commits intomainfrom
andreatgretel/fix/async-side-effect-columns

Conversation

@andreatgretel
Copy link
Copy Markdown
Contributor

Summary

Fix two bugs in the async engine's handling of @custom_column_generator side-effect columns that prevented multi-stage pipelines (e.g., Anonymizer's detection workflow) from running with DATA_DESIGNER_ASYNC_ENGINE=1.

Related Issue

Fixes #508

Changes

  • ExecutionGraph.set_side_effect() now uses first-writer-wins instead of last-writer-wins, matching sync engine semantics where earlier consumers see the first producer's value. Prevents false DAGCircularDependencyError when multiple generators declare the same side-effect column at different pipeline stages.
  • AsyncTaskScheduler now includes side-effect columns in _instance_to_columns so their values are written to the RowGroupBufferManager and available to downstream prompt templates.

Testing

  • make test passes (1747 engine tests)
  • Unit tests added/updated
  • E2E tests added/updated (if applicable)

…tion

ExecutionGraph.set_side_effect() now uses first-writer-wins instead of
last-writer-wins, matching sync engine semantics where earlier consumers
see the first producer's value. This prevents false DAGCircularDependencyError
when multiple generators declare the same side-effect column at different
pipeline stages.

AsyncTaskScheduler now includes side-effect columns in _instance_to_columns
so their values are written to the RowGroupBufferManager and available to
downstream prompt templates.

Fixes #508
@andreatgretel andreatgretel requested a review from a team as a code owner April 8, 2026 15:08
@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Apr 8, 2026

PR #509 Review: fix: async engine side-effect column propagation and collision resolution

Summary

This PR fixes two bugs in the async engine's handling of @custom_column_generator side-effect columns that prevented multi-stage pipelines (e.g., Anonymizer's detection workflow) from running with DATA_DESIGNER_ASYNC_ENGINE=1:

  1. ExecutionGraph.set_side_effect() — changed from last-writer-wins to first-writer-wins semantics, preventing false DAGCircularDependencyError when multiple generators declare the same side-effect column at different pipeline stages.
  2. AsyncTaskScheduler.__init__ — now includes side-effect columns in _instance_to_columns so their values are written to RowGroupBufferManager and available to downstream prompt templates.

Files changed: 3 (37 additions, 3 deletions)

Findings

Correctness

execution_graph.py — first-writer-wins (lines 107-116): The change is clean and correct. The set_side_effect method now guards with if side_effect_col not in self._side_effect_map, which implements first-writer-wins. Since ExecutionGraph.create() iterates column_configs in declared order (line 56-67), and the docstring says this matches sync engine semantics, this is the right fix. The previous last-writer-wins behavior would silently re-map a side-effect to a later producer, which could create a dependency edge back to an earlier stage and trigger a false cycle detection.

async_scheduler.py — side-effect propagation (lines 130-139): The fix correctly adds side-effect columns to _instance_to_columns. The seen_cols set prevents the same side-effect column from being added to multiple generators' output lists when there's a collision (consistent with the first-writer-wins semantics in the graph). The getattr(gen.config, "side_effect_columns", []) pattern is safe — configs that don't define side_effect_columns will return [].

Observations

  1. Two-loop pattern in async_scheduler.py (lines 132-139): The code uses two separate loops over generators.items() — the first populates primary columns, the second adds side-effects. This is intentional and correct: the seen_cols set must be fully populated with all primary column names before side-effect deduplication runs, otherwise a side-effect column that shadows a primary column could slip through. Good design choice.

  2. _instance_to_columns usage is comprehensive. I checked all 6 usage sites of _instance_to_columns in async_scheduler.py (lines ~358, ~379, ~621, ~666, ~768, ~794). All sites iterate output_cols and check if col in result_df.columns or if col in result before writing. This means adding side-effect columns to the list is safe — if a generator doesn't produce a side-effect value for a given invocation, it's simply skipped.

  3. No logging for silently ignored collisions. When set_side_effect silently drops a second registration, there's no debug-level log. This is fine for correctness, but a logger.debug(...) could help users troubleshoot pipelines. Minor — not a blocker.

Testing

The new test test_side_effect_collision_first_writer_wins is well-structured: it registers two producers for the same side-effect, verifies the first wins, and checks downstream edge resolution. The existing test_side_effect_name_collision_prefers_real_column test (already passing) provides complementary coverage for the real-column-wins-over-side-effect case.

Gap: There is no unit test for the AsyncTaskScheduler side-effect propagation change. Testing this would require mocking generators with config.side_effect_columns, which is more of an integration concern. The PR description mentions 1747 engine tests pass, which likely covers this path indirectly. Low risk, but worth noting.

Style / Conventions

  • Code follows the project's absolute import convention.
  • Type annotations are present on all new variables (dict[int, list[str]], set[str]).
  • Docstring on set_side_effect clearly explains the first-writer-wins contract and its rationale.
  • No unnecessary changes or scope creep.

Verdict

LGTM. This is a focused, well-reasoned bug fix. Both changes are correct and consistent with each other. The test coverage for ExecutionGraph is solid. The only minor gap is the lack of a direct unit test for the AsyncTaskScheduler propagation change, but the overall test suite passing mitigates this risk. Ship it.

@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps bot commented Apr 8, 2026

Greptile Summary

This PR fixes two bugs in the async engine's handling of @custom_column_generator side-effect columns. First, AsyncTaskScheduler.__init__ now maintains two separate dicts — _gen_instance_to_columns (graph-registered columns only, used for completion tracking) and _gen_instance_to_columns_including_side_effects (extends the former with side-effect columns, used for buffer writes). This directly fixes the KeyError in CompletionTracker described in the prior thread. Second, both ExecutionGraph.set_side_effect() and topologically_sort_column_configs() now raise ConfigCompilationError instead of silently overwriting when two different producers try to claim the same side-effect column name, converting a confusing false DAGCircularDependencyError into a clear diagnostic. Tests cover all three changed code paths with good isolation.

Confidence Score: 5/5

Safe to merge — both bugs are correctly fixed, the split-dict invariant is clean, and the new collision guard eliminates the false DAGCircularDependencyError.

All findings are P2 or lower; the previous P0 KeyError concern raised in the thread is fully addressed by the _gen_instance_to_columns / _gen_instance_to_columns_including_side_effects split. Collision detection in both dag.py and execution_graph.py is consistent. Test coverage directly exercises the separation invariant and the new error paths.

No files require special attention.

Important Files Changed

Filename Overview
packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py Splits _instance_to_columns into two dicts: _gen_instance_to_columns (completion tracking, no side-effect columns) and _gen_instance_to_columns_including_side_effects (buffer writes, includes side-effect columns); all three run methods correctly use the new write-only dict.
packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py set_side_effect() now raises ConfigCompilationError on collision from a different producer (idempotent for same producer), preventing silent last-writer-wins overwrite that caused false DAGCircularDependencyError.
packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dag.py Adds early duplicate-side-effect-producer detection in topologically_sort_column_configs(), consistent with execution_graph.py validation; raises ConfigCompilationError with a clear message.
packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py New test verifies that side-effect columns appear in the buffer-write dict but not in the completion-tracking dict, directly exercising the invariant that prevents the CompletionTracker KeyError.
packages/data-designer-engine/tests/engine/dataset_builders/utils/test_dag.py New test exercises the duplicate-side-effect-producer detection in topologically_sort_column_configs() using real custom column generators.
packages/data-designer-engine/tests/engine/dataset_builders/utils/test_execution_graph.py New test confirms set_side_effect() raises ConfigCompilationError when two distinct producers register the same side-effect column name.

Sequence Diagram

sequenceDiagram
    participant S as AsyncTaskScheduler
    participant R as _run_from_scratch / _run_cell / _run_batch
    participant BM as RowGroupBufferManager
    participant CT as CompletionTracker

    Note over S: __init__: build two dicts
    S->>S: _gen_instance_to_columns (real columns only)
    S->>S: _gen_instance_to_columns_including_side_effects (real + side-effect columns)

    S->>R: execute task (task.column)
    R->>R: generator.agenerate(...)
    R->>BM: update_batch / update_cell using _gen_instance_to_columns_including_side_effects

    S->>CT: mark_row_range_complete / mark_cell_complete using _gen_instance_to_columns (NO side-effect columns)
Loading

Reviews (7): Last reviewed commit: "fix: reject duplicate side-effect produc..." | Re-trigger Greptile

…cheduler

Side-effect columns added to _instance_to_columns caused KeyError in
CompletionTracker._validate_strategy() because they are not registered
in the execution graph. Split into _instance_to_write_columns (buffer
writes, includes side-effects) and _instance_to_columns (completion
tracking, real columns only).
Comment on lines +110 to +116
First-writer-wins: if a side-effect column is already mapped to a
different producer, the earlier mapping is kept. This matches sync
engine semantics where earlier consumers see the first producer's
value.
"""
if side_effect_col not in self._side_effect_map:
self._side_effect_map[side_effect_col] = producer
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't a side effect column 1:1 with its producer? In which scenario would multiple producers point to the same side effect column?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right that in a correctly configured pipeline it should be 1:1. The collision scenario comes from Anonymizer's detection workflow - it has branching pipeline paths where different stages declare the same side-effect columns (e.g. _merged_tagged_text is declared by both prepare_validation_inputs and merge_and_build_candidates), but only one path is active per run. With last-writer-wins the graph resolves the wrong producer and creates a false cycle.

That said, this is an unusual pattern and silently ignoring the second registration isn't great. I'll add a warning log on collision so misconfigurations don't go unnoticed.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in this example wouldn't the side effect columns be prepare_validation_inputs _merged_tagged_text and merge_and_build_candidates_merged_tagged_text? It's still 1:1 right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked into this - the collision appears to be real. Side-effect column names are bare strings with no namespacing. In Anonymizer's detection workflow (custom_columns.py), both merge_and_build_candidates and prepare_validation_inputs declare the exact same side-effect columns:

# merge_and_build_candidates (line 79)
side_effect_columns=[COL_MERGED_TAGGED_TEXT, COL_VALIDATION_CANDIDATES]

# prepare_validation_inputs (line 125)
side_effect_columns=[COL_MERGED_TAGGED_TEXT, COL_VALIDATION_CANDIDATES]

Both resolve to the same literal strings ("_merged_tagged_text", "_validation_candidates"). As far as I can tell, this is intentional - the two generators are alternative producers for the same downstream consumer depending on which pipeline path runs.

I considered whether namespacing (e.g. producer_name._merged_tagged_text) could solve this, but the downstream column config wouldn't know which producer ran, so it can't pick the right prefixed name. You'd likely need a union/alias mechanism in the DAG, which feels like a lot of complexity for a pattern that already works. The @custom_column_generator API is also public, so naming changes could break external plugins.

Open to other ideas, but first-writer-wins + warning seems like the right pragmatic fix here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After our offline discussion, I agree with you - multiple producers for the same side-effect column is an anti-pattern. The Anonymizer case is actually an overwrite pattern within the same pipeline (not alternative paths), which works in the sync engine by accident.

I've updated set_side_effect() to raise ConfigCompilationError on duplicate producers instead of silently picking a winner (604aa21). We'll follow up on the Anonymizer side with distinct column names per stage.

Log a warning when multiple producers register the same side-effect
column (first-writer-wins still applies). Rename _instance_to_columns
and _instance_to_write_columns per review feedback for clarity.
@andreatgretel andreatgretel added the agent-review Trigger agentic CI review label Apr 13, 2026
@andreatgretel andreatgretel added agent-review Trigger agentic CI review and removed agent-review Trigger agentic CI review labels Apr 13, 2026
@github-actions
Copy link
Copy Markdown
Contributor

PR #509 Review: fix: async engine side-effect column propagation and collision resolution

Summary

This PR fixes two bugs in the async engine's handling of @custom_column_generator side-effect columns that prevented multi-stage pipelines (e.g., Anonymizer's detection workflow) from running with DATA_DESIGNER_ASYNC_ENGINE=1 (fixes #508):

  1. ExecutionGraph.set_side_effect() — changed from last-writer-wins to first-writer-wins semantics, with a warning log on collision. Prevents false DAGCircularDependencyError when multiple generators declare the same side-effect column at different pipeline stages.
  2. AsyncTaskScheduler — now maintains a separate _gen_instance_to_columns_including_side_effects mapping so side-effect column values are written to RowGroupBufferManager and available to downstream prompt templates, while keeping completion tracking (_gen_instance_to_columns) free of unregistered columns.

Files changed: 4 (+117/−19)

Findings

Correctness

execution_graph.py — first-writer-wins (set_side_effect, lines 110-126): The change is clean and correct. The method now guards with if side_effect_col not in self._side_effect_map, implementing first-writer-wins. Since ExecutionGraph.create() iterates column_configs in declared order, and the docstring states this matches sync engine semantics (where earlier consumers see the first producer's value on the shared mutable row dict), this is the right fix. The previous last-writer-wins behavior silently re-mapped a side-effect to a later producer, creating incorrect dependency edges and false cycle detection. The added logger.warning on collision is a good addition for debuggability.

async_scheduler.py — dual-map approach (lines 134-151): The separation of _gen_instance_to_columns (completion tracking) from _gen_instance_to_columns_including_side_effects (buffer writes) is sound. Side-effect columns are not registered in the ExecutionGraph and would cause KeyError in CompletionTracker if included in the completion map. The seen_cols set prevents the same side-effect column from being added to multiple generators' output lists (consistent with first-writer-wins in the graph). The getattr(gen.config, "side_effect_columns", []) pattern safely handles configs without that attribute.

Two-loop pattern (lines 139-150): The code correctly uses two separate loops over generators.items() — first populates primary columns, then adds side-effects. This ensures seen_cols is fully populated with all primary column names before side-effect deduplication, preventing a side-effect from shadowing a real column from a different generator. Intentional and correct.

Usage-site audit — _gen_instance_to_columns vs _gen_instance_to_columns_including_side_effects:

  • _gen_instance_to_columns is used in _salvage_rounds (lines 369, 390), _dispatch_seeds (line 633), and _execute_task_inner_impl (line 678) — all for dispatch/completion logic. Correct: side-effect columns should not participate in task scheduling.
  • _gen_instance_to_columns_including_side_effects is used in _run_from_scratch (line 783), _run_cell (line 809), and _run_batch (line 833) — all for buffer writes. Correct: these are the points where result data is written back, and side-effect values must be included. All sites guard with if col in result_df.columns or if col in result, so missing side-effect values are safely skipped.

Minor Observations

  1. [Low] Verbose set comprehension (async_scheduler.py:144): {col for col in generators} is equivalent to set(generators). Purely stylistic, not a bug.

  2. [Low] Long line (async_scheduler.py:145): The dict comprehension line is ~107 characters. If this exceeds the project's ruff line-length config, it would need a break. If CI passes, it's fine.

  3. [Info] Silent skip on side-effect/real-column overlap (async_scheduler.py:148): If a side-effect column name matches a real column from a different generator, the if side_effect_col not in seen_cols check silently skips it. This is correct (real columns take precedence, matching resolve_side_effect logic), but there's no warning logged for this case, unlike the graph collision. A debug log here could aid troubleshooting complex pipelines. Not a blocker.

Testing

test_side_effect_collision_first_writer_wins (test_execution_graph.py) — Well-structured: registers two producers for the same side-effect, verifies the first wins, checks the warning log output, and validates downstream dependency resolution via resolve_side_effect + add_edge. Good complementary coverage alongside existing test_side_effect_name_collision_prefers_real_column.

test_side_effect_columns_separated_from_completion_tracking (test_async_scheduler.py) — Directly tests the dual-map invariant: side-effect columns present in _gen_instance_to_columns_including_side_effects but absent from _gen_instance_to_columns. Uses object.__setattr__ to inject a mock config with side_effect_columns, which is a pragmatic approach for testing init logic. Good coverage of the key invariant.

Gap: No E2E test exercising the full async pipeline with side-effect columns end-to-end (acknowledged in PR description). Given the two unit tests cover the specific root causes and the full test suite passes (1747 engine tests), this is acceptable for merge. A follow-up E2E test would strengthen confidence for the Anonymizer use case.

Style / Conventions

  • Absolute imports throughout, consistent with project rules.
  • Type annotations on all new variables (dict[int, list[str]], set[str]).
  • from __future__ import annotations present in modified files.
  • Docstring on set_side_effect clearly explains the first-writer-wins contract and rationale.
  • Inline comments in __init__ explain the purpose of each map. Verbose attribute name (_gen_instance_to_columns_including_side_effects) is long but self-documenting.
  • No scope creep — changes are tightly focused on the two bugs.

Verdict

LGTM. This is a focused, well-reasoned bug fix. Both changes are correct and mutually consistent. The dual-map design in the scheduler cleanly separates completion tracking from buffer writes without disrupting existing async semantics. The first-writer-wins policy in the graph faithfully reproduces sync engine behavior. Test coverage is solid for both fixes. Minor style nits (verbose set comprehension, long line) are non-blocking. Ship it.

@github-actions github-actions bot removed the agent-review Trigger agentic CI review label Apr 13, 2026
andreatgretel and others added 3 commits April 13, 2026 16:50
Replace first-writer-wins collision handling with a hard error.
Each side-effect column must have exactly one producer; duplicates
are a configuration issue to be fixed at the source.
Mirror the async path check: raise ConfigCompilationError when two
custom columns declare the same side-effect column name during
topological sort.
@andreatgretel andreatgretel requested a review from nabinchha April 13, 2026 21:25
@andreatgretel andreatgretel merged commit 533a94b into main Apr 13, 2026
47 checks passed
@andreatgretel andreatgretel deleted the andreatgretel/fix/async-side-effect-columns branch April 14, 2026 11:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

bug: async engine drops side-effect column values and silently misresolves collisions

2 participants