1919import subprocess
2020import sys
2121import tempfile
22+ import time
2223import zipfile
2324from typing import Any
2425from urllib .parse import urldefrag
2526
2627from celery .exceptions import SoftTimeLimitExceeded # type: ignore
28+ import celery .states # type: ignore
2729
2830import toil .server .wes .amazon_wes_utils as amazon_wes_utils
2931from toil .common import Toil
3032from toil .jobStores .utils import generate_locator
3133from toil .server .celery_app import celery
3234from toil .server .utils import (
3335 MAX_CANCELING_SECONDS ,
36+ TERMINAL_STATES ,
3437 WorkflowStateMachine ,
3538 connect_to_workflow_state_store ,
3639 download_file_from_internet ,
4447
# Grace period, in seconds, that a Toil workflow gets to shut down cleanly
# (e.g. tidy up its job store) after we ask it to stop, before we kill it.
WAIT_FOR_DEATH_TIMEOUT = 20
5052
5153
5254class ToilWorkflowRunner :
@@ -456,9 +458,13 @@ def run_wes_task(
456458 logger .info (f"Fetching output files." )
457459 runner .write_output_files ()
458460 except (KeyboardInterrupt , SystemExit , SoftTimeLimitExceeded ):
459- # We canceled the workflow run
460- logger .info ("Canceling the workflow" )
461- runner .state_machine .send_canceled ()
461+ # We canceled the workflow run after setup.
462+ # We can confirm cancellation, but only if we haven't already declared
463+ # completion.
464+ state = runner .get_state ()
465+ if state not in TERMINAL_STATES :
466+ logger .info ("Canceling the workflow" )
467+ runner .state_machine .send_canceled ()
462468 except Exception :
463469 # The workflow run broke. We still count as the executor here.
464470 logger .exception ("Running Toil produced an exception." )
@@ -474,8 +480,9 @@ def run_wes_task(
474480
def cancel_run(task_id: str) -> None:
    """
    Send a signal to the process that is running Celery task task_id.

    :param task_id: ID of the Celery task whose worker process should be
        signaled.
    """
    # Celery uses SIGUSR1 for raising SoftTimeLimitExceeded, which the task
    # treats as a cancellation request.
    celery.control.terminate(task_id, signal="SIGUSR1")
480487
481488
@@ -484,6 +491,9 @@ class TaskRunner:
484491 Abstraction over the Celery API. Runs our run_wes task and allows canceling it.
485492
486493 We can swap this out in the server to allow testing without Celery.
494+
495+ Note that this is not responsible for acting on or having events for things
496+ like failed Celery tasks.
487497 """
488498
489499 @staticmethod
@@ -505,12 +515,27 @@ def cancel(task_id: str) -> None:
505515 @staticmethod
506516 def is_ok (task_id : str ) -> bool :
507517 """
508- Make sure that the task running system is working for the given task.
509- If the task system has detected an internal failure, return False.
518+ Returns True if the task has not yet failed, and False otherwise.
519+
520+ Returns True if the task was successfully canceled.
521+
522+ If False, the task is also not live.
510523 """
511- # Nothing to do for Celery
524+ # Poll Celery about the task. See <https://stackoverflow.com/a/38287835>
525+ result = celery .result .AsyncResult (task_id )
526+ if result .status == celery .states .FAILURE :
527+ return False
512528 return True
513529
530+ @staticmethod
531+ def is_live (task_id : str ) -> bool :
532+ """
533+ Returns True if the task has not yet stopped, and False otherwise.
534+ """
535+ result = celery .result .AsyncResult (task_id )
536+ # Celery "ready" means the result is as available as it is getting
537+ return result .status not in celery .states .READY_STATES
538+
514539
515540# If Celery can't be set up, we can just use this fake version instead.
516541
@@ -520,16 +545,23 @@ class MultiprocessingTaskRunner(TaskRunner):
520545 Version of TaskRunner that just runs tasks with Multiprocessing.
521546
522547 Can't use threading because there's no way to send a cancel signal or
523- exception to a Python thread, if loops in the task (i.e.
524- ToilWorkflowRunner) don 't poll for it.
548+ exception to a Python thread, if loops and the task (i.e.
549+ ToilWorkflowRunner) doesn 't poll for it.
525550 """
526551
527552 _id_to_process : dict [str , multiprocessing .Process ] = {}
528553 _id_to_log : dict [str , str ] = {}
529554
555+ # For testing, we can delay task setup by this many seconds.
556+ # This needs to be smuggled into the multiprocessing child process because
557+ # it won't inherit any replacements and has its own globals/class scopes
558+ setup_delay = 0
559+
530560 @staticmethod
531561 def set_up_and_run_task (
532- output_path : str , args : tuple [str , str , str , dict [str , Any ], list [str ]]
562+ output_path : str ,
563+ args : tuple [str , str , str , dict [str , Any ], list [str ]],
564+ setup_delay : int
533565 ) -> None :
534566 """
535567 Set up logging for the process into the given file and then call
@@ -539,6 +571,8 @@ def set_up_and_run_task(
539571 the process crashes, the caller must clean up the log.
540572 """
541573
574+ time .sleep (setup_delay )
575+
542576 # Multiprocessing and the server manage to hide actual task output from
543577 # the tests. Logging messages will appear in pytest's "live" log but
544578 # not in the captured log. And replacing sys.stdout and sys.stderr
@@ -604,7 +638,7 @@ def run(
604638 )
605639
606640 cls ._id_to_process [task_id ] = multiprocessing .Process (
607- target = cls .set_up_and_run_task , args = (path , args )
641+ target = cls .set_up_and_run_task , args = (path , args , cls . setup_delay )
608642 )
609643 cls ._id_to_process [task_id ].start ()
610644
@@ -622,8 +656,11 @@ def cancel(cls, task_id: str) -> None:
622656 @classmethod
623657 def is_ok (cls , task_id : str ) -> bool :
624658 """
625- Make sure that the task running system is working for the given task.
626- If the task system has detected an internal failure, return False.
659+ Return True if the task has not yet failed, and False otherwise.
660+
661+ Returns True if the task was successfully canceled.
662+
663+ If False, the task is also not live.
627664 """
628665
629666 process = cls ._id_to_process .get (task_id )
@@ -639,7 +676,7 @@ def is_ok(cls, task_id: str) -> bool:
639676 process .exitcode is not None
640677 and process .exitcode not in ACCEPTABLE_EXIT_CODES
641678 ):
642- # Something went wring in the task and it couldn't handle it.
679+ # Something went wrong in the task and it couldn't handle it.
643680 logger .error (
644681 "Process for running %s failed with code %s" , task_id , process .exitcode
645682 )
@@ -655,3 +692,15 @@ def is_ok(cls, task_id: str) -> bool:
655692 return False
656693
657694 return True
695+
696+ @classmethod
697+ def is_live (cls , task_id : str ) -> bool :
698+ """
699+ Returns True if the task has not yet stopped, and False otherwise.
700+ """
701+ process = cls ._id_to_process .get (task_id )
702+ if process is None :
703+ # Never heard of this task, so it's probably in the process of
704+ # getting made
705+ return True
706+ return process .exitcode is None
0 commit comments