Skip to content

Commit 7116f3b

Browse files
author
AI Agent
committed
engine: inject native python skip logic for getfastq and quant in streaming mode\n\nThis dramatically cuts the skip penalty loop from ~1.5 seconds per sample to milliseconds by pre-checking filesystem outputs natively instead of spawning a new amalgkit CLI subprocess.
1 parent 30d5c06 commit 7116f3b

1 file changed

Lines changed: 18 additions & 0 deletions

File tree

src/metainformant/rna/engine/workflow_execution.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,24 @@ def process_single_sample(sample_row: Dict[str, str], current_chunk_idx: int, sa
116116
single_params = step_params.copy()
117117
single_params["metadata"] = str(single_meta_file)
118118

119+
# FAST PYTHON SKIP LOGIC
120+
# Avoid paying the 1.5 second penalty per skipped sample during spot reboots
121+
if step_name == "quant":
122+
abundance_file = config.work_dir / "quant" / sample_id / f"{sample_id}_abundance.tsv"
123+
if abundance_file.exists() and str(single_params.get("redo", "no")).lower() not in ("yes", "true", "1"):
124+
logger.debug(f" [Sample {sample_id}] Step {step_name} output found. Native quick bypass!")
125+
sample_results.append(WorkflowStepResult(step_name=f"{step_name}_{sample_id}", return_code=0, success=True))
126+
continue
127+
elif step_name == "getfastq":
128+
sra_dir = config.work_dir / "getfastq" / sample_id
129+
fastq_file = sra_dir / f"{sample_id}.amalgkit.fastq.gz"
130+
abundance_file = config.work_dir / "quant" / sample_id / f"{sample_id}_abundance.tsv"
131+
# If abundance exists, getfastq is intrinsically handled. If fastq exists, it's also handled.
132+
if (abundance_file.exists() or fastq_file.exists()) and str(single_params.get("redo", "no")).lower() not in ("yes", "true", "1"):
133+
logger.debug(f" [Sample {sample_id}] Step {step_name} target found. Native quick bypass!")
134+
sample_results.append(WorkflowStepResult(step_name=f"{step_name}_{sample_id}", return_code=0, success=True))
135+
continue
136+
119137
# Dynamically throttle cores to prevent system choking
120138
# If total threads=16, and chunk_size=16, give each sample 1 thread.
121139
base_threads = int(single_params.get("threads", 1))

0 commit comments

Comments
 (0)