Unverified Commit e8dac6be authored by mvdbeek's avatar mvdbeek
Browse files

Improve workflow monitor loop times

parent e95a5590
Loading
Loading
Loading
Loading
+23 −3
Original line number Diff line number Diff line
import os
from datetime import datetime
from functools import partial
from typing import Optional
from typing import (
    Dict,
    Optional,
)

from sqlalchemy.orm import Session

@@ -296,6 +300,7 @@ class WorkflowRequestMonitor(Monitors):
            name="WorkflowRequestMonitor.monitor_thread", target=self.__monitor, config=app.config
        )
        self.invocation_grabber = None
        self.update_time_tracking_dict: Dict[int, datetime] = {}
        self_handler_tags = set(self.app.job_config.self_handler_tags)
        self_handler_tags.add(self.workflow_scheduling_manager.default_handler_id)
        handler_assignment_method = InvocationGrabber.get_grabbable_handler_assignment_method(
@@ -310,6 +315,16 @@ class WorkflowRequestMonitor(Monitors):
                handler_tags=self_handler_tags,
            )

    def ready_to_schedule_more(self, invocation: model.WorkflowInvocation):
        # Improve reactivity of scheduling using the history update_time as a heuristic.
        # If there wasn't a change in the history we're unlikely to be able to make more progress.
        if invocation.id not in self.update_time_tracking_dict:
            return True
        else:
            last_schedule_time = self.update_time_tracking_dict[invocation.id]
            last_history_update_time = invocation.history.update_time
            return last_history_update_time > last_schedule_time

    def __monitor(self):
        to_monitor = self.workflow_scheduling_manager.active_workflow_schedulers
        while self.monitor_running:
@@ -393,9 +408,11 @@ class WorkflowRequestMonitor(Monitors):
                    workflow_invocation.cancel_invocation_steps()
                    workflow_invocation.mark_cancelled()
                    session.commit()
                    self.update_time_tracking_dict.pop(invocation_id, None)
                    return False

                if not workflow_invocation or not workflow_invocation.active:
                    self.update_time_tracking_dict.pop(invocation_id, None)
                    return False

                # This ensures we're only ever working on the 'first' active
@@ -405,9 +422,12 @@ class WorkflowRequestMonitor(Monitors):
                    for i in workflow_invocation.history.workflow_invocations:
                        if i.active and i.id < workflow_invocation.id:
                            return False
                if self.ready_to_schedule_more(workflow_invocation):
                    self.update_time_tracking_dict[invocation_id] = datetime.now()
                    workflow_scheduler.schedule(workflow_invocation)
                log.debug("Workflow invocation [%s] scheduled", workflow_invocation.id)
                    log.debug("Workflow invocation [%s] scheduled", invocation_id)
            except Exception:
                self.update_time_tracking_dict.pop(invocation_id, None)
                # TODO: eventually fail this - or fail it right away?
                log.exception("Exception raised while attempting to schedule workflow request.")
                return False