Unverified Commit 7bcdd373 authored by John Chilton's avatar John Chilton Committed by GitHub
Browse files

Merge pull request #20980 from mvdbeek/dont_create_anonymous_workflow_outputs

[25.0] Don't create workflow outputs to recover input parameter outputs
parents 8b15bc35 656d5a10
Loading
Loading
Loading
Loading
+31 −15
Original line number Diff line number Diff line
@@ -1620,21 +1620,13 @@ class InputParameterModule(WorkflowModule):
        ]

    def execute(
        self, trans, progress: "WorkflowProgress", invocation_step, use_cached_job: bool = False
        self,
        trans,
        progress: "WorkflowProgress",
        invocation_step: "WorkflowInvocationStep",
        use_cached_job: bool = False,
    ) -> Optional[bool]:
        step = invocation_step.workflow_step
        if step.id in progress.inputs_by_step_id:
            input_value = progress.inputs_by_step_id[step.id]
        else:
            input_value = step.state.inputs["input"]
        if input_value is NO_REPLACEMENT:
            default_value = step.get_input_default_value(NO_REPLACEMENT)
            # TODO: look at parameter type and infer if value should be a dictionary
            # instead. Guessing only field parameter types in CWL branch would have
            # default as dictionary like this.
            if not isinstance(default_value, dict):
                default_value = {"value": default_value}
            input_value = default_value.get("value", NO_REPLACEMENT)
        input_value = self.get_input_value(progress, invocation_step)
        input_param = self.get_runtime_inputs(self)["input"]
        # TODO: raise DelayedWorkflowEvaluation if replacement not ready ? Need test
        try:
@@ -1654,13 +1646,37 @@ class InputParameterModule(WorkflowModule):
        except ValueError as e:
            raise FailWorkflowEvaluation(
                why=InvocationFailureWorkflowParameterInvalid(
                    reason=FailureReason.workflow_parameter_invalid, workflow_step_id=step.id, details=str(e)
                    reason=FailureReason.workflow_parameter_invalid,
                    workflow_step_id=invocation_step.workflow_step_id,
                    details=str(e),
                )
            )
        step_outputs = dict(output=input_value)
        progress.set_outputs_for_input(invocation_step, step_outputs)
        return None

    def get_input_value(self, progress: "WorkflowProgress", invocation_step: "WorkflowInvocationStep"):
        step = invocation_step.workflow_step
        if step.id in progress.inputs_by_step_id:
            input_value = progress.inputs_by_step_id[step.id]
        else:
            assert step.state
            input_value = step.state.inputs["input"]
        if input_value is NO_REPLACEMENT:
            default_value = step.get_input_default_value(NO_REPLACEMENT)
            # TODO: look at parameter type and infer if value should be a dictionary
            # instead. Guessing only field parameter types in CWL branch would have
            # default as dictionary like this.
            if not isinstance(default_value, dict):
                default_value = {"value": default_value}
            input_value = default_value.get("value", NO_REPLACEMENT)
        return input_value

    def recover_mapping(self, invocation_step: "WorkflowInvocationStep", progress: "WorkflowProgress"):
        input_value = self.get_input_value(progress, invocation_step)
        step_outputs = dict(output=input_value)
        progress.set_outputs_for_input(invocation_step, step_outputs, already_persisted=True)

    def step_state_to_tool_state(self, state):
        state = safe_loads(state)
        default_set, default_value = False, None
+0 −8
Original line number Diff line number Diff line
@@ -611,17 +611,9 @@ class WorkflowProgress:
            outputs[invocation_step.output_value.workflow_output.output_name] = invocation_step.output_value.value
        self.outputs[step.id] = outputs
        if not already_persisted:
            workflow_outputs_by_name = {wo.output_name: wo for wo in step.workflow_outputs}
            for output_name, output_object in outputs.items():
                if hasattr(output_object, "history_content_type"):
                    invocation_step.add_output(output_name, output_object)
                else:
                    # Add this non-data, non workflow-output output to the workflow outputs.
                    # This is required for recovering the output in the next scheduling iteration,
                    # and should be replaced with a WorkflowInvocationStepOutputValue ASAP.
                    if not workflow_outputs_by_name.get(output_name) and output_object is not NO_REPLACEMENT:
                        workflow_output = model.WorkflowOutput(step, output_name=output_name)
                        step.workflow_outputs.append(workflow_output)
            for workflow_output in step.workflow_outputs:
                assert workflow_output.output_name
                output_name = workflow_output.output_name