
feat: resume interrupted dataset generation runs (sync + async engine) #526

Open

przemekboruta wants to merge 10 commits into NVIDIA-NeMo:main from przemekboruta:main

Conversation

@przemekboruta
Contributor

@przemekboruta przemekboruta commented Apr 13, 2026

Summary

Closes #525

Adds resume: bool = False to DataDesigner.create() and DatasetBuilder.build(). When resume=True, generation picks up from where the interrupted run left off — for both the sync and async engines.

dd = DataDesigner(...)
dd.add_column(...)

# First run — interrupted mid-way
results = dd.create(config_builder, num_records=10_000)

# After restart — picks up from the last completed batch/row-group
results = dd.create(config_builder, num_records=10_000, resume=True)

Changes

| Layer | Change |
| --- | --- |
| `ArtifactStorage` | New `resume: bool = False` field; `resolved_dataset_name` skips timestamp logic on resume; new `clear_partial_results()` |
| `DatasetBatchManager.start()` | New `start_batch` and `initial_actual_num_records` params (both default 0, so existing call sites are unaffected) |
| `DatasetBuilder.build()` | New `resume` param; `_load_resume_state()` reads and validates `metadata.json`; `_build_with_resume()` skips completed batches (sync); `_build_async()` skips completed row groups (async) |
| `RowGroupBufferManager.__init__()` | New `initial_actual_num_records` and `initial_total_num_batches` params to seed counters on resume |
| `DatasetBuilder._find_completed_row_group_ids()` | New helper that scans `parquet-files/` for `batch_*.parquet` to determine which async row groups are already done |
| `finalize_row_group` closure | Now writes incremental `metadata.json` after every row-group checkpoint (not just at the end), making all async runs resumable if interrupted |
| `DataDesigner.create()` | Exposes `resume` and passes it through to `ArtifactStorage` and `builder.build()` |

Validation and error cases

  • Missing metadata.json → DatasetGenerationError (interrupted before any batch completed)
  • num_records mismatch → DatasetGenerationError
  • buffer_size mismatch → DatasetGenerationError
  • Dataset already complete → warning logged, returns existing path (both engines)
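The validation rules above can be sketched as a small helper. This is a hypothetical stand-in, not the engine's actual `_load_resume_state()` implementation; the metadata keys (`num_records`, `buffer_size`) and the local `DatasetGenerationError` class are assumptions for illustration:

```python
import json
from pathlib import Path


class DatasetGenerationError(Exception):
    """Stand-in for the engine's error type."""


def load_resume_state(dataset_dir: Path, num_records: int, buffer_size: int) -> dict:
    """Read metadata.json and check it matches the requested run parameters."""
    metadata_path = dataset_dir / "metadata.json"
    if not metadata_path.exists():
        # No checkpoint exists: the run was interrupted before any batch completed.
        raise DatasetGenerationError(
            "Cannot resume: no metadata.json found in the dataset directory."
        )
    state = json.loads(metadata_path.read_text())
    if state["num_records"] != num_records:
        raise DatasetGenerationError(
            f"num_records mismatch: original run used {state['num_records']}, "
            f"resume requested {num_records}."
        )
    if state["buffer_size"] != buffer_size:
        raise DatasetGenerationError(
            f"buffer_size mismatch: original run used {state['buffer_size']}, "
            f"resume requested {buffer_size}."
        )
    return state
```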

Test plan

  • test_resolved_dataset_name_resume_uses_existing_folder
  • test_resolved_dataset_name_resume_raises_when_no_existing_folder
  • test_resolved_dataset_name_resume_raises_when_folder_is_empty
  • test_clear_partial_results_removes_partial_folder
  • test_clear_partial_results_is_noop_when_no_partial_folder
  • test_start_with_start_batch
  • test_start_with_initial_actual_num_records
  • test_start_with_start_batch_and_initial_actual_num_records
  • test_start_default_values_unchanged
  • test_build_resume_raises_without_metadata
  • test_build_resume_raises_on_num_records_mismatch
  • test_build_resume_raises_on_buffer_size_mismatch
  • test_build_resume_logs_warning_when_already_complete
  • test_find_completed_row_group_ids_empty_dir
  • test_find_completed_row_group_ids_with_files
  • test_find_completed_row_group_ids_ignores_non_batch_files
  • test_build_async_resume_logs_warning_when_already_complete
  • test_build_async_resume_raises_without_metadata
  • test_initial_actual_num_records
  • test_initial_total_num_batches_reflected_in_metadata

- ArtifactStorage gains a `resume: bool = False` field
- resolved_dataset_name skips timestamp logic when resume=True,
  returning the existing dataset folder name as-is
- Raises ArtifactStorageError on resume=True when the target folder
  is absent or empty (no data to resume from)
- New clear_partial_results() removes in-flight partial results
  left over from an interrupted run

Fixes NVIDIA-NeMo#525
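The resume-aware folder resolution described above can be sketched roughly as follows. This is a minimal, hypothetical free-function version (in the engine, `resolved_dataset_name` lives on `ArtifactStorage`), and the non-resume timestamp logic is omitted:

```python
from pathlib import Path


class ArtifactStorageError(Exception):
    """Stand-in for the engine's storage error type."""


def resolved_dataset_name(base_dir: Path, dataset_name: str, resume: bool) -> str:
    """On resume, reuse the existing dataset folder name instead of
    deriving a fresh timestamped one."""
    target = base_dir / dataset_name
    if resume:
        # A missing or empty folder means there is nothing to resume from.
        if not target.is_dir() or not any(target.iterdir()):
            raise ArtifactStorageError(
                f"Cannot resume: '{target}' is absent or empty."
            )
        return dataset_name
    # Non-resume path: the real implementation applies timestamp logic here.
    return dataset_name
```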
DatasetBatchManager.start() now accepts:
- start_batch: int = 0  — first batch index to process
- initial_actual_num_records: int = 0  — records already on disk

Both default to 0 so all existing call sites are unaffected.

Fixes NVIDIA-NeMo#525
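The counter-seeding behavior of the new `start()` params can be illustrated with a minimal stand-in class (the real `DatasetBatchManager` does considerably more; the attribute names here are assumptions):

```python
class BatchCounter:
    """Minimal stand-in showing how start_batch and
    initial_actual_num_records seed the counters on resume."""

    def __init__(self) -> None:
        self.current_batch = 0
        self.actual_num_records = 0

    def start(self, start_batch: int = 0, initial_actual_num_records: int = 0) -> None:
        # Both params default to 0, so pre-existing call sites that pass
        # no arguments behave exactly as before.
        self.current_batch = start_batch
        self.actual_num_records = initial_actual_num_records
```

On a fresh run, `start()` is called with no arguments; on resume, the builder passes the first incomplete batch index and the record count already on disk.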
- build() gains a resume: bool = False parameter
- _load_resume_state() reads metadata.json and validates that
  num_records and buffer_size match the original run
- _build_with_resume() skips completed batches, clears in-flight
  partial results, and continues from the first incomplete batch
- Raises DatasetGenerationError with clear messages for:
  - missing metadata.json (interrupted before first batch completes)
  - num_records mismatch
  - buffer_size mismatch
  - DATA_DESIGNER_ASYNC_ENGINE=1 (not yet supported)
- Logs a warning and returns early when dataset is already complete

Fixes NVIDIA-NeMo#525
- create() gains resume: bool = False
- _create_resource_provider() passes resume to ArtifactStorage
- builder.build() receives the resume flag

Fixes NVIDIA-NeMo#525
Covers:
- ArtifactStorage.resolved_dataset_name with resume=True
- ArtifactStorage.clear_partial_results()
- DatasetBatchManager.start() with start_batch and
  initial_actual_num_records
- DatasetBuilder.build(resume=True): missing metadata, num_records
  mismatch, buffer_size mismatch, already-complete detection

Fixes NVIDIA-NeMo#525
@przemekboruta przemekboruta requested a review from a team as a code owner April 13, 2026 11:15
@greptile-apps
Contributor

greptile-apps bot commented Apr 13, 2026

Greptile Summary

Adds resume: bool = False to DataDesigner.create() and DatasetBuilder.build(), enabling both the sync and async engines to skip already-completed batches/row-groups on restart. The implementation is thorough: ArtifactStorage gains resume-aware path resolution and clear_partial_results(), the sync engine seeds its batch counter from metadata.json (written incrementally after every finish_batch()), and the async engine now writes incremental metadata after each finalize_row_group while using filesystem file-count as the authoritative source of truth for crash-window safety.

  • The public DataDesigner.create() docstring says resume is "Not supported when DATA_DESIGNER_ASYNC_ENGINE is enabled," but the code fully supports it — this will mislead users reading the API docs.
  • In the async crash window (row-group N written to disk, then write_metadata crashes), initial_total_num_batches is correctly taken from filesystem count, but initial_actual_num_records still reads the stale metadata value, leaving actual_num_records under-counted in the final metadata.json by one row group.

Confidence Score: 4/5

Safe to merge after fixing the misleading async docstring; the metadata undercount in the crash window is a minor accuracy issue with no data loss.

One P1 finding: the public API docstring explicitly tells users async resume is unsupported when it works correctly, which could suppress legitimate use. The P2 async crash-window undercount affects only the actual_num_records field in metadata.json — no parquet data is lost or corrupted. All other logic is well-structured and the test coverage is excellent.

Flagged files: packages/data-designer/src/data_designer/interface/data_designer.py (wrong docstring) and packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py (initial_actual_num_records in the async crash window).

Important Files Changed

| Filename | Overview |
| --- | --- |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py | Core resume logic added for both sync (`_build_with_resume`) and async (`_build_async`) paths; async crash-window handling uses filesystem count for batch totals but metadata count for actual_num_records, creating a potential undercount in the final metadata. |
| packages/data-designer/src/data_designer/interface/data_designer.py | Adds `resume` param to `create()`; docstring incorrectly states async resume is "Not supported" when the implementation fully supports it. |
| packages/data-designer-engine/src/data_designer/engine/storage/artifact_storage.py | Adds `resume` field, resume-aware `resolved_dataset_name` logic (use existing vs. raise), and `clear_partial_results()`; all cases covered and tested. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dataset_batch_manager.py | Adds `start_batch` and `initial_actual_num_records` params to `start()`; correctly seeds internal counters after `reset()` for sync resume. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/row_group_buffer.py | Adds `initial_actual_num_records` and `initial_total_num_batches` params to seed counters for async resume; clean and correct. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py | Comprehensive resume tests covering sync/async paths, already-complete early-return, missing metadata, param mismatches, crash-window filesystem truth, and processor skip; well-structured. |
| packages/data-designer-engine/tests/engine/storage/test_artifact_storage.py | New tests fully cover the resume flag on `resolved_dataset_name` and the `clear_partial_results` helper. |
This is a comment left during a code review.
Path: packages/data-designer/src/data_designer/interface/data_designer.py
Line: 828-832

Comment:
**Docstring contradicts the implementation**

The docstring says "Not supported when `DATA_DESIGNER_ASYNC_ENGINE` is enabled," but `build()` passes `resume` through to `_build_async()`, which has a full async resume path. A user reading only the public `DataDesigner.create()` docs will believe async resume is unavailable and never use it.

```suggestion
            resume: If True, resume generation from the last completed batch (sync engine) or
                row group (async engine) found in the existing dataset directory. The run
                parameters (num_records, buffer_size) must match those of the original run.
                Any in-flight partial results from the interrupted run are discarded.
```


---

This is a comment left during a code review.
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py
Line: 399-406

Comment:
**`actual_num_records` undercount in the crash window**

When a crash occurs between `move_partial_result_to_final_file_path` and `write_metadata` for row group N, the filesystem has N+1 parquet files but metadata still records only N batches. The code correctly uses filesystem count for `initial_total_num_batches`, but `initial_actual_num_records` is still taken from metadata — which excludes row group N's records. The final metadata written after the resumed run will have an `actual_num_records` that is under-counted by exactly one row group's record count.

The parquet data itself is unaffected (all files are present and correct), but users or downstream tooling that rely on `actual_num_records` in `metadata.json` will see a stale value.


Reviews (5): Last reviewed commit: "Merge branch 'main' into main"

…INE=1)

- Add _find_completed_row_group_ids() to scan parquet-files/ for already-written
  row groups by parsing batch_*.parquet filenames
- _build_async() now accepts resume=True: loads metadata, finds completed row groups,
  clears partial results, and logs progress; returns early if all row groups are done
- _prepare_async_run() accepts skip_row_groups, initial_actual_num_records, and
  initial_total_num_batches so the scheduler only processes remaining row groups
  and RowGroupBufferManager starts from the correct counts
- RowGroupBufferManager.__init__ gains initial_actual_num_records and
  initial_total_num_batches params to seed the counters on resume
- finalize_row_group closure now writes incremental metadata after each checkpoint
  so any run (resume or not) can be resumed if interrupted mid-way
- Remove the guard that rejected resume=True with DATA_DESIGNER_ASYNC_ENGINE=1
- Add tests for all new paths
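The filesystem scan described in the first bullet can be sketched as follows. This is a hypothetical free-function version of `_find_completed_row_group_ids` (the exact filename pattern and return type are assumptions based on the description above):

```python
import re
from pathlib import Path

# Matches completed row-group files such as batch_0.parquet, batch_17.parquet.
BATCH_FILE_RE = re.compile(r"^batch_(\d+)\.parquet$")


def find_completed_row_group_ids(parquet_dir: Path) -> set[int]:
    """Treat each batch_<id>.parquet on disk as a completed row group,
    ignoring any non-matching files (temp files, notes, etc.)."""
    completed: set[int] = set()
    if not parquet_dir.is_dir():
        return completed
    for path in parquet_dir.iterdir():
        match = BATCH_FILE_RE.match(path.name)
        if match:
            completed.add(int(match.group(1)))
    return completed
```

Because the scan trusts the filesystem rather than `metadata.json`, a row group whose parquet file was written just before a crash is still counted as done, which is what makes the crash window between file write and metadata write safe.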
@przemekboruta przemekboruta changed the title feat: resume interrupted dataset generation runs (sync engine) feat: resume interrupted dataset generation runs (sync + async engine) Apr 13, 2026
…set already complete

_build_with_resume and _build_async now return False when the dataset is already
complete (early-return path), True otherwise. build() skips
_processor_runner.run_after_generation() on False, preventing processors from
calling shutil.rmtree and rewriting an already-finalized dataset.

Fixes the issue raised in review: greptile P1 comment on PR NVIDIA-NeMo#526.
…sync resume

Metadata can lag by one row group if a crash occurs between
move_partial_result_to_final_file_path and write_metadata. Using
len(completed_ids) from the filesystem scan instead of
state.num_completed_batches ensures the final metadata reflects the
actual number of parquet files present, not the potentially stale
metadata count.
@github-actions
Contributor

Issue #525 has been triaged. The linked issue check is being re-evaluated.

@andreatgretel andreatgretel added the agent-review Trigger agentic CI review label Apr 13, 2026


Development

Successfully merging this pull request may close these issues.

feat: resume interrupted dataset generation runs

2 participants