Skip to content

Commit 6a3a681

Browse files
committed
include locking for persist and stats
1 parent 6d15d08 commit 6a3a681

File tree

2 files changed

+9
-5
lines changed

2 files changed

+9
-5
lines changed

openeo/extra/job_management/__init__.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import time
1010
import warnings
1111
from pathlib import Path
12-
from threading import Thread, Semaphore
12+
from threading import Thread, Semaphore, Lock
1313
from typing import (
1414
Any,
1515
Callable,
@@ -223,6 +223,7 @@ def __init__(
223223
)
224224
self._thread = None
225225
self._max_concurrent_job_launch = 5
226+
self._stats_lock = Lock()
226227

227228
def add_backend(
228229
self,
@@ -561,9 +562,12 @@ def job_worker(i, backend_name):
561562
with semaphore:
562563
try:
563564
self._launch_job(start_job, not_started, i, backend_name, stats)
564-
stats["job launch"] += 1
565-
job_db.persist(not_started.loc[i : i + 1]) # Persist each job as it's launched
566-
stats["job_db persist"] += 1
565+
with self._stats_lock:
566+
stats["job launch"] += 1
567+
568+
job_db.persist(not_started.loc[i : i + 1])
569+
with self._stats_lock:
570+
stats["job_db persist"] += 1
567571
except Exception as e:
568572
_log.error(f"Job launch failed for index {i}: {e}")
569573

tests/extra/job_management/test_job_management.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import re
66
import threading
77
from pathlib import Path
8-
from time import sleep, time
8+
from time import sleep
99
from typing import Callable, Union
1010
from unittest import mock
1111

0 commit comments

Comments
 (0)