Skip to content

Commit a3db07d

Browse files
committed
work on output thread
1 parent d4c39e6 commit a3db07d

File tree

1 file changed

+88
-81
lines changed

1 file changed

+88
-81
lines changed

openeo/extra/job_management/__init__.py

Lines changed: 88 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,8 @@ def run_jobs(
529529
time.sleep(0.1)
530530

531531
# Signal the output thread to stop
532-
self.job_output_queue.put(None)
532+
self.job_output_queue.put((None, None))
533+
533534
output_thread.join()
534535

535536
return stats
@@ -555,12 +556,14 @@ def _output_queue_worker(self, df, job_db):
555556
self.job_output_queue.task_done()
556557
break
557558

558-
i, status = item
559-
print(f"Output worker processing row {i}: '{status}'", flush=True)
559+
# Expect item to be a tuple: (row_index, updates_dict)
560+
i, updates = item
561+
print(f"Output worker processing row {i}: updates {updates}", flush=True)
560562

561563
with self.df_lock:
562-
# Update the dataframe
563-
df.loc[i, "status"] = status
564+
# Apply each update from the message
565+
for key, value in updates.items():
566+
df.loc[i, key] = value
564567
print(f"Updated dataframe row {i}: {df.loc[i].to_dict()}", flush=True)
565568
# Persist the updated row to the job database
566569
job_db.persist(df.loc[[i]])
@@ -571,6 +574,7 @@ def _output_queue_worker(self, df, job_db):
571574
except Exception as e:
572575
print(f"Output worker crashed: {e}", flush=True)
573576

577+
574578
def _job_update_loop(
575579
self, job_db: JobDatabaseInterface, start_job: Callable[[], BatchJob], stats: Optional[dict] = None
576580
):
@@ -607,7 +611,7 @@ def _job_update_loop(
607611
self._launch_job(i, connection, job_id, stats)
608612
stats["job launch"] += 1
609613

610-
# TODO move to output thread?
614+
# TODO: move this persistence step out of this workflow (e.g. into the output thread)
611615
with self.df_lock:
612616
job_db.persist(not_started.loc[i : i + 1])
613617
stats["job_db persist"] += 1
@@ -713,7 +717,83 @@ def _launch_job(self, i, connection, job_id, stats):
713717
stats["job start error"] += 1
714718

715719
# Instead of updating the dataframe, push the outcome to the output queue.
716-
self.job_output_queue.put((i, status))
720+
self.job_output_queue.put((i, {"status": status}))
721+
722+
723+
def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] = None) -> Tuple[List, List, List]:
724+
"""
725+
Tracks status (and stats) of running jobs (in place).
726+
Optionally cancels jobs when running too long.
727+
"""
728+
with self.df_lock:
729+
stats = stats if stats is not None else collections.defaultdict(int)
730+
731+
active = job_db.get_by_status(statuses=["created", "queued", "running"]).copy()
732+
733+
jobs_done = []
734+
jobs_error = []
735+
jobs_cancel = []
736+
737+
for i in active.index:
738+
job_id = active.loc[i, "id"]
739+
backend_name = active.loc[i, "backend_name"]
740+
previous_status = active.loc[i, "status"]
741+
742+
try:
743+
con = self._get_connection(backend_name)
744+
the_job = con.job(job_id)
745+
job_metadata = the_job.describe()
746+
stats["job describe"] += 1
747+
new_status = job_metadata["status"]
748+
749+
_log.info(
750+
f"Status of job {job_id!r} (on backend {backend_name}) is {new_status!r} (previously {previous_status!r})"
751+
)
752+
753+
if new_status == "finished":
754+
stats["job finished"] += 1
755+
jobs_done.append((the_job, active.loc[i]))
756+
757+
if previous_status != "error" and new_status == "error":
758+
stats["job failed"] += 1
759+
jobs_error.append((the_job, active.loc[i]))
760+
761+
if new_status == "canceled":
762+
stats["job canceled"] += 1
763+
jobs_cancel.append((the_job, active.loc[i]))
764+
765+
if previous_status in {"created", "queued"} and new_status == "running":
766+
stats["job started running"] += 1
767+
active.loc[i, "running_start_time"] = rfc3339.utcnow()
768+
769+
if self._cancel_running_job_after and new_status == "running":
770+
if (not active.loc[i, "running_start_time"] or pd.isna(active.loc[i, "running_start_time"])):
771+
_log.warning(
772+
f"Unknown 'running_start_time' for running job {job_id}. Using current time as an approximation."
773+
)
774+
stats["job started running"] += 1
775+
active.loc[i, "running_start_time"] = rfc3339.utcnow()
776+
777+
self._cancel_prolonged_job(the_job, active.loc[i])
778+
779+
active.loc[i, "status"] = new_status
780+
781+
# TODO: there is well hidden coupling here with "cpu", "memory" and "duration" from `_normalize_df`
782+
for key in job_metadata.get("usage", {}).keys():
783+
if key in active.columns:
784+
active.loc[i, key] = _format_usage_stat(job_metadata, key)
785+
if "costs" in job_metadata.keys():
786+
active.loc[i, "costs"] = job_metadata.get("costs")
787+
788+
except OpenEoApiError as e:
789+
# TODO: inspect status code and e.g. differentiate between 4xx/5xx
790+
stats["job tracking error"] += 1
791+
_log.warning(f"Error while tracking status of job {job_id!r} on backend {backend_name}: {e!r}")
792+
793+
stats["job_db persist"] += 1
794+
job_db.persist(active)
795+
796+
return jobs_done, jobs_error, jobs_cancel
717797

718798

719799
def on_job_done(self, job: BatchJob, row):
@@ -807,80 +887,7 @@ def ensure_job_dir_exists(self, job_id: str) -> Path:
807887
if not job_dir.exists():
808888
job_dir.mkdir(parents=True)
809889

810-
def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] = None) -> Tuple[List, List, List]:
811-
"""
812-
Tracks status (and stats) of running jobs (in place).
813-
Optionally cancels jobs when running too long.
814-
"""
815-
with self.df_lock:
816-
stats = stats if stats is not None else collections.defaultdict(int)
817-
818-
active = job_db.get_by_status(statuses=["created", "queued", "running"]).copy()
819-
820-
jobs_done = []
821-
jobs_error = []
822-
jobs_cancel = []
823-
824-
for i in active.index:
825-
job_id = active.loc[i, "id"]
826-
backend_name = active.loc[i, "backend_name"]
827-
previous_status = active.loc[i, "status"]
828-
829-
try:
830-
con = self._get_connection(backend_name)
831-
the_job = con.job(job_id)
832-
job_metadata = the_job.describe()
833-
stats["job describe"] += 1
834-
new_status = job_metadata["status"]
835-
836-
_log.info(
837-
f"Status of job {job_id!r} (on backend {backend_name}) is {new_status!r} (previously {previous_status!r})"
838-
)
839-
840-
if new_status == "finished":
841-
stats["job finished"] += 1
842-
jobs_done.append((the_job, active.loc[i]))
843-
844-
if previous_status != "error" and new_status == "error":
845-
stats["job failed"] += 1
846-
jobs_error.append((the_job, active.loc[i]))
847-
848-
if new_status == "canceled":
849-
stats["job canceled"] += 1
850-
jobs_cancel.append((the_job, active.loc[i]))
851-
852-
if previous_status in {"created", "queued"} and new_status == "running":
853-
stats["job started running"] += 1
854-
active.loc[i, "running_start_time"] = rfc3339.utcnow()
855-
856-
if self._cancel_running_job_after and new_status == "running":
857-
if (not active.loc[i, "running_start_time"] or pd.isna(active.loc[i, "running_start_time"])):
858-
_log.warning(
859-
f"Unknown 'running_start_time' for running job {job_id}. Using current time as an approximation."
860-
)
861-
stats["job started running"] += 1
862-
active.loc[i, "running_start_time"] = rfc3339.utcnow()
863-
864-
self._cancel_prolonged_job(the_job, active.loc[i])
865-
866-
active.loc[i, "status"] = new_status
867-
868-
# TODO: there is well hidden coupling here with "cpu", "memory" and "duration" from `_normalize_df`
869-
for key in job_metadata.get("usage", {}).keys():
870-
if key in active.columns:
871-
active.loc[i, key] = _format_usage_stat(job_metadata, key)
872-
if "costs" in job_metadata.keys():
873-
active.loc[i, "costs"] = job_metadata.get("costs")
874-
875-
except OpenEoApiError as e:
876-
# TODO: inspect status code and e.g. differentiate between 4xx/5xx
877-
stats["job tracking error"] += 1
878-
_log.warning(f"Error while tracking status of job {job_id!r} on backend {backend_name}: {e!r}")
879-
880-
stats["job_db persist"] += 1
881-
job_db.persist(active)
882-
883-
return jobs_done, jobs_error, jobs_cancel
890+
884891

885892

886893
def _format_usage_stat(job_metadata: dict, field: str) -> str:

0 commit comments

Comments
 (0)