Skip to content

Commit 87a32e5

Browse files
committed
re-adding len(to_keep)
1 parent 3a215b5 commit 87a32e5

File tree

3 files changed

+38
-36
lines changed

3 files changed

+38
-36
lines changed

openeo/extra/job_management/_manager.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,7 @@ def stop_job_thread(self, timeout_seconds: Optional[float] = _UNSET):
403403
404404
.. versionadded:: 0.32.0
405405
"""
406-
if self._worker_pool is not None or self._worker_pool.number_pending_tasks() > 0:
406+
if self._worker_pool is not None:
407407
self._worker_pool.shutdown()
408408
self._worker_pool = None
409409

@@ -519,7 +519,7 @@ def run_jobs(
519519
statuses=["not_started", "created", "queued_for_start", "queued", "running"]
520520
).values()) > 0
521521

522-
or (self._worker_pool.number_pending_tasks() > 0)
522+
or (self._worker_pool.number_pending_tasks() > 0) #avoid stopping the manager too early if there are still tasks being processed
523523

524524
):
525525
self._job_update_loop(job_db=job_db, start_job=start_job, stats=stats)
@@ -704,7 +704,7 @@ def _process_threadworker_updates(
704704
:param stats: Dictionary accumulating statistic counters
705705
"""
706706
# Retrieve completed task results immediately
707-
results = worker_pool.process_futures(timeout=0)
707+
results, _ = worker_pool.process_futures(timeout=0)
708708

709709
# Collect update dicts
710710
updates: List[Dict[str, Any]] = []

openeo/extra/job_management/_thread_worker.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -194,10 +194,10 @@ class _TaskThreadPool:
194194
195195
:param max_workers:
196196
Maximum number of concurrent threads to use for execution.
197-
Defaults to 1.
197+
Defaults to 2.
198198
"""
199199

200-
def __init__(self, max_workers: int = 1, name: str = 'default'):
200+
def __init__(self, max_workers: int = 2, name: str = 'default'):
201201
self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
202202
self._future_task_pairs: List[Tuple[concurrent.futures.Future, Task]] = []
203203
self._name = name
@@ -251,10 +251,10 @@ def process_futures(self, timeout: Union[float, None] = 0) -> Tuple[List[_TaskRe
251251
_log.info("process_futures: %d tasks done, %d tasks remaining", len(results), len(to_keep))
252252

253253
self._future_task_pairs = to_keep
254-
return results
254+
return results, len(to_keep)
255255

256256
def number_pending_tasks(self) -> int:
257-
"""Return the number of tasks that are still pending (not completed)."""
257+
"""Return the approximate number of tasks still pending; used to avoid stopping the job manager too early."""
258258
return len(self._future_task_pairs)
259259

260260
def shutdown(self) -> None:
@@ -305,13 +305,15 @@ def process_futures(self, timeout: Union[float, None] = 0) -> Tuple[List[_TaskRe
305305
Returns: (all_results, dict of remaining tasks per pool)
306306
"""
307307
all_results = []
308+
all_remaining = {}
308309

309310
for pool_name, pool in self._pools.items():
310-
results = pool.process_futures(timeout)
311+
results, remaining = pool.process_futures(timeout)
311312
all_results.extend(results)
312-
313-
return all_results
314-
313+
all_remaining[pool_name] = remaining
314+
315+
return all_results, all_remaining
316+
315317
def number_pending_tasks(self, pool_name: Optional[str] = None) -> int:
316318
if pool_name:
317319
pool = self._pools.get(pool_name)

tests/extra/job_management/test_thread_worker.py

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -221,12 +221,12 @@ def worker_pool(self) -> Iterator[_TaskThreadPool]:
221221
pool.shutdown()
222222

223223
def test_no_tasks(self, worker_pool):
224-
results = worker_pool.process_futures(timeout=10)
224+
results, _ = worker_pool.process_futures(timeout=10)
225225
assert results == []
226226

227227
def test_submit_and_process(self, worker_pool):
228228
worker_pool.submit_task(DummyTask(job_id="j-123", df_idx=0))
229-
results = worker_pool.process_futures(timeout=10)
229+
results, _ = worker_pool.process_futures(timeout=10)
230230
assert results == [
231231
_TaskResult(job_id="j-123", df_idx=0, db_update={"status": "dummified"}, stats_update={"dummy": 1}),
232232
]
@@ -235,14 +235,14 @@ def test_submit_and_process_zero_timeout(self, worker_pool):
235235
worker_pool.submit_task(DummyTask(job_id="j-123", df_idx=0))
236236
# Trigger context switch
237237
time.sleep(0.1)
238-
results = worker_pool.process_futures(timeout=0)
238+
results, _ = worker_pool.process_futures(timeout=0)
239239
assert results == [
240240
_TaskResult(job_id="j-123", df_idx=0, db_update={"status": "dummified"}, stats_update={"dummy": 1}),
241241
]
242242

243243
def test_submit_and_process_with_error(self, worker_pool):
244244
worker_pool.submit_task(DummyTask(job_id="j-666", df_idx=0))
245-
results = worker_pool.process_futures(timeout=10)
245+
results, _ = worker_pool.process_futures(timeout=10)
246246
assert results == [
247247
_TaskResult(
248248
job_id="j-666",
@@ -255,13 +255,13 @@ def test_submit_and_process_with_error(self, worker_pool):
255255

256256
def test_submit_and_process_iterative(self, worker_pool):
257257
worker_pool.submit_task(NopTask(job_id="j-1", df_idx=1))
258-
results = worker_pool.process_futures(timeout=1)
258+
results, _ = worker_pool.process_futures(timeout=1)
259259
assert results == [_TaskResult(job_id="j-1", df_idx=1)]
260260

261261
# Add some more
262262
worker_pool.submit_task(NopTask(job_id="j-22", df_idx=22))
263263
worker_pool.submit_task(NopTask(job_id="j-222", df_idx=222))
264-
results = worker_pool.process_futures(timeout=1)
264+
results, _ = worker_pool.process_futures(timeout=1)
265265
assert results == [_TaskResult(job_id="j-22", df_idx=22), _TaskResult(job_id="j-222", df_idx=222)]
266266

267267
def test_submit_multiple_simple(self, worker_pool):
@@ -270,7 +270,7 @@ def test_submit_multiple_simple(self, worker_pool):
270270
worker_pool.submit_task(NopTask(job_id=f"j-{j}", df_idx=j))
271271

272272
# Process all of them (non-zero timeout, which should be plenty of time for all of them to finish)
273-
results = worker_pool.process_futures(timeout=1)
273+
results, _ = worker_pool.process_futures(timeout=1)
274274
expected = [_TaskResult(job_id=f"j-{j}", df_idx=j) for j in range(5)]
275275
assert sorted(results, key=lambda r: r.job_id) == expected
276276

@@ -291,24 +291,24 @@ def test_submit_multiple_blocking_and_failing(self, worker_pool):
291291
)
292292

293293
# Initial state: nothing happened yet
294-
results = worker_pool.process_futures(timeout=0)
294+
results, _ = worker_pool.process_futures(timeout=0)
295295
assert results == []
296296

297297
# No changes even after timeout
298-
results = worker_pool.process_futures(timeout=0.1)
298+
results, _ = worker_pool.process_futures(timeout=0.1)
299299
assert results == []
300300

301301
# Set one event and wait for corresponding result
302302
events[0].set()
303-
results = worker_pool.process_futures(timeout=0.1)
303+
results, _ = worker_pool.process_futures(timeout=0.1)
304304
assert results == [
305305
_TaskResult(job_id="j-0", df_idx=0, db_update={"status": "all fine"}),
306306
]
307307

308308
# Release all but one event
309309
for j in range(n - 1):
310310
events[j].set()
311-
results = worker_pool.process_futures(timeout=0.1)
311+
results, _ = worker_pool.process_futures(timeout=0.1)
312312
assert results == [
313313
_TaskResult(job_id="j-1", df_idx=1, db_update={"status": "all fine"}),
314314
_TaskResult(job_id="j-2", df_idx=2, db_update={"status": "all fine"}),
@@ -323,15 +323,15 @@ def test_submit_multiple_blocking_and_failing(self, worker_pool):
323323
# Release all events
324324
for j in range(n):
325325
events[j].set()
326-
results = worker_pool.process_futures(timeout=0.1)
326+
results, _ = worker_pool.process_futures(timeout=0.1)
327327
assert results == [
328328
_TaskResult(job_id="j-4", df_idx=4, db_update={"status": "all fine"}),
329329
]
330330

331331
def test_shutdown(self, worker_pool):
332332
# Before shutdown
333333
worker_pool.submit_task(NopTask(job_id="j-123", df_idx=0))
334-
results = worker_pool.process_futures(timeout=0.1)
334+
results, _ = worker_pool.process_futures(timeout=0.1)
335335
assert results == [_TaskResult(job_id="j-123", df_idx=0)]
336336

337337
worker_pool.shutdown()
@@ -346,7 +346,7 @@ def test_job_start_task(self, worker_pool, dummy_backend, caplog):
346346
task = _JobStartTask(job_id=job.job_id, df_idx=0, root_url=dummy_backend.connection.root_url, bearer_token=None)
347347
worker_pool.submit_task(task)
348348

349-
results = worker_pool.process_futures(timeout=1)
349+
results, _ = worker_pool.process_futures(timeout=1)
350350
assert results == [
351351
_TaskResult(
352352
job_id="job-000",
@@ -365,7 +365,7 @@ def test_job_start_task_failure(self, worker_pool, dummy_backend, caplog):
365365
task = _JobStartTask(job_id=job.job_id, df_idx=0, root_url=dummy_backend.connection.root_url, bearer_token=None)
366366
worker_pool.submit_task(task)
367367

368-
results = worker_pool.process_futures(timeout=1)
368+
results, _ = worker_pool.process_futures(timeout=1)
369369
assert results == [
370370
_TaskResult(
371371
job_id="job-000", df_idx=0, db_update={"status": "start_failed"}, stats_update={"start_job error": 1}
@@ -433,7 +433,7 @@ def test_submit_task_creates_pool(self, thread_pool):
433433
assert "default" in thread_pool._pools
434434

435435
# Process to complete the task
436-
results = thread_pool.process_futures(timeout=0.1)
436+
results, _ = thread_pool.process_futures(timeout=0.1)
437437
assert len(results) == 1
438438
assert results[0].job_id == "j-1"
439439

@@ -472,7 +472,7 @@ def test_submit_multiple_task_types(self, thread_pool):
472472

473473
def test_process_futures_updates_empty(self, thread_pool):
474474
"""Test process futures with no pools."""
475-
results = thread_pool.process_futures(timeout=0)
475+
results, _ = thread_pool.process_futures(timeout=0)
476476
assert results == []
477477

478478
def test_process_futures_updates_multiple_pools(self, thread_pool):
@@ -482,7 +482,7 @@ def test_process_futures_updates_multiple_pools(self, thread_pool):
482482
thread_pool.submit_task(NopTask(job_id="j-2", df_idx=2)) # NopTask pool
483483
thread_pool.submit_task(DummyTask(job_id="j-3", df_idx=3)) # DummyTask pool
484484

485-
results = thread_pool.process_futures(timeout=0.1)
485+
results, _ = thread_pool.process_futures(timeout=0.1)
486486

487487
assert len(results) == 3
488488

@@ -508,7 +508,7 @@ def test_process_futures_updates_partial_completion(self):
508508
pool.submit_task(quick_task, "quick") # NopTask pool
509509

510510
# Process with timeout=0 - only quick task should complete
511-
results = pool.process_futures(timeout=0)
511+
results, _ = pool.process_futures(timeout=0)
512512

513513
# Only quick task completed
514514
assert len(results) == 1
@@ -520,7 +520,7 @@ def test_process_futures_updates_partial_completion(self):
520520

521521
# Release blocking task and process again
522522
event.set()
523-
results2 = pool.process_futures(timeout=0.1)
523+
results2, _ = pool.process_futures(timeout=0.1)
524524

525525
assert len(results2) == 1
526526
assert results2[0].job_id == "j-block"
@@ -609,7 +609,7 @@ def execute(self) -> _TaskResult:
609609
assert pool.number_pending_tasks() == 1
610610

611611
# Process it
612-
results = pool.process_futures(timeout=0.1)
612+
results, _ = pool.process_futures(timeout=0.1)
613613
assert len(results) == 1
614614
assert results[0].job_id == "j-1"
615615

@@ -633,7 +633,7 @@ def submit_tasks(start_idx: int):
633633
assert thread_pool.number_pending_tasks() == 15
634634

635635
# Process them all
636-
results = thread_pool.process_futures(timeout=0.5)
636+
results, _ = thread_pool.process_futures(timeout=0.5)
637637

638638
assert len(results) == 15
639639

@@ -661,7 +661,7 @@ def test_pool_parallelism_with_blocking_tasks(self):
661661
for event in events:
662662
event.set()
663663

664-
results = pool.process_futures(timeout=0.5)
664+
results, _ = pool.process_futures(timeout=0.5)
665665
assert len(results) == 5
666666

667667
for result in results:
@@ -675,7 +675,7 @@ def test_task_with_error_handling(self, thread_pool):
675675
thread_pool.submit_task(DummyTask(job_id="j-666", df_idx=0))
676676

677677
# Process it
678-
results = thread_pool.process_futures(timeout=0.1)
678+
results, _ = thread_pool.process_futures(timeout=0.1)
679679

680680
# Should get error result
681681
assert len(results) == 1
@@ -692,7 +692,7 @@ def test_mixed_success_and_error_tasks(self, thread_pool):
692692
thread_pool.submit_task(DummyTask(job_id="j-3", df_idx=3)) # Success
693693

694694
# Process all
695-
results = thread_pool.process_futures(timeout=0.1)
695+
results, _ = thread_pool.process_futures(timeout=0.1)
696696

697697
# Should get 3 results
698698
assert len(results) == 3

0 commit comments

Comments
 (0)