Commit 5887ac7
fix(builder): address agent review findings for resume feature

H1: Annotate the _load_resume_state() call in _build_async with a comment clarifying that the return value is intentionally discarded; the async path derives ground-truth state from the filesystem, not from metadata.
M1: Replace the fragile split("_", 1)[1] filename parsing in _find_completed_row_group_ids with re.fullmatch(r"batch_(\d+)", stem), making it immune to unexpected filename shapes.
L2: Remove the unused _ResumeState.buffer_size field; it was set but never read, and _build_with_resume uses the buffer_size parameter directly.
L4: Move the mid-file imports (json, Path, ArtifactStorage) used by the resume tests to the top of test_dataset_builder.py and drop the underscore aliases.
L1: Update plans/525/resume-interrupted-runs.md to reflect that async-engine resume is fully implemented (not deferred) and that missing metadata triggers a fresh restart instead of raising DatasetGenerationError.
1 parent c496e7e commit 5887ac7

3 files changed: 16 additions & 19 deletions

packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py

Lines changed: 6 additions & 6 deletions
@@ -7,6 +7,7 @@
 import functools
 import logging
 import os
+import re
 import time
 import uuid
 from dataclasses import dataclass
@@ -88,7 +89,6 @@
 class _ResumeState:
     num_completed_batches: int
     actual_num_records: int
-    buffer_size: int
 
 
 class DatasetBuilder:
@@ -255,7 +255,6 @@ def _load_resume_state(self, num_records: int, buffer_size: int) -> _ResumeState
         return _ResumeState(
             num_completed_batches=metadata["num_completed_batches"],
             actual_num_records=metadata["actual_num_records"],
-            buffer_size=buffer_size,
         )
 
     def _build_with_resume(
@@ -377,10 +376,9 @@ def _find_completed_row_group_ids(self) -> set[int]:
             return set()
         ids: set[int] = set()
         for p in final_path.glob("batch_*.parquet"):
-            try:
-                ids.add(int(p.stem.split("_", 1)[1]))
-            except (ValueError, IndexError):
-                continue
+            m = re.fullmatch(r"batch_(\d+)", p.stem)
+            if m:
+                ids.add(int(m.group(1)))
         return ids
 
     def _build_async(
@@ -408,6 +406,8 @@ def _build_async(
         initial_total_num_batches = 0
 
         if resume:
+            # Validate run-parameter compatibility only; the async path derives
+            # ground-truth state from the filesystem, not from the returned state object.
             self._load_resume_state(num_records, buffer_size)
             completed_ids = self._find_completed_row_group_ids()
             skip_row_groups = frozenset(completed_ids)
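
The M1 fix is easiest to see on edge-case filenames. Below is a minimal standalone sketch (the stems list is hypothetical) contrasting the old split-based parsing with the committed re.fullmatch pattern. Note that Python's int() accepts sign prefixes and PEP 515 underscore separators, which is exactly the fragility the review flagged:

import re

stems = ["batch_3", "batch_1_000", "batch_+7", "batch_final"]

for stem in stems:
    # Old parsing: everything after the first "_" goes to int(), which
    # tolerates "+7" and even "1_000" (underscore digit separators).
    try:
        old = int(stem.split("_", 1)[1])
    except (ValueError, IndexError):
        old = None

    # New parsing: the stem must be exactly batch_<digits>.
    m = re.fullmatch(r"batch_(\d+)", stem)
    new = int(m.group(1)) if m else None
    print(f"{stem!r}: old={old}, new={new}")

# batch_3      -> old=3,    new=3
# batch_1_000  -> old=1000, new=None  (old silently mis-parsed the ID)
# batch_+7     -> old=7,    new=None
# batch_final  -> old=None, new=None  (both skip it)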

packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py

Lines changed: 7 additions & 10 deletions
@@ -3,7 +3,9 @@
 
 from __future__ import annotations
 
+import json
 import logging
+from pathlib import Path
 from typing import TYPE_CHECKING
 from unittest.mock import Mock, patch
 
@@ -32,6 +34,7 @@
 from data_designer.engine.processing.processors.base import Processor
 from data_designer.engine.registry.data_designer_registry import DataDesignerRegistry
 from data_designer.engine.resources.seed_reader import DataFrameSeedReader
+from data_designer.engine.storage.artifact_storage import ArtifactStorage
 
 if TYPE_CHECKING:
     import pandas as pd
@@ -944,22 +947,16 @@ def test_allow_resize_multiple_batches(
 # ---------------------------------------------------------------------------
 
 
-import json as _json
-from pathlib import Path as _Path
-
-from data_designer.engine.storage.artifact_storage import ArtifactStorage as _ArtifactStorage
-
-
-def _write_metadata(dataset_dir: _Path, **fields) -> None:
+def _write_metadata(dataset_dir: Path, **fields) -> None:
     """Write a metadata.json into an existing dataset folder."""
     dataset_dir.mkdir(parents=True, exist_ok=True)
     (dataset_dir / "sentinel.txt").write_text("x")  # make folder non-empty for resolved_dataset_name
-    (dataset_dir / "metadata.json").write_text(_json.dumps(fields))
+    (dataset_dir / "metadata.json").write_text(json.dumps(fields))
 
 
 def _make_resume_builder(stub_resource_provider, stub_test_config_builder, tmp_path, *, buffer_size: int = 2):
     """Return a DatasetBuilder whose ArtifactStorage has resume=True."""
-    storage = _ArtifactStorage(artifact_path=tmp_path, resume=True)
+    storage = ArtifactStorage(artifact_path=tmp_path, resume=True)
     stub_resource_provider.artifact_storage = storage
     stub_resource_provider.run_config = RunConfig(buffer_size=buffer_size)
     return DatasetBuilder(
@@ -1113,7 +1110,7 @@ def test_find_completed_row_group_ids_ignores_non_batch_files(
 # ---------------------------------------------------------------------------
 
 
-def _write_parquet_files(parquet_dir: _Path, row_group_ids: list[int]) -> None:
+def _write_parquet_files(parquet_dir: Path, row_group_ids: list[int]) -> None:
     """Create stub batch_*.parquet files for the given row group IDs."""
     parquet_dir.mkdir(parents=True, exist_ok=True)
     for rg_id in row_group_ids:
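
Together these helpers let a test stage an interrupted run on disk before exercising resume. A minimal sketch of that staging step, assuming pytest's tmp_path fixture; the folder names and metadata values here are illustrative, not taken from the real tests:

from pathlib import Path


def test_stage_interrupted_run(tmp_path: Path) -> None:
    dataset_dir = tmp_path / "my-dataset"  # hypothetical dataset folder

    # Checkpoint metadata as it might look after two completed batches.
    _write_metadata(
        dataset_dir,
        num_completed_batches=2,
        actual_num_records=4,
        target_num_records=10,
        buffer_size=2,
    )

    # Stub parquet output for the two completed batches.
    _write_parquet_files(dataset_dir / "parquet-files", [0, 1])

    assert (dataset_dir / "metadata.json").exists()
    stems = sorted(p.stem for p in (dataset_dir / "parquet-files").glob("batch_*.parquet"))
    assert stems == ["batch_0", "batch_1"]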

plans/525/resume-interrupted-runs.md

Lines changed: 3 additions & 3 deletions
@@ -38,9 +38,9 @@ results = dd.create(config_builder, num_records=10_000, resume=True)
 | Resume state source | Read `metadata.json` written after each completed batch | Already contains `num_completed_batches`, `target_num_records`, `buffer_size`, `actual_num_records`. No new persistence needed. |
 | Partial batch at crash time | Clear `tmp-partial-parquet-files/` at resume start | Simpler and safer than merging an incomplete parquet; losing one batch is acceptable since the user is already recovering from a crash. |
 | Compatibility validation | Raise `DatasetGenerationError` if `num_records` or `buffer_size` changed | Different `num_records` changes which rows land in which batch file, breaking the numbering invariant. `buffer_size` changes the file-per-batch mapping. Both must match. |
-| Async engine | Raise `DatasetGenerationError` if `DATA_DESIGNER_ASYNC_ENGINE=1` with `resume=True` | The async path uses a row-group scheduler rather than an indexed batch loop; resume would require a different strategy. Out of scope for v1. |
+| Async engine | Supported: derives ground-truth state from the filesystem via `_find_completed_row_group_ids()` | The async path scans completed `batch_*.parquet` files rather than reading `metadata.json`, avoiding the metadata-lag crash window. Both `initial_actual_num_records` and `initial_total_num_batches` are sourced from the filesystem. Incremental `write_metadata` calls after each row group enable resumability. |
 | Already-complete runs | Detect and warn, return existing path | If `num_completed_batches == total_num_batches` the dataset is already complete; the user may have re-run by mistake. |
-| No metadata → error | Raise `DatasetGenerationError` | Resuming without a checkpoint is impossible; a clear error is better than silent fallback to a fresh run. |
+| No metadata → restart fresh | Log an info message and restart from batch 0 | If `metadata.json` is missing, the run was interrupted before any batch completed. Restarting silently is safer UX than forcing the user to remove `resume=True`. |
 
 ## Affected Files
 
@@ -168,7 +168,7 @@ def build(self, *, num_records, on_batch_complete=None, save_multimedia_to_disk=
 ## Trade-offs Considered
 
 - **Automatic resume detection** (no flag, detect existing folder automatically): rejected; it removes user intent. A user re-running a pipeline from scratch would be surprised by silent resumption.
-- **Resume support for async engine**: deferred to a follow-up. The async scheduler's row-group model doesn't map 1:1 to batch indices; implementing it would require a separate mechanism.
+- **Resume support for async engine**: implemented (diverges from the original plan). The async path scans the filesystem for completed row groups instead of relying on potentially stale `metadata.json`, handling the crash window between `move_partial_result_to_final_file_path` and `write_metadata`.
 - **Per-column resume** (resume from column N within an interrupted batch): out of scope. Requires per-column checkpointing and state reconstruction, significantly higher complexity.
 
 ## Delivery
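
The plan's decision table composes into a small branch at resume time. The sketch below is illustrative only, under stated assumptions: the field names (num_completed_batches, target_num_records, buffer_size) and the error type follow the plan, but plan_resume itself and the ceiling-division batch count are hypothetical, not the shipped implementation:

import json
import logging
from pathlib import Path

logger = logging.getLogger(__name__)


class DatasetGenerationError(Exception):
    """Stand-in for the engine's real exception type."""


def plan_resume(dataset_dir: Path, num_records: int, buffer_size: int) -> int:
    """Return the batch index to resume from, per the plan's decision table."""
    metadata_path = dataset_dir / "metadata.json"

    # No metadata -> restart fresh: the run died before any batch completed.
    if not metadata_path.exists():
        logger.info("No metadata.json found; restarting from batch 0.")
        return 0

    metadata = json.loads(metadata_path.read_text())

    # Compatibility validation: changed parameters break the numbering invariant.
    if metadata["target_num_records"] != num_records or metadata["buffer_size"] != buffer_size:
        raise DatasetGenerationError(
            "num_records/buffer_size differ from the interrupted run; cannot resume"
        )

    # Already-complete runs: warn, then return the existing checkpoint position.
    total_num_batches = -(-num_records // buffer_size)  # ceiling division (assumed mapping)
    if metadata["num_completed_batches"] == total_num_batches:
        logger.warning("Dataset already complete; returning existing path.")

    return metadata["num_completed_batches"]

On the async path, the same compatibility check runs via _load_resume_state, but the starting state comes from the batch_*.parquet filesystem scan rather than from the metadata fields.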
