Loading lib/galaxy/workflow/scheduling_manager.py +15 −2 Original line number Diff line number Diff line import os from datetime import datetime from datetime import ( datetime, timedelta, ) from functools import partial from typing import ( Dict, Loading Loading @@ -34,6 +37,7 @@ log = get_logger(__name__) DEFAULT_SCHEDULER_ID = "default" # well actually this should be called DEFAULT_DEFAULT_SCHEDULER_ID... DEFAULT_SCHEDULER_PLUGIN_TYPE = "core" DEFAULT_SCHEDULER_BACKFILL_SECONDS = int(os.getenv("GALAXY_SCHEDULER_BACKFILL_SECONDS", 300)) EXCEPTION_MESSAGE_SHUTDOWN = "Exception raised while attempting to shutdown workflow scheduler." EXCEPTION_MESSAGE_NO_SCHEDULERS = "Failed to defined workflow schedulers - no workflow schedulers defined." Loading Loading @@ -301,6 +305,7 @@ class WorkflowRequestMonitor(Monitors): ) self.invocation_grabber = None self.update_time_tracking_dict: Dict[int, datetime] = {} self.timedelta = timedelta(seconds=DEFAULT_SCHEDULER_BACKFILL_SECONDS) self_handler_tags = set(self.app.job_config.self_handler_tags) self_handler_tags.add(self.workflow_scheduling_manager.default_handler_id) handler_assignment_method = InvocationGrabber.get_grabbable_handler_assignment_method( Loading @@ -327,7 +332,15 @@ class WorkflowRequestMonitor(Monitors): if not do_schedule and ( invocation_step_update_time := invocation.get_last_workflow_invocation_step_update_time() ): return invocation_step_update_time > last_schedule_time do_schedule = invocation_step_update_time > last_schedule_time if not do_schedule and (datetime.now() - last_schedule_time) > self.timedelta: # If we haven't scheduled in a while, schedule anyway. log.debug( "Scheduling workflow invocation [%s] after %s seconds without scheduling.", invocation.id, (datetime.now() - last_schedule_time).total_seconds(), ) do_schedule = True return do_schedule def __monitor(self): Loading Loading
lib/galaxy/workflow/scheduling_manager.py +15 −2 Original line number Diff line number Diff line import os from datetime import datetime from datetime import ( datetime, timedelta, ) from functools import partial from typing import ( Dict, Loading Loading @@ -34,6 +37,7 @@ log = get_logger(__name__) DEFAULT_SCHEDULER_ID = "default" # well actually this should be called DEFAULT_DEFAULT_SCHEDULER_ID... DEFAULT_SCHEDULER_PLUGIN_TYPE = "core" DEFAULT_SCHEDULER_BACKFILL_SECONDS = int(os.getenv("GALAXY_SCHEDULER_BACKFILL_SECONDS", 300)) EXCEPTION_MESSAGE_SHUTDOWN = "Exception raised while attempting to shutdown workflow scheduler." EXCEPTION_MESSAGE_NO_SCHEDULERS = "Failed to defined workflow schedulers - no workflow schedulers defined." Loading Loading @@ -301,6 +305,7 @@ class WorkflowRequestMonitor(Monitors): ) self.invocation_grabber = None self.update_time_tracking_dict: Dict[int, datetime] = {} self.timedelta = timedelta(seconds=DEFAULT_SCHEDULER_BACKFILL_SECONDS) self_handler_tags = set(self.app.job_config.self_handler_tags) self_handler_tags.add(self.workflow_scheduling_manager.default_handler_id) handler_assignment_method = InvocationGrabber.get_grabbable_handler_assignment_method( Loading @@ -327,7 +332,15 @@ class WorkflowRequestMonitor(Monitors): if not do_schedule and ( invocation_step_update_time := invocation.get_last_workflow_invocation_step_update_time() ): return invocation_step_update_time > last_schedule_time do_schedule = invocation_step_update_time > last_schedule_time if not do_schedule and (datetime.now() - last_schedule_time) > self.timedelta: # If we haven't scheduled in a while, schedule anyway. log.debug( "Scheduling workflow invocation [%s] after %s seconds without scheduling.", invocation.id, (datetime.now() - last_schedule_time).total_seconds(), ) do_schedule = True return do_schedule def __monitor(self): Loading