Unverified Commit a5960ee2 authored by mvdbeek's avatar mvdbeek
Browse files

Fix map over calculation for runtime inputs

parent ce36118d
Loading
Loading
Loading
Loading
+30 −1
Original line number Diff line number Diff line
@@ -3,6 +3,8 @@ from typing import (
    Any,
    Dict,
    List,
    Sequence,
    Union,
)

from galaxy.tools.parameters.basic import (
@@ -187,6 +189,26 @@ def process_key(incoming_key: str, incoming_value: Any, d: Dict[str, Any]):
        process_key("|".join(key_parts[1:]), incoming_value=incoming_value, d=subdict)


def nested_key_to_path(key: str) -> Sequence[Union[str, int]]:
    """
    Convert a tool state key that is separated with '|' and '_n' into path iterable.
    E.g. "cond|repeat_0|paramA" -> ["cond", "repeat", 0, "paramA"].
    Return value can be used with `boltons.iterutils.get_path`.
    """
    path: List[Union[str, int]] = []
    key_parts = key.split("|")
    if len(key_parts) == 1:
        return key_parts
    for key_part in key_parts:
        if "_" in key_part:
            input_name, _index = key_part.rsplit("_", 1)
            if _index.isdigit():
                path.extend((input_name, int(_index)))
                continue
        path.append(key_part)
    return path


def flat_to_nested_state(incoming: Dict[str, Any]):
    nested_state: Dict[str, Any] = {}
    for key, value in incoming.items():
@@ -194,4 +216,11 @@ def flat_to_nested_state(incoming: Dict[str, Any]):
    return nested_state


__all__ = ("LegacyUnprefixedDict", "WrappedParameters", "make_dict_copy", "process_key", "flat_to_nested_state")
__all__ = (
    "LegacyUnprefixedDict",
    "WrappedParameters",
    "make_dict_copy",
    "process_key",
    "flat_to_nested_state",
    "nested_key_to_path",
)
+6 −0
Original line number Diff line number Diff line
@@ -10,6 +10,7 @@ from typing import (
    Union,
)

from boltons.iterutils import get_path
from typing_extensions import Protocol

from galaxy import model
@@ -35,6 +36,7 @@ from galaxy.schema.invocation import (
    WarningReason,
)
from galaxy.tools.parameters.basic import raw_to_galaxy
from galaxy.tools.parameters.wrapped import nested_key_to_path
from galaxy.util import ExecutionTimer
from galaxy.workflow import modules
from galaxy.workflow.run_request import (
@@ -444,6 +446,10 @@ class WorkflowProgress:
                    replacement = temp
            else:
                replacement = self.replacement_for_connection(connection[0], is_data=is_data)
        elif step.state and (state_input := get_path(step.state.inputs, nested_key_to_path(prefixed_name), None)):
            # workflow submitted with step parameters populates state directly
            # via populate_module_and_state
            replacement = state_input
        else:
            for step_input in step.inputs:
                if step_input.name == prefixed_name and step_input.default_value_set:
+19 −1
Original line number Diff line number Diff line
@@ -3,10 +3,28 @@ from typing import (
    Dict,
)

from galaxy.tools.parameters.wrapped import process_key
from galaxy.tools.parameters.wrapped import (
    nested_key_to_path,
    process_key,
)
from .util import BaseParameterTestCase


def test_nested_key_to_path():
    assert nested_key_to_path("param") == ["param"]
    assert nested_key_to_path("param_x") == ["param_x"]
    assert nested_key_to_path("cond|param_x") == ["cond", "param_x"]
    assert nested_key_to_path("param_") == ["param_"]
    assert nested_key_to_path("cond|param_") == ["cond", "param_"]
    assert nested_key_to_path("repeat_1|inner_repeat_1|data_table_column_value") == [
        "repeat",
        1,
        "inner_repeat",
        1,
        "data_table_column_value",
    ]


class TestProcessKey:
    def test_process_key(self):
        nested_dict: Dict[str, Any] = {}