feat: resume interrupted dataset generation runs (sync + async engine) #526
przemekboruta wants to merge 10 commits into NVIDIA-NeMo:main
Conversation
- `ArtifactStorage` gains a `resume: bool = False` field
- `resolved_dataset_name` skips timestamp logic when `resume=True`, returning the existing dataset folder name as-is
- Raises `ArtifactStorageError` on `resume=True` when the target folder is absent or empty (no data to resume from)
- New `clear_partial_results()` removes in-flight partial results left over from an interrupted run

Fixes NVIDIA-NeMo#525
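The resume-aware name resolution can be sketched roughly as below. This is a minimal standalone sketch, not the actual `ArtifactStorage` code: the function signature, `base_dir` parameter, and timestamp format are assumptions based on the commit message.

```python
from datetime import datetime, timezone
from pathlib import Path


class ArtifactStorageError(Exception):
    pass


def resolved_dataset_name(base_dir: Path, dataset_name: str, resume: bool = False) -> str:
    """Return the dataset folder name, reusing the existing folder on resume."""
    target = base_dir / dataset_name
    if resume:
        # Resume requires a non-empty folder left by the interrupted run;
        # otherwise there is nothing to resume from.
        if not target.is_dir() or not any(target.iterdir()):
            raise ArtifactStorageError(f"Cannot resume: no existing data found at {target}")
        return dataset_name
    # Fresh run: append a timestamp so each run gets a unique folder.
    stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d_%H-%M-%S")
    return f"{dataset_name}_{stamp}"
```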
`DatasetBatchManager.start()` now accepts:
- `start_batch: int = 0` — first batch index to process
- `initial_actual_num_records: int = 0` — records already on disk

Both default to 0, so all existing call sites are unaffected.

Fixes NVIDIA-NeMo#525
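The counter-seeding behavior can be illustrated with a stripped-down sketch. The real `DatasetBatchManager` has many more responsibilities; only the `start()` signature and the reset-then-seed ordering are taken from the commit message, the rest is hypothetical.

```python
class DatasetBatchManager:
    """Minimal sketch of start()-time counter seeding for resume."""

    def __init__(self, num_batches: int):
        self.num_batches = num_batches
        self._reset()

    def _reset(self) -> None:
        self.current_batch = 0
        self.actual_num_records = 0

    def start(self, start_batch: int = 0, initial_actual_num_records: int = 0) -> None:
        # Defaults of 0 keep existing call sites unchanged; on resume the
        # caller seeds the counters with what is already on disk.
        self._reset()
        self.current_batch = start_batch
        self.actual_num_records = initial_actual_num_records
```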
- `build()` gains a `resume: bool = False` parameter
- `_load_resume_state()` reads `metadata.json` and validates that `num_records` and `buffer_size` match the original run
- `_build_with_resume()` skips completed batches, clears in-flight partial results, and continues from the first incomplete batch
- Raises `DatasetGenerationError` with clear messages for:
  - missing `metadata.json` (interrupted before the first batch completes)
  - `num_records` mismatch
  - `buffer_size` mismatch
  - `DATA_DESIGNER_ASYNC_ENGINE=1` (not yet supported)
- Logs a warning and returns early when the dataset is already complete

Fixes NVIDIA-NeMo#525
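The validation step can be sketched as a small helper. This is an assumption-laden approximation: the real `_load_resume_state()` is a `DatasetBuilder` method and the exact `metadata.json` keys are not shown in the PR, so the key names here are guesses.

```python
import json
from pathlib import Path


class DatasetGenerationError(Exception):
    pass


def load_resume_state(dataset_dir: Path, num_records: int, buffer_size: int) -> dict:
    """Read metadata.json and verify the run parameters match the original run."""
    meta_path = dataset_dir / "metadata.json"
    if not meta_path.is_file():
        # No metadata means the original run died before its first batch completed.
        raise DatasetGenerationError(
            "Cannot resume: metadata.json not found; the original run was "
            "interrupted before the first batch completed."
        )
    meta = json.loads(meta_path.read_text())
    for key, expected in (("num_records", num_records), ("buffer_size", buffer_size)):
        if meta.get(key) != expected:
            raise DatasetGenerationError(
                f"Cannot resume: {key} mismatch "
                f"(requested {expected}, original run used {meta.get(key)})"
            )
    return meta
```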
- `create()` gains `resume: bool = False`
- `_create_resource_provider()` passes `resume` to `ArtifactStorage`
- `builder.build()` receives the `resume` flag

Fixes NVIDIA-NeMo#525
Covers:
- `ArtifactStorage.resolved_dataset_name` with `resume=True`
- `ArtifactStorage.clear_partial_results()`
- `DatasetBatchManager.start()` with `start_batch` and `initial_actual_num_records`
- `DatasetBuilder.build(resume=True)`: missing metadata, `num_records` mismatch, `buffer_size` mismatch, already-complete detection

Fixes NVIDIA-NeMo#525
Greptile Summary
| Filename | Overview |
|---|---|
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py | Core resume logic added for both sync (_build_with_resume) and async (_build_async) paths; async crash-window handling uses filesystem count for batch totals but metadata count for actual_num_records, creating a potential undercount in the final metadata. |
| packages/data-designer/src/data_designer/interface/data_designer.py | Adds resume param to create(); docstring incorrectly states async resume is "Not supported" when the implementation fully supports it. |
| packages/data-designer-engine/src/data_designer/engine/storage/artifact_storage.py | Adds resume field, resume-aware resolved_dataset_name logic (use existing vs. raise), and clear_partial_results(); all cases covered and tested. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dataset_batch_manager.py | Adds start_batch and initial_actual_num_records params to start(); correctly seeds internal counters after reset() for sync resume. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/row_group_buffer.py | Adds initial_actual_num_records and initial_total_num_batches params to seed counters for async resume; clean and correct. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py | Comprehensive resume tests covering sync/async paths, already-complete early-return, missing metadata, param mismatches, crash-window filesystem truth, and processor skip; well-structured. |
| packages/data-designer-engine/tests/engine/storage/test_artifact_storage.py | New tests fully cover the resume flag on resolved_dataset_name and the clear_partial_results helper. |
This is a comment left during a code review.
Path: packages/data-designer/src/data_designer/interface/data_designer.py
Line: 828-832
Comment:
**Docstring contradicts the implementation**
The docstring says "Not supported when `DATA_DESIGNER_ASYNC_ENGINE` is enabled," but `build()` passes `resume` through to `_build_async()`, which has a full async resume path. A user reading only the public `DataDesigner.create()` docs will believe async resume is unavailable and never use it.
```suggestion
resume: If True, resume generation from the last completed batch (sync engine) or
row group (async engine) found in the existing dataset directory. The run
parameters (num_records, buffer_size) must match those of the original run.
Any in-flight partial results from the interrupted run are discarded.
```
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py
Line: 399-406
Comment:
**`actual_num_records` undercount in the crash window**
When a crash occurs between `move_partial_result_to_final_file_path` and `write_metadata` for row group N, the filesystem has N+1 parquet files but metadata still records only N batches. The code correctly uses filesystem count for `initial_total_num_batches`, but `initial_actual_num_records` is still taken from metadata — which excludes row group N's records. The final metadata written after the resumed run will have an `actual_num_records` that is under-counted by exactly one row group's record count.
The parquet data itself is unaffected (all files are present and correct), but users or downstream tooling that rely on `actual_num_records` in `metadata.json` will see a stale value.
How can I resolve this? If you propose a fix, please make it concise.

Reviews (5): Last reviewed commit: "Merge branch 'main' into main"
packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py (outdated review thread, resolved)
…INE=1)
- Add `_find_completed_row_group_ids()` to scan `parquet-files/` for already-written row groups by parsing `batch_*.parquet` filenames
- `_build_async()` now accepts `resume=True`: loads metadata, finds completed row groups, clears partial results, and logs progress; returns early if all row groups are done
- `_prepare_async_run()` accepts `skip_row_groups`, `initial_actual_num_records`, and `initial_total_num_batches` so the scheduler only processes remaining row groups and `RowGroupBufferManager` starts from the correct counts
- `RowGroupBufferManager.__init__` gains `initial_actual_num_records` and `initial_total_num_batches` params to seed the counters on resume
- `finalize_row_group` closure now writes incremental metadata after each checkpoint, so any run (resume or not) can be resumed if interrupted mid-way
- Remove the guard that rejected `resume=True` with `DATA_DESIGNER_ASYNC_ENGINE=1`
- Add tests for all new paths
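The filename scan described in the first bullet might look roughly like this. A hypothetical sketch: the real `_find_completed_row_group_ids()` is a `DatasetBuilder` method, and only the `parquet-files/` directory and `batch_*.parquet` naming pattern come from the commit message.

```python
import re
from pathlib import Path

# Only files named exactly batch_<N>.parquet count as completed row groups;
# anything else (partials, temp files) is ignored.
_BATCH_FILE = re.compile(r"^batch_(\d+)\.parquet$")


def find_completed_row_group_ids(parquet_dir: Path) -> set[int]:
    """Scan a parquet-files/ directory for already-written row-group IDs."""
    ids: set[int] = set()
    for path in parquet_dir.glob("batch_*.parquet"):
        match = _BATCH_FILE.match(path.name)
        if match:
            ids.add(int(match.group(1)))
    return ids
```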
…set already complete

`_build_with_resume` and `_build_async` now return `False` when the dataset is already complete (early-return path), `True` otherwise. `build()` skips `_processor_runner.run_after_generation()` on `False`, preventing processors from calling `shutil.rmtree` and rewriting an already-finalized dataset. Fixes the issue raised in review: Greptile P1 comment on PR NVIDIA-NeMo#526.
…sync resume

Metadata can lag by one row group if a crash occurs between `move_partial_result_to_final_file_path` and `write_metadata`. Using `len(completed_ids)` from the filesystem scan instead of `state.num_completed_batches` ensures the final metadata reflects the actual number of parquet files present, not the potentially stale metadata count.
Issue #525 has been triaged. The linked issue check is being re-evaluated.
Summary
Closes #525
Adds `resume: bool = False` to `DataDesigner.create()` and `DatasetBuilder.build()`. When `resume=True`, generation picks up from where the interrupted run left off, for both the sync and async engines.

Changes
- `ArtifactStorage`: `resume: bool = False` field; `resolved_dataset_name` skips timestamp logic on resume; new `clear_partial_results()`
- `DatasetBatchManager.start()`: `start_batch` and `initial_actual_num_records` params (default 0, no breakage)
- `DatasetBuilder.build()`: `resume` param; `_load_resume_state()` reads and validates `metadata.json`; `_build_with_resume()` skips completed batches (sync); `_build_async()` skips completed row groups (async)
- `RowGroupBufferManager.__init__()`: `initial_actual_num_records` and `initial_total_num_batches` params to seed counters on resume
- `DatasetBuilder._find_completed_row_group_ids()`: scans `parquet-files/` for `batch_*.parquet` to determine which async row groups are already done
- `finalize_row_group` closure: writes `metadata.json` after every row-group checkpoint (not just at the end), making all async runs resumable if interrupted
- `DataDesigner.create()`: gains `resume`, passes it through to `ArtifactStorage` and `builder.build()`

Validation and error cases
- Missing `metadata.json` → `DatasetGenerationError` (interrupted before any batch completed)
- `num_records` mismatch → `DatasetGenerationError`
- `buffer_size` mismatch → `DatasetGenerationError`

Test plan
- `test_resolved_dataset_name_resume_uses_existing_folder`
- `test_resolved_dataset_name_resume_raises_when_no_existing_folder`
- `test_resolved_dataset_name_resume_raises_when_folder_is_empty`
- `test_clear_partial_results_removes_partial_folder`
- `test_clear_partial_results_is_noop_when_no_partial_folder`
- `test_start_with_start_batch`
- `test_start_with_initial_actual_num_records`
- `test_start_with_start_batch_and_initial_actual_num_records`
- `test_start_default_values_unchanged`
- `test_build_resume_raises_without_metadata`
- `test_build_resume_raises_on_num_records_mismatch`
- `test_build_resume_raises_on_buffer_size_mismatch`
- `test_build_resume_logs_warning_when_already_complete`
- `test_find_completed_row_group_ids_empty_dir`
- `test_find_completed_row_group_ids_with_files`
- `test_find_completed_row_group_ids_ignores_non_batch_files`
- `test_build_async_resume_logs_warning_when_already_complete`
- `test_build_async_resume_raises_without_metadata`
- `test_initial_actual_num_records`
- `test_initial_total_num_batches_reflected_in_metadata`