Skip to content

Commit 5819dca

Browse files
committed
first version of output thread
1 parent a5e61fc commit 5819dca

File tree

1 file changed

+111
-129
lines changed

1 file changed

+111
-129
lines changed

openeo/extra/job_management/__init__.py

Lines changed: 111 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import time
1010
import warnings
1111
from pathlib import Path
12-
from threading import Thread, Event
12+
from threading import Thread, Event, Lock
1313
from queue import Queue, Empty
1414
from typing import (
1515
Any,
@@ -229,6 +229,7 @@ def __init__(
229229
# Queue for passing job outcomes from worker tasks to the output thread.
230230
self.job_to_run_queue = Queue()
231231
self.job_output_queue = Queue()
232+
self.df_lock = Lock()
232233
# Event used to signal shutdown of the output thread.
233234
self.stop_event = Event()
234235

@@ -502,8 +503,7 @@ def run_jobs(
502503
# TODO: support user-provided `stats`
503504
stats = collections.defaultdict(int)
504505

505-
506-
# Start the output thread.
506+
# Start the output thread that will update the dataframe with status updates.
507507
output_thread = Thread(
508508
target=lambda: self._output_queue_worker(df, job_db),
509509
name="output-worker",
@@ -530,6 +530,43 @@ def run_jobs(
530530

531531
return stats
532532

533+
534+
def _output_queue_worker(self, df, job_db):
535+
try:
536+
print("Output worker started processing job updates.", flush=True)
537+
538+
# Load dataframe from job_db if not provided
539+
if df is None:
540+
try:
541+
df = job_db.read()
542+
print(f"Loaded dataframe from job_db with {len(df)} rows.", flush=True)
543+
except Exception as e:
544+
print(f"Failed to load dataframe from job_db: {e}", flush=True)
545+
return
546+
547+
while True:
548+
item = self.job_output_queue.get(timeout=1)
549+
if item is None:
550+
print("Output worker received shutdown signal.", flush=True)
551+
self.job_output_queue.task_done()
552+
break
553+
554+
i, status = item
555+
print(f"Output worker processing row {i}: '{status}'", flush=True)
556+
557+
with self.df_lock:
558+
# Update the dataframe
559+
df.loc[i, "status"] = status
560+
print(f"Updated dataframe row {i}: {df.loc[i].to_dict()}", flush=True)
561+
# Persist the updated row to the job database
562+
job_db.persist(df.loc[[i]])
563+
564+
print(f"Persisted row {i} to job database.", flush=True)
565+
self.job_output_queue.task_done()
566+
567+
except Exception as e:
568+
print(f"Output worker crashed: {e}", flush=True)
569+
533570
def _job_update_loop(
534571
self, job_db: JobDatabaseInterface, start_job: Callable[[], BatchJob], stats: Optional[dict] = None
535572
):
@@ -563,11 +600,12 @@ def _job_update_loop(
563600
#TODO move to threads for full parallelism
564601
i, connection, job_id = self.job_to_run_queue.get()
565602
try:
566-
self._launch_job(not_started, i, connection, job_id, stats)
603+
self._launch_job(i, connection, job_id, stats)
567604
stats["job launch"] += 1
568605

569-
# out of thread
570-
job_db.persist(not_started.loc[i : i + 1])
606+
# TODO move to output thread?
607+
with self.df_lock:
608+
job_db.persist(not_started.loc[i : i + 1])
571609
stats["job_db persist"] += 1
572610
total_added += 1
573611

@@ -644,54 +682,35 @@ def _get_jobs_to_launch(self, start_job, not_started: pd.DataFrame, per_backend:
644682
total_added += len(indices_to_add)
645683

646684

647-
def _launch_job(self, df, i, connection, job_id, stats):
648-
"""Helper method for launching jobs
649-
650-
:param start_job:
651-
A callback which will be invoked with the row of the dataframe for which a job should be started.
652-
This callable should return a :py:class:`openeo.rest.job.BatchJob` object.
653-
654-
See also:
655-
`MultiBackendJobManager.run_jobs` for the parameters and return type of this callable
656-
657-
Even though it is called here in `_launch_job` and that is where the constraints
658-
really come from, the public method `run_jobs` needs to document `start_job` anyway,
659-
so let's avoid duplication in the docstrings.
660-
661-
:param df:
662-
DataFrame that specifies the jobs, and tracks the jobs' statuses.
663-
664-
:param i:
665-
index of the job's row in dataframe df
666-
667-
:param backend_name:
668-
name of the backend that will execute the job.
685+
def _launch_job(self, i, connection, job_id, stats):
    """Launch a job and report its outcome through the output queue.

    Obtains the current status of job ``job_id`` on ``connection``; if the
    job is still "created", it is started and its status re-fetched.
    Instead of updating the dataframe directly (which would race with the
    output thread), the result is pushed onto ``job_output_queue`` as an
    ``(i, status)`` tuple for the output worker to apply and persist.

    :param i: index of the job's row in the tracking dataframe.
    :param connection: backend connection on which the job lives.
    :param job_id: backend-side job id.
    :param stats: counter dict for bookkeeping (a fresh defaultdict is used when None).
    """
    stats = stats if stats is not None else collections.defaultdict(int)
    job = connection.job(job_id)

    # Bug fix: `status` was only assigned inside the context manager, so a
    # swallowed connection error left it unbound and the comparison below
    # raised NameError. Pre-initialize so a failed status fetch is harmless.
    status = None
    with ignore_connection_errors(context="get status"):
        status = job.status()
        stats["job get status"] += 1

    if status == "created":
        try:
            job.start()
            stats["job start"] += 1
            # After starting, get the updated status.
            job = connection.job(job_id)
            status = job.status()
            stats["job get status"] += 1
        except OpenEoApiError as e:
            _log.error(e)
            status = "start_failed"
            stats["job start error"] += 1

    if status is not None:
        # Push the outcome to the output queue instead of touching the
        # dataframe from this thread; the output worker persists it.
        self.job_output_queue.put((i, status))
695714

696715

697716
def on_job_done(self, job: BatchJob, row):
@@ -766,44 +785,6 @@ def _cancel_prolonged_job(self, job: BatchJob, row):
766785
except Exception as e:
767786
_log.error(f"Unexpected error while handling job {job.job_id}: {e}")
768787

769-
def _output_queue_worker(self,
770-
df: Optional[pd.DataFrame] = None,
771-
job_db: Union[str, Path, JobDatabaseInterface, None] = None,):
772-
"""
773-
Dedicated thread that:
774-
- Constantly checks the output queue for job outcomes.
775-
- For each result, it updates the DataFrame (optionally calling job.status())
776-
and persists the updated row via the job database.
777-
"""
778-
while True:
779-
try:
780-
item = self.job_output_queue.get(timeout=self.poll_sleep)
781-
except Empty:
782-
if self.stop_event.is_set():
783-
break
784-
continue
785-
786-
if item is None: # Sentinel to signal shutdown.
787-
break
788-
789-
index, outcome, job = item
790-
791-
# Update the DataFrame.
792-
df.loc[index, "status"] = outcome
793-
if outcome == "started":
794-
try:
795-
status = job.status()
796-
except Exception as e:
797-
_log.error(f"Error retrieving status for index {index}: {e}", exc_info=True)
798-
status = "status_error"
799-
df.loc[index, "status"] = status
800-
801-
# Persist the updated row.
802-
try:
803-
job_db.persist(self.df.loc[index : index + 1])
804-
except Exception as persist_err:
805-
_log.error(f"Error persisting update for index {index}: {persist_err}")
806-
self.job_output_queue.task_done()
807788

808789
def get_job_dir(self, job_id: str) -> Path:
809790
"""Path to directory where job metadata, results and error logs are be saved."""
@@ -828,72 +809,73 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
828809
Tracks status (and stats) of running jobs (in place).
829810
Optionally cancels jobs when running too long.
830811
"""
831-
stats = stats if stats is not None else collections.defaultdict(int)
832-
833-
active = job_db.get_by_status(statuses=["created", "queued", "running"]).copy()
834-
835-
jobs_done = []
836-
jobs_error = []
837-
jobs_cancel = []
812+
with self.df_lock:
813+
stats = stats if stats is not None else collections.defaultdict(int)
838814

839-
for i in active.index:
840-
job_id = active.loc[i, "id"]
841-
backend_name = active.loc[i, "backend_name"]
842-
previous_status = active.loc[i, "status"]
815+
active = job_db.get_by_status(statuses=["created", "queued", "running"]).copy()
843816

844-
try:
845-
con = self._get_connection(backend_name)
846-
the_job = con.job(job_id)
847-
job_metadata = the_job.describe()
848-
stats["job describe"] += 1
849-
new_status = job_metadata["status"]
817+
jobs_done = []
818+
jobs_error = []
819+
jobs_cancel = []
850820

851-
_log.info(
852-
f"Status of job {job_id!r} (on backend {backend_name}) is {new_status!r} (previously {previous_status!r})"
853-
)
821+
for i in active.index:
822+
job_id = active.loc[i, "id"]
823+
backend_name = active.loc[i, "backend_name"]
824+
previous_status = active.loc[i, "status"]
854825

855-
if new_status == "finished":
856-
stats["job finished"] += 1
857-
jobs_done.append((the_job, active.loc[i]))
826+
try:
827+
con = self._get_connection(backend_name)
828+
the_job = con.job(job_id)
829+
job_metadata = the_job.describe()
830+
stats["job describe"] += 1
831+
new_status = job_metadata["status"]
832+
833+
_log.info(
834+
f"Status of job {job_id!r} (on backend {backend_name}) is {new_status!r} (previously {previous_status!r})"
835+
)
858836

859-
if previous_status != "error" and new_status == "error":
860-
stats["job failed"] += 1
861-
jobs_error.append((the_job, active.loc[i]))
837+
if new_status == "finished":
838+
stats["job finished"] += 1
839+
jobs_done.append((the_job, active.loc[i]))
862840

863-
if new_status == "canceled":
864-
stats["job canceled"] += 1
865-
jobs_cancel.append((the_job, active.loc[i]))
841+
if previous_status != "error" and new_status == "error":
842+
stats["job failed"] += 1
843+
jobs_error.append((the_job, active.loc[i]))
866844

867-
if previous_status in {"created", "queued"} and new_status == "running":
868-
stats["job started running"] += 1
869-
active.loc[i, "running_start_time"] = rfc3339.utcnow()
845+
if new_status == "canceled":
846+
stats["job canceled"] += 1
847+
jobs_cancel.append((the_job, active.loc[i]))
870848

871-
if self._cancel_running_job_after and new_status == "running":
872-
if (not active.loc[i, "running_start_time"] or pd.isna(active.loc[i, "running_start_time"])):
873-
_log.warning(
874-
f"Unknown 'running_start_time' for running job {job_id}. Using current time as an approximation."
875-
)
849+
if previous_status in {"created", "queued"} and new_status == "running":
876850
stats["job started running"] += 1
877851
active.loc[i, "running_start_time"] = rfc3339.utcnow()
878852

879-
self._cancel_prolonged_job(the_job, active.loc[i])
853+
if self._cancel_running_job_after and new_status == "running":
854+
if (not active.loc[i, "running_start_time"] or pd.isna(active.loc[i, "running_start_time"])):
855+
_log.warning(
856+
f"Unknown 'running_start_time' for running job {job_id}. Using current time as an approximation."
857+
)
858+
stats["job started running"] += 1
859+
active.loc[i, "running_start_time"] = rfc3339.utcnow()
860+
861+
self._cancel_prolonged_job(the_job, active.loc[i])
880862

881-
active.loc[i, "status"] = new_status
863+
active.loc[i, "status"] = new_status
882864

883-
# TODO: there is well hidden coupling here with "cpu", "memory" and "duration" from `_normalize_df`
884-
for key in job_metadata.get("usage", {}).keys():
885-
if key in active.columns:
886-
active.loc[i, key] = _format_usage_stat(job_metadata, key)
887-
if "costs" in job_metadata.keys():
888-
active.loc[i, "costs"] = job_metadata.get("costs")
865+
# TODO: there is well hidden coupling here with "cpu", "memory" and "duration" from `_normalize_df`
866+
for key in job_metadata.get("usage", {}).keys():
867+
if key in active.columns:
868+
active.loc[i, key] = _format_usage_stat(job_metadata, key)
869+
if "costs" in job_metadata.keys():
870+
active.loc[i, "costs"] = job_metadata.get("costs")
889871

890-
except OpenEoApiError as e:
891-
# TODO: inspect status code and e.g. differentiate between 4xx/5xx
892-
stats["job tracking error"] += 1
893-
_log.warning(f"Error while tracking status of job {job_id!r} on backend {backend_name}: {e!r}")
872+
except OpenEoApiError as e:
873+
# TODO: inspect status code and e.g. differentiate between 4xx/5xx
874+
stats["job tracking error"] += 1
875+
_log.warning(f"Error while tracking status of job {job_id!r} on backend {backend_name}: {e!r}")
894876

895-
stats["job_db persist"] += 1
896-
job_db.persist(active)
877+
stats["job_db persist"] += 1
878+
job_db.persist(active)
897879

898880
return jobs_done, jobs_error, jobs_cancel
899881

0 commit comments

Comments
 (0)