Unverified Commit b15ae2cd authored by Marius van den Beek's avatar Marius van den Beek Committed by GitHub
Browse files

Merge pull request #20522 from mvdbeek/improve_workflow_scheduling_iteration_loop

[25.0] Improve workflow monitor loop times
parents 1e2737a4 f251d841
Loading
Loading
Loading
Loading
+7 −0
Original line number Diff line number Diff line
@@ -9000,6 +9000,13 @@ class WorkflowInvocation(Base, UsesCreateAndUpdateTime, Dictifiable, Serializabl
    states = InvocationState
    non_terminal_states = [states.NEW, states.READY]

    def get_last_workflow_invocation_step_update_time(self) -> Optional[datetime]:
        session = required_object_session(self)
        stmt = select(func.max(WorkflowInvocationStep.update_time)).where(
            WorkflowInvocationStep.workflow_invocation_id == self.id
        )
        return session.execute(stmt).scalar_one_or_none()

    def create_subworkflow_invocation_for_step(self, step):
        assert step.type == "subworkflow"
        subworkflow_invocation = WorkflowInvocation()
+41 −3
Original line number Diff line number Diff line
import os
from datetime import (
    datetime,
    timedelta,
)
from functools import partial
from typing import Optional
from typing import (
    Dict,
    Optional,
)

from sqlalchemy.orm import Session

@@ -30,6 +37,7 @@ log = get_logger(__name__)

DEFAULT_SCHEDULER_ID = "default"  # well actually this should be called DEFAULT_DEFAULT_SCHEDULER_ID...
DEFAULT_SCHEDULER_PLUGIN_TYPE = "core"
DEFAULT_SCHEDULER_BACKFILL_SECONDS = int(os.getenv("GALAXY_SCHEDULER_BACKFILL_SECONDS", 300))

EXCEPTION_MESSAGE_SHUTDOWN = "Exception raised while attempting to shutdown workflow scheduler."
EXCEPTION_MESSAGE_NO_SCHEDULERS = "Failed to defined workflow schedulers - no workflow schedulers defined."
@@ -296,6 +304,8 @@ class WorkflowRequestMonitor(Monitors):
            name="WorkflowRequestMonitor.monitor_thread", target=self.__monitor, config=app.config
        )
        self.invocation_grabber = None
        self.update_time_tracking_dict: Dict[int, datetime] = {}
        self.timedelta = timedelta(seconds=DEFAULT_SCHEDULER_BACKFILL_SECONDS)
        self_handler_tags = set(self.app.job_config.self_handler_tags)
        self_handler_tags.add(self.workflow_scheduling_manager.default_handler_id)
        handler_assignment_method = InvocationGrabber.get_grabbable_handler_assignment_method(
@@ -310,6 +320,29 @@ class WorkflowRequestMonitor(Monitors):
                handler_tags=self_handler_tags,
            )

    def ready_to_schedule_more(self, invocation: model.WorkflowInvocation):
        # Improve reactivity of scheduling using the history update_time as a heuristic.
        # If there wasn't a change in the history we're unlikely to be able to make more progress.
        if invocation.id not in self.update_time_tracking_dict:
            return True
        else:
            last_schedule_time = self.update_time_tracking_dict[invocation.id]
            last_history_update_time = invocation.history.update_time
            do_schedule = last_history_update_time > last_schedule_time
            if not do_schedule and (
                invocation_step_update_time := invocation.get_last_workflow_invocation_step_update_time()
            ):
                do_schedule = invocation_step_update_time > last_schedule_time
            if not do_schedule and (datetime.now() - last_schedule_time) > self.timedelta:
                # If we haven't scheduled in a while, schedule anyway.
                log.debug(
                    "Scheduling workflow invocation [%s] after %s seconds without scheduling.",
                    invocation.id,
                    (datetime.now() - last_schedule_time).total_seconds(),
                )
                do_schedule = True
            return do_schedule

    def __monitor(self):
        to_monitor = self.workflow_scheduling_manager.active_workflow_schedulers
        while self.monitor_running:
@@ -393,9 +426,11 @@ class WorkflowRequestMonitor(Monitors):
                    workflow_invocation.cancel_invocation_steps()
                    workflow_invocation.mark_cancelled()
                    session.commit()
                    self.update_time_tracking_dict.pop(invocation_id, None)
                    return False

                if not workflow_invocation or not workflow_invocation.active:
                    self.update_time_tracking_dict.pop(invocation_id, None)
                    return False

                # This ensures we're only ever working on the 'first' active
@@ -405,9 +440,12 @@ class WorkflowRequestMonitor(Monitors):
                    for i in workflow_invocation.history.workflow_invocations:
                        if i.active and i.id < workflow_invocation.id:
                            return False
                if self.ready_to_schedule_more(workflow_invocation):
                    self.update_time_tracking_dict[invocation_id] = datetime.now()
                    workflow_scheduler.schedule(workflow_invocation)
                log.debug("Workflow invocation [%s] scheduled", workflow_invocation.id)
                    log.debug("Workflow invocation [%s] scheduled", invocation_id)
            except Exception:
                self.update_time_tracking_dict.pop(invocation_id, None)
                # TODO: eventually fail this - or fail it right away?
                log.exception("Exception raised while attempting to schedule workflow request.")
                return False