Skip to content

Commit b69a760

Browse files
committed
db locking
1 parent 6a3a681 commit b69a760

File tree

1 file changed

+9
-5
lines changed

1 file changed

+9
-5
lines changed

openeo/extra/job_management/__init__.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ def __init__(
224224
self._thread = None
225225
self._max_concurrent_job_launch = 5
226226
self._stats_lock = Lock()
227+
self._db_lock = Lock()
227228

228229
def add_backend(
229230
self,
@@ -516,7 +517,8 @@ def _job_update_loop(self, job_db: JobDatabaseInterface, start_job: Callable[[],
516517

517518
with ignore_connection_errors(context="get statuses"):
518519
self._track_statuses(job_db, stats=stats)
519-
stats["track_statuses"] += 1
520+
with self._stats_lock:
521+
stats["track_statuses"] += 1
520522

521523
self._launch_pending_jobs(job_db, start_job, stats)
522524
self._handle_completed_jobs(stats)
@@ -527,7 +529,8 @@ def _launch_pending_jobs(self, job_db, start_job, stats):
527529
not_started = job_db.get_by_status(statuses=["not_started"], max=200).copy()
528530
if not not_started.empty:
529531
running = job_db.get_by_status(statuses=["created", "queued", "running"])
530-
stats["job_db get_by_status"] += 1
532+
with self._stats_lock:
533+
stats["job_db get_by_status"] += 1
531534

532535
per_backend = running.groupby("backend_name").size().to_dict()
533536
_log.info(f"Running per backend: {per_backend}")
@@ -562,11 +565,12 @@ def job_worker(i, backend_name):
562565
with semaphore:
563566
try:
564567
self._launch_job(start_job, not_started, i, backend_name, stats)
568+
569+
with self._db_lock:
570+
job_db.persist(not_started.loc[i : i + 1])
571+
565572
with self._stats_lock:
566573
stats["job launch"] += 1
567-
568-
job_db.persist(not_started.loc[i : i + 1])
569-
with self._stats_lock:
570574
stats["job_db persist"] += 1
571575
except Exception as e:
572576
_log.error(f"Job launch failed for index {i}: {e}")

0 commit comments

Comments
 (0)