Skip to content

Commit d21c445

Browse files
committed
first version of output thread
1 parent 5819dca commit d21c445

File tree

1 file changed

+24 −16 lines changed

1 file changed

+24 −16 lines changed

openeo/extra/job_management/__init__.py

Lines changed: 24 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -544,28 +544,20 @@ def _output_queue_worker(self, df, job_db):
544544
print(f"Failed to load dataframe from job_db: {e}", flush=True)
545545
return
546546

547+
547548
while True:
548549
item = self.job_output_queue.get(timeout=1)
549550
if item is None:
550-
print("Output worker received shutdown signal.", flush=True)
551-
self.job_output_queue.task_done()
552551
break
553-
554-
i, status = item
555-
print(f"Output worker processing row {i}: '{status}'", flush=True)
556-
552+
i, updates = item # Now expecting a dictionary of updates
553+
print(f"Processing updates for row {i}: {updates}", flush=True)
557554
with self.df_lock:
558-
# Update the dataframe
559-
df.loc[i, "status"] = status
560-
print(f"Updated dataframe row {i}: {df.loc[i].to_dict()}", flush=True)
561-
# Persist the updated row to the job database
555+
for col, value in updates.items():
556+
df.loc[i, col] = value
562557
job_db.persist(df.loc[[i]])
563-
564-
print(f"Persisted row {i} to job database.", flush=True)
565558
self.job_output_queue.task_done()
566-
567559
except Exception as e:
568-
print(f"Output worker crashed: {e}", flush=True)
560+
print(f"Output worker error: {e}", flush=True)
569561

570562
def _job_update_loop(
571563
self, job_db: JobDatabaseInterface, start_job: Callable[[], BatchJob], stats: Optional[dict] = None
@@ -607,7 +599,6 @@ def _job_update_loop(
607599
with self.df_lock:
608600
job_db.persist(not_started.loc[i : i + 1])
609601
stats["job_db persist"] += 1
610-
total_added += 1
611602

612603
except Exception as e:
613604
_log.error(f"Job launch failed for index {i}: {e}")
@@ -710,7 +701,7 @@ def _launch_job(self, i, connection, job_id, stats):
710701
stats["job start error"] += 1
711702

712703
# Instead of updating the dataframe, push the outcome to the output queue.
713-
self.job_output_queue.put((i, status))
704+
self.job_output_queue.put((i, {'status': status, 'id': job_id}))
714705

715706

716707
def on_job_done(self, job: BatchJob, row):
@@ -724,6 +715,23 @@ def on_job_done(self, job: BatchJob, row):
724715
"""
725716
# TODO: param `row` is never accessed in this method. Remove it? Is this intended for future use?
726717

718+
719+
# final metric update to queue
720+
updates = {
721+
'status': 'finished',
722+
'cpu': row.get('cpu'),
723+
'memory': row.get('memory'),
724+
'duration': row.get('duration'),
725+
'costs': row.get('costs')
726+
}
727+
728+
with self.df_lock:
729+
for key, value in updates.items():
730+
row[key] = value
731+
# Send updates to the output queue
732+
self.job_output_queue.put((row.name, updates))
733+
734+
#download the data
727735
job_metadata = job.describe()
728736
job_dir = self.get_job_dir(job.job_id)
729737
metadata_path = self.get_job_metadata_path(job.job_id)

0 commit comments

Comments
 (0)