Skip to content

Commit 62fad7d

Browse files
committed
queing
1 parent 617dfd9 commit 62fad7d

File tree

1 file changed

+32
-12
lines changed

1 file changed

+32
-12
lines changed

openeo/extra/job_management/__init__.py

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import warnings
1111
from pathlib import Path
1212
from threading import Thread, Semaphore, Lock
13+
from queue import Queue
14+
1315

1416
from typing import (
1517
Any,
@@ -558,29 +560,47 @@ def _get_jobs_to_launch(self, not_started, per_backend):
558560

559561
return jobs_to_add
560562

561-
def _run_job_threads(self, jobs_to_add, start_job, not_started, stats, job_db):
562-
"""Manages threading for job launching."""
563-
semaphore = Semaphore(self._max_concurrent_job_launch)
564-
threads = []
563+
def _run_job_threads(
564+
self,
565+
jobs_to_add: list,
566+
start_job: Callable[[], BatchJob],
567+
not_started: pd.DataFrame,
568+
stats: Optional[dict],
569+
job_db: JobDatabaseInterface
570+
) -> None:
571+
"""Manages threading for job launching using a queue and thread pool."""
572+
job_queue = Queue()
573+
574+
# Fill the queue with jobs to launch
575+
for i, backend_name in jobs_to_add:
576+
job_queue.put((i, backend_name))
565577

566-
def job_worker(i, backend_name):
567-
with semaphore:
578+
def job_worker():
579+
while not job_queue.empty():
580+
i, backend_name = job_queue.get()
568581
try:
582+
# Process job
569583
self._launch_job(start_job, not_started, i, backend_name, stats)
570584
stats["job launch"] += 1
571-
585+
572586
with self._db_lock:
573-
job_db.persist(not_started.loc[i : i + 1])
574-
575-
stats["job_db persist"] += 1
587+
job_db.persist(not_started.loc[i: i + 1])
588+
stats["job_db persist"] += 1
576589
except Exception as e:
577590
_log.error(f"Job launch failed for index {i}: {e}")
591+
finally:
592+
job_queue.task_done()
578593

579-
for i, backend_name in jobs_to_add:
580-
thread = Thread(target=job_worker, args=(i, backend_name))
594+
# Create a pool of threads that work concurrently
595+
num_threads = min(len(jobs_to_add), self._max_concurrent_job_launch)
596+
threads = []
597+
598+
for _ in range(num_threads):
599+
thread = Thread(target=job_worker)
581600
thread.start()
582601
threads.append(thread)
583602

603+
# Wait for all jobs in the queue to be processed
584604
for thread in threads:
585605
thread.join()
586606

0 commit comments

Comments
 (0)