diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py index f97054340..d3da3606e 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py @@ -7,8 +7,10 @@ import functools import logging import os +import re import time import uuid +from dataclasses import dataclass from pathlib import Path from typing import TYPE_CHECKING, Any, Callable @@ -83,6 +85,12 @@ _CLIENT_VERSION: str = get_library_version() +@dataclass +class _ResumeState: + num_completed_batches: int + actual_num_records: int + + class DatasetBuilder: def __init__( self, @@ -146,6 +154,7 @@ def build( num_records: int, on_batch_complete: Callable[[Path], None] | None = None, save_multimedia_to_disk: bool = True, + resume: bool = False, ) -> Path: """Build the dataset. @@ -155,6 +164,10 @@ def build( save_multimedia_to_disk: Whether to save generated multimedia (images, audio, video) to disk. If False, multimedia is stored directly in the DataFrame (e.g., images as base64). Default is True. + resume: If True, resume generation from the last completed batch (sync engine) or + row group (async engine) found in the existing artifact directory. The run parameters + (num_records, buffer_size) must match those of the original run. Any in-flight + partial results from the interrupted run are discarded. Returns: Path to the generated dataset directory. @@ -172,9 +185,23 @@ def build( start_time = time.perf_counter() buffer_size = self._resource_provider.run_config.buffer_size + if resume and not self.artifact_storage.metadata_file_path.exists(): + # No metadata.json means the previous run was interrupted before any batch (sync) or + # row group (async) completed. 
Nothing to resume — discard any leftover partial + # results and start fresh. + logger.info( + "▶️ No metadata.json found — the previous run was interrupted before any batch " + "completed. Starting generation from the beginning." + ) + self.artifact_storage.clear_partial_results() + resume = False + + generated = True if DATA_DESIGNER_ASYNC_ENGINE: self._validate_async_compatibility() - self._build_async(generators, num_records, buffer_size, on_batch_complete) + generated = self._build_async(generators, num_records, buffer_size, on_batch_complete, resume=resume) + elif resume: + generated = self._build_with_resume(generators, num_records, buffer_size, on_batch_complete) else: group_id = uuid.uuid4().hex self.batch_manager.start(num_records=num_records, buffer_size=buffer_size) @@ -189,11 +216,96 @@ def build( ) self.batch_manager.finish() - self._processor_runner.run_after_generation(buffer_size) + if generated: + self._processor_runner.run_after_generation(buffer_size) self._resource_provider.model_registry.log_model_usage(time.perf_counter() - start_time) return self.artifact_storage.final_dataset_path + def _load_resume_state(self, num_records: int, buffer_size: int) -> _ResumeState: + """Read and validate resume state from an existing metadata.json. + + Raises: + DatasetGenerationError: If metadata is missing or incompatible with the current run parameters. + """ + try: + metadata = self.artifact_storage.read_metadata() + except FileNotFoundError: + raise DatasetGenerationError( + "🛑 Cannot resume: metadata.json not found in the existing dataset directory. " + "Run without resume=True to start a new generation." + ) + + target = metadata.get("target_num_records") + if target != num_records: + raise DatasetGenerationError( + f"🛑 Cannot resume: num_records={num_records} does not match the original run's " + f"target_num_records={target}. Use the same num_records as the interrupted run, " + "or start a new run without resume=True." 
+ ) + + meta_buffer_size = metadata.get("buffer_size") + if meta_buffer_size != buffer_size: + raise DatasetGenerationError( + f"🛑 Cannot resume: buffer_size={buffer_size} does not match the original run's " + f"buffer_size={meta_buffer_size}. Use the same buffer_size as the interrupted run, " + "or start a new run without resume=True." + ) + + return _ResumeState( + num_completed_batches=metadata["num_completed_batches"], + actual_num_records=metadata["actual_num_records"], + ) + + def _build_with_resume( + self, + generators: list[ColumnGenerator], + num_records: int, + buffer_size: int, + on_batch_complete: Callable[[Path], None] | None, + ) -> bool: + """Resume generation from the last completed batch. + + Returns: + False if the dataset was already complete (no new records generated), + True after successfully generating the remaining batches. + """ + state = self._load_resume_state(num_records, buffer_size) + + self.batch_manager.start( + num_records=num_records, + buffer_size=buffer_size, + start_batch=state.num_completed_batches, + initial_actual_num_records=state.actual_num_records, + ) + + if state.num_completed_batches >= self.batch_manager.num_batches: + logger.warning( + "⚠️ Dataset is already complete — all batches were found in the existing artifact directory. " + "Nothing to resume. Remove resume=True if you want to generate a new dataset." + ) + return False + + logger.info( + f"▶️ Resuming from batch {state.num_completed_batches + 1} of {self.batch_manager.num_batches} " + f"({state.actual_num_records} records already generated)." 
+ ) + + self.artifact_storage.clear_partial_results() + + group_id = uuid.uuid4().hex + for batch_idx in range(state.num_completed_batches, self.batch_manager.num_batches): + logger.info(f"⏳ Processing batch {batch_idx + 1} of {self.batch_manager.num_batches}") + self._run_batch( + generators, + batch_mode="batch", + group_id=group_id, + current_batch_number=batch_idx, + on_batch_complete=on_batch_complete, + ) + self.batch_manager.finish() + return True + def build_preview(self, *, num_records: int) -> pd.DataFrame: self._run_model_health_check_if_needed() self._run_mcp_tool_check_if_needed() @@ -253,25 +365,82 @@ def _validate_async_compatibility(self) -> None: f"disable the async scheduler." ) + def _find_completed_row_group_ids(self) -> set[int]: + """Scan the final dataset directory for already-written row group parquet files. + + Returns: + Set of row-group IDs (batch numbers) that have a parquet file in ``parquet-files/``. + """ + final_path = self.artifact_storage.final_dataset_path + if not final_path.exists(): + return set() + ids: set[int] = set() + for p in final_path.glob("batch_*.parquet"): + m = re.fullmatch(r"batch_(\d+)", p.stem) + if m: + ids.add(int(m.group(1))) + return ids + def _build_async( self, generators: list[ColumnGenerator], num_records: int, buffer_size: int, on_batch_complete: Callable[[Path], None] | None = None, - ) -> None: - """Async task-queue builder path - dispatches tasks based on dependency readiness.""" + *, + resume: bool = False, + ) -> bool: + """Async task-queue builder path - dispatches tasks based on dependency readiness. + + Returns: + False if the dataset was already complete (no new records generated), + True after successfully running the scheduler. 
+ """ logger.info("⚡ DATA_DESIGNER_ASYNC_ENGINE is enabled - using async task-queue builder") settings = self._resource_provider.run_config trace_enabled = settings.async_trace or os.environ.get("DATA_DESIGNER_ASYNC_TRACE", "0") == "1" + skip_row_groups: frozenset[int] = frozenset() + initial_actual_num_records = 0 + initial_total_num_batches = 0 + + if resume: + # Validate run-parameter compatibility only — the async path derives + # ground-truth state from the filesystem, not from the returned state object. + self._load_resume_state(num_records, buffer_size) + completed_ids = self._find_completed_row_group_ids() + skip_row_groups = frozenset(completed_ids) + # Use filesystem as source of truth for both counters — metadata may lag by one + # row group if a crash occurred between move_partial_result_to_final_file_path + # and write_metadata. + initial_total_num_batches = len(completed_ids) + initial_actual_num_records = sum( + min(buffer_size, num_records - rg_id * buffer_size) for rg_id in completed_ids + ) + self.artifact_storage.clear_partial_results() + + total_row_groups = -(-num_records // buffer_size) # ceiling division + if len(completed_ids) >= total_row_groups: + logger.warning( + "⚠️ Dataset is already complete — all row groups were found in the existing artifact " + "directory. Nothing to resume. Remove resume=True if you want to generate a new dataset." + ) + return False + + logger.info( + f"▶️ Resuming async run: {len(completed_ids)} of {total_row_groups} row group(s) already " + f"complete ({initial_actual_num_records} records), skipping them." + ) + def finalize_row_group(rg_id: int) -> None: def on_complete(final_path: Path | str | None) -> None: if final_path is not None and on_batch_complete: on_batch_complete(final_path) buffer_manager.checkpoint_row_group(rg_id, on_complete=on_complete) + # Write incremental metadata after each row group so interrupted runs can be resumed. 
+ buffer_manager.write_metadata(target_num_records=num_records, buffer_size=buffer_size) scheduler, buffer_manager = self._prepare_async_run( generators, @@ -282,6 +451,9 @@ def on_complete(final_path: Path | str | None) -> None: shutdown_error_window=settings.shutdown_error_window, disable_early_shutdown=settings.disable_early_shutdown, trace=trace_enabled, + skip_row_groups=skip_row_groups, + initial_actual_num_records=initial_actual_num_records, + initial_total_num_batches=initial_total_num_batches, ) # Telemetry snapshot @@ -302,8 +474,9 @@ def on_complete(final_path: Path | str | None) -> None: except Exception: logger.debug("Failed to emit batch telemetry for async run", exc_info=True) - # Write metadata + # Write final metadata (overwrites the last incremental write with identical content). buffer_manager.write_metadata(target_num_records=num_records, buffer_size=buffer_size) + return True def _prepare_async_run( self, @@ -317,6 +490,9 @@ def _prepare_async_run( shutdown_error_window: int = 10, disable_early_shutdown: bool = False, trace: bool = False, + skip_row_groups: frozenset[int] = frozenset(), + initial_actual_num_records: int = 0, + initial_total_num_batches: int = 0, ) -> tuple[AsyncTaskScheduler, RowGroupBufferManager]: """Build a fully-wired scheduler and buffer manager for async generation. @@ -339,18 +515,23 @@ def _prepare_async_run( for gen in generators: gen.log_pre_generation() - # Partition into row groups + # Partition into row groups, skipping any already completed on resume. 
row_groups: list[tuple[int, int]] = [] remaining = num_records rg_id = 0 while remaining > 0: size = min(buffer_size, remaining) - row_groups.append((rg_id, size)) + if rg_id not in skip_row_groups: + row_groups.append((rg_id, size)) remaining -= size rg_id += 1 tracker = CompletionTracker.with_graph(graph, row_groups) - buffer_manager = RowGroupBufferManager(self.artifact_storage) + buffer_manager = RowGroupBufferManager( + self.artifact_storage, + initial_actual_num_records=initial_actual_num_records, + initial_total_num_batches=initial_total_num_batches, + ) # Pre-batch processor callback: runs after seed tasks complete for a row group. # If it raises, the scheduler drops all rows in the row group (skips it). diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dataset_batch_manager.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dataset_batch_manager.py index 09e6ce9c8..13e716190 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dataset_batch_manager.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dataset_batch_manager.py @@ -157,7 +157,14 @@ def reset(self, delete_files: bool = False) -> None: except OSError as e: raise DatasetBatchManagementError(f"🛑 Failed to delete directory {dir_path}: {e}") - def start(self, *, num_records: int, buffer_size: int) -> None: + def start( + self, + *, + num_records: int, + buffer_size: int, + start_batch: int = 0, + initial_actual_num_records: int = 0, + ) -> None: if num_records <= 0: raise DatasetBatchManagementError("🛑 num_records must be positive.") if buffer_size <= 0: @@ -168,6 +175,8 @@ def start(self, *, num_records: int, buffer_size: int) -> None: if remaining_records := num_records % buffer_size: self._num_records_list.append(remaining_records) self.reset() + self._current_batch_number = start_batch + self._actual_num_records = initial_actual_num_records def 
write(self) -> Path | None: """Write the current batch to a parquet file. diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/row_group_buffer.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/row_group_buffer.py index 3adad1456..16557bc08 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/row_group_buffer.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/row_group_buffer.py @@ -27,13 +27,18 @@ class RowGroupBufferManager: exclusively by the async scheduler. """ - def __init__(self, artifact_storage: ArtifactStorage) -> None: + def __init__( + self, + artifact_storage: ArtifactStorage, + initial_actual_num_records: int = 0, + initial_total_num_batches: int = 0, + ) -> None: self._buffers: dict[int, list[dict]] = {} self._row_group_sizes: dict[int, int] = {} self._dropped: dict[int, set[int]] = {} self._artifact_storage = artifact_storage - self._actual_num_records: int = 0 - self._total_num_batches: int = 0 + self._actual_num_records: int = initial_actual_num_records + self._total_num_batches: int = initial_total_num_batches def init_row_group(self, row_group: int, size: int) -> None: """Allocate a buffer for *row_group* with *size* empty rows.""" diff --git a/packages/data-designer-engine/src/data_designer/engine/storage/artifact_storage.py b/packages/data-designer-engine/src/data_designer/engine/storage/artifact_storage.py index 458eed689..907422d0d 100644 --- a/packages/data-designer-engine/src/data_designer/engine/storage/artifact_storage.py +++ b/packages/data-designer-engine/src/data_designer/engine/storage/artifact_storage.py @@ -47,6 +47,7 @@ class ArtifactStorage(BaseModel): partial_results_folder_name: str = "tmp-partial-parquet-files" dropped_columns_folder_name: str = "dropped-columns-parquet-files" processors_outputs_folder_name: str = PROCESSORS_OUTPUTS_FOLDER_NAME + resume: bool = False _media_storage: 
MediaStorage = PrivateAttr(default=None) @property @@ -67,12 +68,19 @@ def artifact_path_exists(self) -> bool: def resolved_dataset_name(self) -> str: dataset_path = self.artifact_path / self.dataset_name if dataset_path.exists() and len(list(dataset_path.iterdir())) > 0: + if self.resume: + return self.dataset_name new_dataset_name = f"{self.dataset_name}_{datetime.now().strftime('%m-%d-%Y_%H%M%S')}" logger.info( f"📂 Dataset path {str(dataset_path)!r} already exists. Dataset from this session" f"\n\t\t will be saved to {str(self.artifact_path / new_dataset_name)!r} instead." ) return new_dataset_name + if self.resume: + raise ArtifactStorageError( + f"🛑 Cannot resume: no existing dataset found at {str(dataset_path)!r}. " + "Run without resume=True to start a new generation." + ) return self.dataset_name @property @@ -204,6 +212,11 @@ def load_dataset_with_dropped_columns(self) -> pd.DataFrame: df = lazy.pd.concat([df, df_dropped], axis=1) return df + def clear_partial_results(self) -> None: + """Remove any in-flight partial results left over from an interrupted run.""" + if self.partial_results_path.exists(): + shutil.rmtree(self.partial_results_path) + def move_partial_result_to_final_file_path(self, batch_number: int) -> Path: partial_result_path = self.create_batch_file_path(batch_number, batch_stage=BatchStage.PARTIAL_RESULT) if not partial_result_path.exists(): diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py b/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py index 477978a38..8d79fcd50 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py @@ -3,7 +3,9 @@ from __future__ import annotations +import json import logging +from pathlib import Path from typing import TYPE_CHECKING from unittest.mock import Mock, patch @@ -32,6 +34,7 @@ from 
data_designer.engine.processing.processors.base import Processor from data_designer.engine.registry.data_designer_registry import DataDesignerRegistry from data_designer.engine.resources.seed_reader import DataFrameSeedReader +from data_designer.engine.storage.artifact_storage import ArtifactStorage if TYPE_CHECKING: import pandas as pd @@ -937,3 +940,320 @@ def test_allow_resize_multiple_batches( else: df = lazy.pd.read_parquet(final_path) assert len(df) == expected_total_rows + + +# --------------------------------------------------------------------------- +# Resume mechanism tests +# --------------------------------------------------------------------------- + + +def _write_metadata(dataset_dir: Path, **fields) -> None: + """Write a metadata.json into an existing dataset folder.""" + dataset_dir.mkdir(parents=True, exist_ok=True) + (dataset_dir / "sentinel.txt").write_text("x") # make folder non-empty for resolved_dataset_name + (dataset_dir / "metadata.json").write_text(json.dumps(fields)) + + +def _make_resume_builder(stub_resource_provider, stub_test_config_builder, tmp_path, *, buffer_size: int = 2): + """Return a DatasetBuilder whose ArtifactStorage has resume=True.""" + storage = ArtifactStorage(artifact_path=tmp_path, resume=True) + stub_resource_provider.artifact_storage = storage + stub_resource_provider.run_config = RunConfig(buffer_size=buffer_size) + return DatasetBuilder( + data_designer_config=stub_test_config_builder.build(), + resource_provider=stub_resource_provider, + ) + + +def test_build_resume_starts_fresh_without_metadata(stub_resource_provider, stub_test_config_builder, tmp_path, caplog): + """resume=True when only the folder exists (no metadata.json) logs an info message and starts fresh. + + This covers the case where a run was interrupted before any batch completed — the + folder was created by _write_builder_config but metadata.json was never written. 
+ Previously this raised DatasetGenerationError; now it silently restarts from batch 0. + """ + # Pre-create the folder with content so resolved_dataset_name(resume=True) returns "dataset" + dataset_dir = tmp_path / "dataset" + dataset_dir.mkdir() + (dataset_dir / "builder_config.json").write_text("{}") # non-empty, no metadata + + builder = _make_resume_builder(stub_resource_provider, stub_test_config_builder, tmp_path) + with caplog.at_level(logging.INFO): + with patch.object(builder, "_run_model_health_check_if_needed"): + with patch.object(builder, "_run_batch"): + with patch.object(builder.batch_manager, "finish"): + # resume=False is set internally; build dispatches to the normal (non-resume) path + builder.build(num_records=4, resume=True) + + assert any("interrupted before any batch completed" in record.message for record in caplog.records) + + +def test_build_resume_raises_on_num_records_mismatch(stub_resource_provider, stub_test_config_builder, tmp_path): + """resume=True raises when num_records differs from the original run.""" + dataset_dir = tmp_path / "dataset" + _write_metadata( + dataset_dir, + target_num_records=10, + buffer_size=2, + num_completed_batches=2, + actual_num_records=4, + ) + + builder = _make_resume_builder(stub_resource_provider, stub_test_config_builder, tmp_path, buffer_size=2) + with pytest.raises(DatasetGenerationError, match="num_records=4 does not match"): + builder.build(num_records=4, resume=True) + + +def test_build_resume_raises_on_buffer_size_mismatch(stub_resource_provider, stub_test_config_builder, tmp_path): + """resume=True raises when buffer_size differs from the original run.""" + dataset_dir = tmp_path / "dataset" + _write_metadata( + dataset_dir, + target_num_records=4, + buffer_size=2, + num_completed_batches=1, + actual_num_records=2, + ) + + builder = _make_resume_builder(stub_resource_provider, stub_test_config_builder, tmp_path, buffer_size=3) + with pytest.raises(DatasetGenerationError, match="buffer_size=3 
does not match"): + builder.build(num_records=4, resume=True) + + +def test_build_resume_logs_warning_when_already_complete( + stub_resource_provider, stub_test_config_builder, tmp_path, caplog +): + """resume=True on a fully-complete dataset logs a warning and returns without generating.""" + dataset_dir = tmp_path / "dataset" + # 4 records, 2 per batch = 2 batches; num_completed_batches == 2 → already done + _write_metadata( + dataset_dir, + target_num_records=4, + buffer_size=2, + num_completed_batches=2, + actual_num_records=4, + ) + + builder = _make_resume_builder(stub_resource_provider, stub_test_config_builder, tmp_path, buffer_size=2) + with caplog.at_level(logging.WARNING): + builder.build(num_records=4, resume=True) + + assert any("already complete" in record.message for record in caplog.records) + + +def test_build_resume_already_complete_does_not_run_after_generation_processors( + stub_resource_provider, stub_test_config_builder, tmp_path +): + """When already complete, run_after_generation must NOT be called (would destroy the dataset).""" + dataset_dir = tmp_path / "dataset" + _write_metadata( + dataset_dir, + target_num_records=4, + buffer_size=2, + num_completed_batches=2, + actual_num_records=4, + ) + + builder = _make_resume_builder(stub_resource_provider, stub_test_config_builder, tmp_path, buffer_size=2) + with patch.object(builder._processor_runner, "run_after_generation") as mock_after: + builder.build(num_records=4, resume=True) + + mock_after.assert_not_called() + + +# --------------------------------------------------------------------------- +# _find_completed_row_group_ids tests +# --------------------------------------------------------------------------- + + +def test_find_completed_row_group_ids_empty_dir(stub_resource_provider, stub_test_config_builder, tmp_path): + """Returns empty set when final_dataset_path does not exist.""" + dataset_dir = tmp_path / "dataset" + _write_metadata(dataset_dir, target_num_records=4, buffer_size=2, 
num_completed_batches=0, actual_num_records=0) + + builder = _make_resume_builder(stub_resource_provider, stub_test_config_builder, tmp_path) + assert builder._find_completed_row_group_ids() == set() + + +def test_find_completed_row_group_ids_with_files(stub_resource_provider, stub_test_config_builder, tmp_path): + """Returns correct IDs from batch_*.parquet files in parquet-files/.""" + dataset_dir = tmp_path / "dataset" + _write_metadata(dataset_dir, target_num_records=6, buffer_size=2, num_completed_batches=2, actual_num_records=4) + + parquet_dir = dataset_dir / "parquet-files" + parquet_dir.mkdir(parents=True) + (parquet_dir / "batch_00000.parquet").write_text("") + (parquet_dir / "batch_00002.parquet").write_text("") + + builder = _make_resume_builder(stub_resource_provider, stub_test_config_builder, tmp_path, buffer_size=2) + assert builder._find_completed_row_group_ids() == {0, 2} + + +def test_find_completed_row_group_ids_ignores_non_batch_files( + stub_resource_provider, stub_test_config_builder, tmp_path +): + """Non-batch files in parquet-files/ are silently ignored.""" + dataset_dir = tmp_path / "dataset" + _write_metadata(dataset_dir, target_num_records=4, buffer_size=2, num_completed_batches=1, actual_num_records=2) + + parquet_dir = dataset_dir / "parquet-files" + parquet_dir.mkdir(parents=True) + (parquet_dir / "batch_00001.parquet").write_text("") + (parquet_dir / "unrelated.parquet").write_text("") + + builder = _make_resume_builder(stub_resource_provider, stub_test_config_builder, tmp_path, buffer_size=2) + assert builder._find_completed_row_group_ids() == {1} + + +# --------------------------------------------------------------------------- +# Async resume via _build_async tests +# --------------------------------------------------------------------------- + + +def _write_parquet_files(parquet_dir: Path, row_group_ids: list[int]) -> None: + """Create stub batch_*.parquet files for the given row group IDs.""" + parquet_dir.mkdir(parents=True, 
exist_ok=True) + for rg_id in row_group_ids: + (parquet_dir / f"batch_{rg_id:05d}.parquet").write_text("") + + +def test_build_async_resume_logs_warning_when_already_complete( + stub_resource_provider, stub_test_config_builder, tmp_path, caplog +): + """Async resume on a fully-complete dataset logs a warning and returns without running.""" + dataset_dir = tmp_path / "dataset" + # 4 records at buffer_size=2 → 2 row groups (IDs 0 and 1) + _write_metadata(dataset_dir, target_num_records=4, buffer_size=2, num_completed_batches=2, actual_num_records=4) + _write_parquet_files(dataset_dir / "parquet-files", [0, 1]) + + builder = _make_resume_builder(stub_resource_provider, stub_test_config_builder, tmp_path, buffer_size=2) + + with caplog.at_level(logging.WARNING): + with patch.object(builder_mod, "DATA_DESIGNER_ASYNC_ENGINE", True): + with patch.object(builder, "_run_model_health_check_if_needed"): + builder.build(num_records=4, resume=True) + + assert any("already complete" in record.message for record in caplog.records) + + +def test_build_async_resume_starts_fresh_without_metadata( + stub_resource_provider, stub_test_config_builder, tmp_path, caplog +): + """Async resume with no metadata.json logs an info message and starts fresh. + + Previously this raised DatasetGenerationError; now it silently restarts from row group 0. + The log is emitted in build() before dispatching to _build_async, so mocking _build_async + does not suppress the message. 
+ """ + dataset_dir = tmp_path / "dataset" + dataset_dir.mkdir() + (dataset_dir / "builder_config.json").write_text("{}") + + builder = _make_resume_builder(stub_resource_provider, stub_test_config_builder, tmp_path) + + with caplog.at_level(logging.INFO): + with patch.object(builder_mod, "DATA_DESIGNER_ASYNC_ENGINE", True): + with patch.object(builder, "_run_model_health_check_if_needed"): + with patch.object(builder, "_build_async", return_value=True) as mock_async: + builder.build(num_records=4, resume=True) + + # _build_async is called with resume=False because the no-metadata path resets the flag + _, kwargs = mock_async.call_args + assert kwargs.get("resume") is False + assert any("interrupted before any batch completed" in record.message for record in caplog.records) + + +def test_build_async_resume_already_complete_does_not_run_after_generation_processors( + stub_resource_provider, stub_test_config_builder, tmp_path +): + """Async resume: when already complete, run_after_generation must NOT be called.""" + dataset_dir = tmp_path / "dataset" + _write_metadata(dataset_dir, target_num_records=4, buffer_size=2, num_completed_batches=2, actual_num_records=4) + _write_parquet_files(dataset_dir / "parquet-files", [0, 1]) + + builder = _make_resume_builder(stub_resource_provider, stub_test_config_builder, tmp_path, buffer_size=2) + + with patch.object(builder_mod, "DATA_DESIGNER_ASYNC_ENGINE", True): + with patch.object(builder, "_run_model_health_check_if_needed"): + with patch.object(builder._processor_runner, "run_after_generation") as mock_after: + builder.build(num_records=4, resume=True) + + mock_after.assert_not_called() + + +def test_find_completed_row_group_ids_used_for_initial_total_batches( + stub_resource_provider, stub_test_config_builder, tmp_path +): + """initial_total_num_batches uses filesystem count, not metadata count. 
+ + Simulates the crash window: 2 parquet files exist on disk but metadata still + records num_completed_batches=1 (write_metadata crashed after the second + row group was moved to parquet-files/ but before metadata was updated). + Verifies that _find_completed_row_group_ids() (= 2) is used, not metadata (= 1). + """ + dataset_dir = tmp_path / "dataset" + # Metadata lags — says only 1 batch completed + _write_metadata(dataset_dir, target_num_records=4, buffer_size=2, num_completed_batches=1, actual_num_records=2) + # Filesystem truth — 2 row groups already written + _write_parquet_files(dataset_dir / "parquet-files", [0, 1]) + + builder = _make_resume_builder(stub_resource_provider, stub_test_config_builder, tmp_path, buffer_size=2) + # Both row groups are on disk → dataset is already complete → generated=False + with patch.object(builder_mod, "DATA_DESIGNER_ASYNC_ENGINE", True): + with patch.object(builder, "_run_model_health_check_if_needed"): + with patch.object(builder._processor_runner, "run_after_generation") as mock_after: + builder.build(num_records=4, resume=True) + + # Already complete based on filesystem count (2 files ≥ 2 row groups) — no generation needed + mock_after.assert_not_called() + + +def test_initial_actual_num_records_from_filesystem_in_crash_window( + stub_resource_provider, stub_test_config_builder, tmp_path +): + """initial_actual_num_records is derived from filesystem, not stale metadata. + + Crash window scenario: row groups 0 and 1 are on disk but metadata only records + num_completed_batches=1 / actual_num_records=2 (write_metadata crashed after + the second row group was written but before it updated the file). + + With 6 records and buffer_size=2 (3 row groups total), the correct + initial_actual_num_records is 4 (groups 0+1), not 2 (stale metadata value). 
+ """ + import asyncio as stdlib_asyncio + + dataset_dir = tmp_path / "dataset" + # Metadata lags — says only 1 batch completed with 2 records + _write_metadata(dataset_dir, target_num_records=6, buffer_size=2, num_completed_batches=1, actual_num_records=2) + # Filesystem truth — 2 row groups already written (ids 0 and 1) + _write_parquet_files(dataset_dir / "parquet-files", [0, 1]) + + builder = _make_resume_builder(stub_resource_provider, stub_test_config_builder, tmp_path, buffer_size=2) + + captured: dict = {} + + def capturing_prepare(*args, **kwargs): + captured["initial_actual_num_records"] = kwargs.get("initial_actual_num_records", 0) + captured["initial_total_num_batches"] = kwargs.get("initial_total_num_batches", 0) + mock_scheduler = Mock() + mock_scheduler.traces = [] + mock_buffer_manager = Mock() + return mock_scheduler, mock_buffer_manager + + mock_future = Mock() + mock_future.result = Mock(return_value=None) + + # asyncio and ensure_async_engine_loop are lazy-imported in dataset_builder only when + # DATA_DESIGNER_ASYNC_ENGINE=True at module load time. Inject them for the duration + # of this test so _build_async can proceed past the early-return path. 
+ with patch.object(builder_mod, "DATA_DESIGNER_ASYNC_ENGINE", True): + with patch.object(builder_mod, "asyncio", stdlib_asyncio, create=True): + with patch.object(builder_mod, "ensure_async_engine_loop", Mock(return_value=Mock()), create=True): + with patch.object(stdlib_asyncio, "run_coroutine_threadsafe", return_value=mock_future): + with patch.object(builder, "_run_model_health_check_if_needed"): + with patch.object(builder, "_prepare_async_run", side_effect=capturing_prepare): + builder.build(num_records=6, resume=True) + + # Filesystem says 2 groups done (IDs 0+1) → 2+2 = 4 records, not stale metadata value 2 + assert captured["initial_actual_num_records"] == 4 + assert captured["initial_total_num_batches"] == 2 diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_dataset_batch_manager.py b/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_dataset_batch_manager.py index e2529e1fa..cf3a600bf 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_dataset_batch_manager.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_dataset_batch_manager.py @@ -451,3 +451,40 @@ def test_full_workflow(stub_batch_manager): # Verify all files exist assert stub_batch_manager.artifact_storage.metadata_file_path.exists() assert len(list(stub_batch_manager.artifact_storage.final_dataset_path.glob("*.parquet"))) == 3 + + +# --------------------------------------------------------------------------- +# start() with resume parameters +# --------------------------------------------------------------------------- + + +def test_start_with_start_batch(stub_batch_manager): + """start_batch shifts _current_batch_number so the loop skips already-done batches.""" + stub_batch_manager.start(num_records=10, buffer_size=3, start_batch=2) + + assert stub_batch_manager._current_batch_number == 2 + assert stub_batch_manager.num_batches == 4 + assert stub_batch_manager.buffer_is_empty is True 
+ + +def test_start_with_initial_actual_num_records(stub_batch_manager): + """initial_actual_num_records pre-populates the running total for resumed runs.""" + stub_batch_manager.start(num_records=10, buffer_size=3, initial_actual_num_records=6) + + assert stub_batch_manager._actual_num_records == 6 + + +def test_start_with_start_batch_and_initial_actual_num_records(stub_batch_manager): + """Both resume params can be set together.""" + stub_batch_manager.start(num_records=10, buffer_size=3, start_batch=2, initial_actual_num_records=6) + + assert stub_batch_manager._current_batch_number == 2 + assert stub_batch_manager._actual_num_records == 6 + + +def test_start_default_values_unchanged(stub_batch_manager): + """Default call (no resume params) still starts at batch 0 with 0 actual records.""" + stub_batch_manager.start(num_records=10, buffer_size=3) + + assert stub_batch_manager._current_batch_number == 0 + assert stub_batch_manager._actual_num_records == 0 diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_row_group_buffer.py b/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_row_group_buffer.py index 37b6f71ac..08dad3730 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_row_group_buffer.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_row_group_buffer.py @@ -223,3 +223,35 @@ def test_checkpoint_calls_on_complete_when_all_rows_dropped() -> None: callback.assert_called_once_with(None) storage.write_batch_to_parquet_file.assert_not_called() + + +def test_initial_actual_num_records() -> None: + """initial_actual_num_records pre-seeds the actual_num_records counter.""" + storage = _mock_artifact_storage() + storage.write_batch_to_parquet_file.return_value = "/fake/path.parquet" + storage.move_partial_result_to_final_file_path.return_value = "/fake/final.parquet" + + mgr = RowGroupBufferManager(storage, initial_actual_num_records=10) + 
mgr.init_row_group(0, 3) + mgr.update_batch(0, "col", ["a", "b", "c"]) + mgr.checkpoint_row_group(0) + + assert mgr.actual_num_records == 13 + + +def test_initial_total_num_batches_reflected_in_metadata() -> None: + """initial_total_num_batches pre-seeds the batch counter used by write_metadata.""" + storage = _mock_artifact_storage() + storage.write_batch_to_parquet_file.return_value = "/fake/path.parquet" + storage.move_partial_result_to_final_file_path.return_value = "/fake/final.parquet" + + mgr = RowGroupBufferManager(storage, initial_actual_num_records=5, initial_total_num_batches=2) + mgr.init_row_group(2, 2) + mgr.update_batch(2, "col", ["x", "y"]) + mgr.checkpoint_row_group(2) + + mgr.write_metadata(target_num_records=9, buffer_size=3) + + written = storage.write_metadata.call_args[0][0] + assert written["num_completed_batches"] == 3 # 2 initial + 1 new + assert written["actual_num_records"] == 7 # 5 initial + 2 new diff --git a/packages/data-designer-engine/tests/engine/storage/test_artifact_storage.py b/packages/data-designer-engine/tests/engine/storage/test_artifact_storage.py index 6206d5bbc..7dc1d8b1e 100644 --- a/packages/data-designer-engine/tests/engine/storage/test_artifact_storage.py +++ b/packages/data-designer-engine/tests/engine/storage/test_artifact_storage.py @@ -412,3 +412,61 @@ def test_standalone_load_processor_dataset_raises_file_not_found(tmp_path): """Standalone function raises FileNotFoundError (not ArtifactStorageError).""" with pytest.raises(FileNotFoundError, match="No artifacts found"): load_processor_dataset(tmp_path, "nonexistent") + + +# --------------------------------------------------------------------------- +# Resume flag tests +# --------------------------------------------------------------------------- + + +def test_resolved_dataset_name_creates_timestamped_copy_when_folder_exists(tmp_path): + """Default behaviour: existing non-empty folder gets a timestamped sibling.""" + existing = tmp_path / "dataset" + 
existing.mkdir() + (existing / "some_file.txt").write_text("x") + + storage = ArtifactStorage(artifact_path=tmp_path, dataset_name="dataset") + name = storage.resolved_dataset_name + assert name != "dataset" + assert name.startswith("dataset_") + + +def test_resolved_dataset_name_resume_uses_existing_folder(tmp_path): + """With resume=True, an existing non-empty folder is used as-is.""" + existing = tmp_path / "dataset" + existing.mkdir() + (existing / "some_file.txt").write_text("x") + + storage = ArtifactStorage(artifact_path=tmp_path, dataset_name="dataset", resume=True) + assert storage.resolved_dataset_name == "dataset" + + +def test_resolved_dataset_name_resume_raises_when_no_existing_folder(tmp_path): + """With resume=True, missing dataset folder raises ArtifactStorageError at init.""" + with pytest.raises(ArtifactStorageError, match="Cannot resume"): + ArtifactStorage(artifact_path=tmp_path, dataset_name="dataset", resume=True) + + +def test_resolved_dataset_name_resume_raises_when_folder_is_empty(tmp_path): + """With resume=True, an empty existing folder raises ArtifactStorageError at init.""" + (tmp_path / "dataset").mkdir() + + with pytest.raises(ArtifactStorageError, match="Cannot resume"): + ArtifactStorage(artifact_path=tmp_path, dataset_name="dataset", resume=True) + + +def test_clear_partial_results_removes_partial_folder(tmp_path, stub_sample_dataframe): + """clear_partial_results() deletes the partial results directory and its contents.""" + storage = ArtifactStorage(artifact_path=tmp_path) + storage.write_batch_to_parquet_file(0, stub_sample_dataframe, BatchStage.PARTIAL_RESULT) + assert storage.partial_results_path.exists() + + storage.clear_partial_results() + assert not storage.partial_results_path.exists() + + +def test_clear_partial_results_is_noop_when_no_partial_folder(tmp_path): + """clear_partial_results() does not raise when the partial results folder is absent.""" + storage = ArtifactStorage(artifact_path=tmp_path) + assert not 
storage.partial_results_path.exists() + storage.clear_partial_results() # must not raise diff --git a/packages/data-designer/src/data_designer/interface/data_designer.py b/packages/data-designer/src/data_designer/interface/data_designer.py index e487074a5..3f5ecff39 100644 --- a/packages/data-designer/src/data_designer/interface/data_designer.py +++ b/packages/data-designer/src/data_designer/interface/data_designer.py @@ -190,6 +190,7 @@ def create( *, num_records: int = DEFAULT_NUM_RECORDS, dataset_name: str = "dataset", + resume: bool = False, ) -> DatasetCreationResults: """Create dataset and save results to the local artifact storage. @@ -207,6 +208,12 @@ def create( a datetime stamp. For example, if the dataset name is "awesome_dataset" and a directory with the same name already exists, the dataset will be saved to a new directory with the name "awesome_dataset_2025-01-01_12-00-00". + resume: If True, resume generation from the last completed batch (sync engine) + or row group (async engine) found in the existing dataset directory. If no + progress was checkpointed yet (i.e. the run was interrupted before the first + batch/row-group completed), generation restarts from the beginning. The run + parameters (num_records, buffer_size) must match those of the original run. + Any in-flight partial results from the interrupted run are discarded. 
Returns: DatasetCreationResults object with methods for loading the generated dataset, @@ -218,11 +225,11 @@ def create( """ logger.info("🎨 Creating Data Designer dataset") - resource_provider = self._create_resource_provider(dataset_name, config_builder) + resource_provider = self._create_resource_provider(dataset_name, config_builder, resume=resume) try: builder = self._create_dataset_builder(config_builder.build(), resource_provider) - builder.build(num_records=num_records) + builder.build(num_records=num_records, resume=resume) except Exception as e: raise DataDesignerGenerationError(f"🛑 Error generating dataset: {e}") from e @@ -448,7 +455,7 @@ def _create_dataset_profiler( ) def _create_resource_provider( - self, dataset_name: str, config_builder: DataDesignerConfigBuilder + self, dataset_name: str, config_builder: DataDesignerConfigBuilder, *, resume: bool = False ) -> ResourceProvider: ArtifactStorage.mkdir_if_needed(self._artifact_path) @@ -457,7 +464,9 @@ def _create_resource_provider( seed_dataset_source = seed_config.source return create_resource_provider( - artifact_storage=ArtifactStorage(artifact_path=self._artifact_path, dataset_name=dataset_name), + artifact_storage=ArtifactStorage( + artifact_path=self._artifact_path, dataset_name=dataset_name, resume=resume + ), model_configs=config_builder.model_configs, secret_resolver=self._secret_resolver, model_provider_registry=self._model_provider_registry, diff --git a/plans/525/resume-interrupted-runs.md b/plans/525/resume-interrupted-runs.md new file mode 100644 index 000000000..d3f6f1f4e --- /dev/null +++ b/plans/525/resume-interrupted-runs.md @@ -0,0 +1,176 @@ +--- +date: 2026-04-13 +authors: + - pboruta +issue: https://github.com/NVIDIA-NeMo/DataDesigner/issues/525 +--- + +# Plan: Resume interrupted dataset generation runs + +## Problem + +When a long-running `DataDesigner.create()` call is interrupted (machine crash, OOM kill, preemption), the user has to restart generation from scratch — even 
though completed batches are already durably written to disk and `metadata.json` tracks exactly how many finished. + +The situation is made worse by an existing safeguard that fires at the wrong time: `ArtifactStorage.resolved_dataset_name` detects the existing folder on the next run and silently creates a new timestamped directory, orphaning the previous partial results instead of resuming from them. + +## Proposed Solution + +Add `resume: bool = False` to `DataDesigner.create()`. When `resume=True` the engine reads `metadata.json` from the existing dataset directory, validates that the run parameters are compatible, and starts the batch loop from the first incomplete batch rather than from zero. + +Expected usage: + +```python +dd = DataDesigner(...) +dd.add_column(...) + +# First run — interrupted at batch 7 of 20 +results = dd.create(config_builder, num_records=10_000) + +# After restart — picks up from batch 8 +results = dd.create(config_builder, num_records=10_000, resume=True) +``` + +## Design Decisions + +| Decision | Choice | Rationale | +|---|---|---| +| API surface | `resume: bool = False` on `DataDesigner.create()` | Opt-in flag keeps default behaviour unchanged. Users who want a clean re-run keep getting the timestamped-folder behaviour. | +| Resume state source | Read `metadata.json` written after each completed batch | Already contains `num_completed_batches`, `target_num_records`, `buffer_size`, `actual_num_records`. No new persistence needed. | +| Partial batch at crash time | Clear `tmp-partial-parquet-files/` at resume start | Simpler and safer than merging an incomplete parquet; losing one batch is acceptable since the user is already recovering from a crash. | +| Compatibility validation | Raise `DatasetGenerationError` if `num_records` or `buffer_size` changed | Different `num_records` changes which rows land in which batch file, breaking the numbering invariant. `buffer_size` changes the file-per-batch mapping. Both must match. 
| Async engine | Supported — derives ground-truth state from the filesystem via `_find_completed_row_group_ids()` | The async path scans completed `batch_*.parquet` files rather than reading `metadata.json`, avoiding the metadata-lag crash window. Both `initial_actual_num_records` and `initial_total_num_batches` are sourced from the filesystem. Incremental `write_metadata` calls after each row group enable resumability. |
+| Already-complete runs | Detect and warn, return existing path | If `num_completed_batches == total_num_batches` the dataset is already complete; the user may have re-run by mistake. |
+| No metadata → restart fresh | Log info message and restart from batch 0 | If `metadata.json` is missing the run was interrupted before any batch completed. Restarting automatically (with an info-level log) is safer UX than failing and forcing the user to remove `resume=True`. |
+
+## Affected Files
+
+| File | Change |
+|---|---|
+| `packages/data-designer-engine/src/data_designer/engine/storage/artifact_storage.py` | Add `resume: bool = False` field; modify `resolved_dataset_name` to skip timestamping when `resume=True`; add `clear_partial_results()` helper |
+| `packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dataset_batch_manager.py` | Add `start_batch: int = 0` and `initial_actual_num_records: int = 0` to `start()` |
+| `packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py` | Add `resume: bool = False` to `build()`; add `_load_resume_state()` private method; implement validation and batch-skip logic |
+| `packages/data-designer/src/data_designer/interface/data_designer.py` | Add `resume: bool = False` to `create()` and `_create_resource_provider()`; pass through to `ArtifactStorage` and `builder.build()` |
+| `packages/data-designer-engine/tests/engine/storage/test_artifact_storage.py` | Tests for resume flag on `resolved_dataset_name` and `clear_partial_results()` |
+| 
`packages/data-designer-engine/tests/engine/dataset_builders/utils/test_dataset_batch_manager.py` | Tests for `start_batch` and `initial_actual_num_records` parameters | +| `packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py` | Tests for resume validation, batch skipping, async engine error, already-complete detection | + +## Implementation Sketch + +### `ArtifactStorage` + +```python +class ArtifactStorage(BaseModel): + ... + resume: bool = False + + @cached_property + def resolved_dataset_name(self) -> str: + dataset_path = self.artifact_path / self.dataset_name + if dataset_path.exists() and len(list(dataset_path.iterdir())) > 0: + if self.resume: + return self.dataset_name # use existing folder as-is + # existing behaviour: create timestamped copy + new_dataset_name = f"{self.dataset_name}_{datetime.now().strftime(...)}" + ... + return new_dataset_name + if self.resume: + raise ArtifactStorageError( + f"Cannot resume: no existing dataset found at {dataset_path!r}." + ) + return self.dataset_name + + def clear_partial_results(self) -> None: + """Remove any in-flight partial results left over from an interrupted run.""" + if self.partial_results_path.exists(): + shutil.rmtree(self.partial_results_path) +``` + +### `DatasetBatchManager.start()` + +```python +def start( + self, + *, + num_records: int, + buffer_size: int, + start_batch: int = 0, + initial_actual_num_records: int = 0, +) -> None: + ... + self.reset() + self._current_batch_number = start_batch + self._actual_num_records = initial_actual_num_records +``` + +### `DatasetBuilder.build()` — resume path + +```python +@dataclass +class _ResumeState: + num_completed_batches: int + actual_num_records: int + buffer_size: int + +def _load_resume_state(self, num_records: int, buffer_size: int) -> _ResumeState: + try: + metadata = self.artifact_storage.read_metadata() + except FileNotFoundError: + raise DatasetGenerationError("Cannot resume: metadata.json not found. 
...")
+
+    target = metadata.get("target_num_records")
+    if target != num_records:
+        raise DatasetGenerationError(
+            f"Cannot resume: num_records={num_records} does not match "
+            f"the original run's target_num_records={target}. ..."
+        )
+
+    meta_buffer_size = metadata.get("buffer_size")
+    if meta_buffer_size != buffer_size:
+        raise DatasetGenerationError(
+            f"Cannot resume: buffer_size={buffer_size} does not match "
+            f"the original run's buffer_size={meta_buffer_size}. ..."
+        )
+
+    return _ResumeState(
+        num_completed_batches=metadata["num_completed_batches"],
+        actual_num_records=metadata["actual_num_records"],
+        buffer_size=buffer_size,
+    )
+
+def build(self, *, num_records, on_batch_complete=None, save_multimedia_to_disk=True, resume=False):
+    ...
+    if resume and DATA_DESIGNER_ASYNC_ENGINE:  # NOTE: original sync-only sketch — the shipped implementation supports async resume instead of raising (see "Trade-offs Considered")
+        raise DatasetGenerationError("resume=True is not supported with DATA_DESIGNER_ASYNC_ENGINE.")
+
+    buffer_size = self._resource_provider.run_config.buffer_size
+
+    if resume:
+        state = self._load_resume_state(num_records, buffer_size)
+        if state.num_completed_batches * buffer_size >= num_records:
+            logger.warning("Dataset already complete — nothing to resume.")
+            return self.artifact_storage.final_dataset_path
+        self.artifact_storage.clear_partial_results()
+        self.batch_manager.start(
+            num_records=num_records,
+            buffer_size=buffer_size,
+            start_batch=state.num_completed_batches,
+            initial_actual_num_records=state.actual_num_records,
+        )
+        for batch_idx in range(state.num_completed_batches, self.batch_manager.num_batches):
+            ...
+    else:
+        # existing path unchanged
+        self.batch_manager.start(num_records=num_records, buffer_size=buffer_size)
+        for batch_idx in range(self.batch_manager.num_batches):
+            ...
+```
+
+## Trade-offs Considered
+
+- **Automatic resume detection** (no flag, detect existing folder automatically): rejected — removes user intent. A user re-running a pipeline from scratch would be surprised by silent resumption. 
+- **Resume support for async engine**: implemented (diverges from original plan). The async path scans the filesystem for completed row groups instead of relying on potentially-stale `metadata.json`, handling the crash window between `move_partial_result_to_final_file_path` and `write_metadata`. +- **Per-column resume** (resume from column N within an interrupted batch): out of scope. Requires per-column checkpointing and state reconstruction, significantly higher complexity. + +## Delivery + +Single PR implementing all changes listed in the affected-files table plus tests. No backwards-incompatible changes — `resume` defaults to `False` and all existing call sites are unaffected.