Unverified Commit 656d5a10 authored by mvdbeek's avatar mvdbeek
Browse files

Don't create workflow outputs to recover input parameter outputs

Just eval the code that determines the outputs of the step again.
Avoids modifying the workflow as it executes, which is bad.
parent 3118cd2d
Loading
Loading
Loading
Loading
+31 −15
Original line number Diff line number Diff line
@@ -1620,21 +1620,13 @@ class InputParameterModule(WorkflowModule):
        ]

    def execute(
        self, trans, progress: "WorkflowProgress", invocation_step, use_cached_job: bool = False
        self,
        trans,
        progress: "WorkflowProgress",
        invocation_step: "WorkflowInvocationStep",
        use_cached_job: bool = False,
    ) -> Optional[bool]:
        step = invocation_step.workflow_step
        if step.id in progress.inputs_by_step_id:
            input_value = progress.inputs_by_step_id[step.id]
        else:
            input_value = step.state.inputs["input"]
        if input_value is NO_REPLACEMENT:
            default_value = step.get_input_default_value(NO_REPLACEMENT)
            # TODO: look at parameter type and infer if value should be a dictionary
            # instead. Guessing only field parameter types in CWL branch would have
            # default as dictionary like this.
            if not isinstance(default_value, dict):
                default_value = {"value": default_value}
            input_value = default_value.get("value", NO_REPLACEMENT)
        input_value = self.get_input_value(progress, invocation_step)
        input_param = self.get_runtime_inputs(self)["input"]
        # TODO: raise DelayedWorkflowEvaluation if replacement not ready ? Need test
        try:
@@ -1654,13 +1646,37 @@ class InputParameterModule(WorkflowModule):
        except ValueError as e:
            raise FailWorkflowEvaluation(
                why=InvocationFailureWorkflowParameterInvalid(
                    reason=FailureReason.workflow_parameter_invalid, workflow_step_id=step.id, details=str(e)
                    reason=FailureReason.workflow_parameter_invalid,
                    workflow_step_id=invocation_step.workflow_step_id,
                    details=str(e),
                )
            )
        step_outputs = dict(output=input_value)
        progress.set_outputs_for_input(invocation_step, step_outputs)
        return None

    def get_input_value(self, progress: "WorkflowProgress", invocation_step: "WorkflowInvocationStep"):
        step = invocation_step.workflow_step
        if step.id in progress.inputs_by_step_id:
            input_value = progress.inputs_by_step_id[step.id]
        else:
            assert step.state
            input_value = step.state.inputs["input"]
        if input_value is NO_REPLACEMENT:
            default_value = step.get_input_default_value(NO_REPLACEMENT)
            # TODO: look at parameter type and infer if value should be a dictionary
            # instead. Guessing only field parameter types in CWL branch would have
            # default as dictionary like this.
            if not isinstance(default_value, dict):
                default_value = {"value": default_value}
            input_value = default_value.get("value", NO_REPLACEMENT)
        return input_value

    def recover_mapping(self, invocation_step: "WorkflowInvocationStep", progress: "WorkflowProgress"):
        input_value = self.get_input_value(progress, invocation_step)
        step_outputs = dict(output=input_value)
        progress.set_outputs_for_input(invocation_step, step_outputs, already_persisted=True)

    def step_state_to_tool_state(self, state):
        state = safe_loads(state)
        default_set, default_value = False, None
+0 −8
Original line number Diff line number Diff line
@@ -611,17 +611,9 @@ class WorkflowProgress:
            outputs[invocation_step.output_value.workflow_output.output_name] = invocation_step.output_value.value
        self.outputs[step.id] = outputs
        if not already_persisted:
            workflow_outputs_by_name = {wo.output_name: wo for wo in step.workflow_outputs}
            for output_name, output_object in outputs.items():
                if hasattr(output_object, "history_content_type"):
                    invocation_step.add_output(output_name, output_object)
                else:
                    # Add this non-data, non workflow-output output to the workflow outputs.
                    # This is required for recovering the output in the next scheduling iteration,
                    # and should be replaced with a WorkflowInvocationStepOutputValue ASAP.
                    if not workflow_outputs_by_name.get(output_name) and output_object is not NO_REPLACEMENT:
                        workflow_output = model.WorkflowOutput(step, output_name=output_name)
                        step.workflow_outputs.append(workflow_output)
            for workflow_output in step.workflow_outputs:
                assert workflow_output.output_name
                output_name = workflow_output.output_name