Unverified Commit 1d135f4b authored by mvdbeek's avatar mvdbeek
Browse files

Also consider workflow invocation step update time

parent e8dac6be
Loading
Loading
Loading
Loading
+7 −0
Original line number Diff line number Diff line
@@ -8995,6 +8995,13 @@ class WorkflowInvocation(Base, UsesCreateAndUpdateTime, Dictifiable, Serializabl
    states = InvocationState
    non_terminal_states = [states.NEW, states.READY]

    def get_last_workflow_invocation_step_update_time(self) -> Optional[datetime]:
        session = required_object_session(self)
        stmt = select(func.max(WorkflowInvocationStep.update_time)).where(
            WorkflowInvocationStep.workflow_invocation_id == self.id
        )
        return session.execute(stmt).scalar_one_or_none()

    def create_subworkflow_invocation_for_step(self, step):
        assert step.type == "subworkflow"
        subworkflow_invocation = WorkflowInvocation()
+6 −1
Original line number Diff line number Diff line
@@ -323,7 +323,12 @@ class WorkflowRequestMonitor(Monitors):
        else:
            last_schedule_time = self.update_time_tracking_dict[invocation.id]
            last_history_update_time = invocation.history.update_time
            return last_history_update_time > last_schedule_time
            do_schedule = last_history_update_time > last_schedule_time
            if not do_schedule and (
                invocation_step_update_time := invocation.get_last_workflow_invocation_step_update_time()
            ):
                return invocation_step_update_time > last_schedule_time
            return do_schedule

    def __monitor(self):
        to_monitor = self.workflow_scheduling_manager.active_workflow_schedulers