@@ -518,7 +518,6 @@ def _job_update_loop(self, job_db: JobDatabaseInterface, start_job: Callable[[],
518518 stats ["track_statuses" ] += 1
519519
520520 self ._launch_pending_jobs (job_db , start_job , stats )
521-
522521 self ._handle_completed_jobs (stats )
523522
524523
@@ -576,17 +575,20 @@ def job_worker(i, backend_name):
576575 for thread in threads :
577576 thread .join ()
578577
579- def _handle_completed_jobs (self , stats ):
578+ def _handle_completed_jobs (self ,stats ):
580579 """Processes completed, canceled, and errored jobs."""
580+ #TODO downloading will be a blocker, run in seperate threads
581+ for job , row in self .jobs_done :
582+ self .on_job_done (job , row )
583+
581584 for job , row in self .jobs_error :
582585 self .on_job_error (job , row )
583586
584587 for job , row in self .jobs_cancel :
585588 self .on_job_cancel (job , row )
586-
587- #TODO downloading will be a blocker, run in seperate threads
588- for job , row in self .jobs_done :
589- self .on_job_done (job , row )
589+
590+ for job , row in self .jobs_prolonged :
591+ self ._cancel_prolonged_job (job , row )
590592
591593
592594 def _launch_job (self , start_job , df , i , backend_name , stats : Optional [dict ] = None ):
@@ -757,6 +759,7 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
757759 self .jobs_done = []
758760 self .jobs_error = []
759761 self .jobs_cancel = []
762+ self .jobs_prolonged = []
760763
761764 for i in active .index :
762765 job_id = active .loc [i , "id" ]
@@ -798,7 +801,8 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
798801 stats ["job started running" ] += 1
799802 active .loc [i , "running_start_time" ] = rfc3339 .utcnow ()
800803
801- self ._cancel_prolonged_job (the_job , active .loc [i ])
804+ #TODO; move this outide of the track_statuses loop, towards, handle completed jobs
805+ self .jobs_prolonged .append ((the_job , active .loc [i ]))
802806
803807 active .loc [i , "status" ] = new_status
804808
0 commit comments