Unverified Commit 1e2737a4 authored by Marius van den Beek, committed by GitHub

Merge pull request #20650 from jmchilton/deferred_multiple_25_0

[25.0] Fix deferred datasets in multiple dataset parameters.
parents 21d827f6 50e652c2
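
The scenario being fixed: a data parameter declared with multiple="true" receives more than one deferred dataset. Before this change, only single deferred datasets and deferred collections were materialized before the job ran, so list-valued inputs reached the tool evaluator still in a deferred state. The payload shape involved looks roughly like the sketch below (the ids are hypothetical placeholders; the new cat_list framework test near the end of the diff builds exactly this kind of list of src dicts):

# A multiple="true" data parameter is posted as a list of src dicts, one per
# selected dataset; each referenced HDA here is assumed to be deferred.
inputs = {
    "input1": [
        {"src": "hda", "id": "<deferred 1.bed hda id>"},
        {"src": "hda", "id": "<deferred 1.txt hda id>"},
    ],
}
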
+12 −2
@@ -7,7 +7,10 @@ from abc import (
    ABCMeta,
    abstractmethod,
)
from typing import Union
from typing import (
    List,
    Union,
)

from galaxy.model import (
    DatasetCollectionElement,
@@ -15,7 +18,14 @@ from galaxy.model import (
    HistoryDatasetCollectionAssociation,
)

DeferrableObjectsT = Union[DatasetInstance, HistoryDatasetCollectionAssociation, DatasetCollectionElement]
DeferrableObjectsT = Union[
    DatasetInstance,
    HistoryDatasetCollectionAssociation,
    DatasetCollectionElement,
    List[DatasetInstance],
    List[Union[HistoryDatasetCollectionAssociation, DatasetCollectionElement]],
    List[Union[DatasetInstance, HistoryDatasetCollectionAssociation, DatasetCollectionElement]],
]


def dataset_path_rewrites(dataset_paths):
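
The widened DeferrableObjectsT alias above reads as: a deferrable thing is still a single dataset instance or collection, but now also a list of those, matching what a multiple="true" data parameter produces. A self-contained illustration of the intent, with stand-in classes in place of the Galaxy models:

from typing import List, Union

# Stand-ins for the Galaxy model classes named in the alias (illustration only).
class DatasetInstance: ...
class HistoryDatasetCollectionAssociation: ...
class DatasetCollectionElement: ...

DeferrableObjectsT = Union[
    DatasetInstance,
    HistoryDatasetCollectionAssociation,
    DatasetCollectionElement,
    List[DatasetInstance],
    List[Union[HistoryDatasetCollectionAssociation, DatasetCollectionElement]],
    List[Union[DatasetInstance, HistoryDatasetCollectionAssociation, DatasetCollectionElement]],
]

def record_deferred(value: DeferrableObjectsT) -> None:
    """Accepts a single deferrable object or a list of them."""

record_deferred(DatasetInstance())                       # previously the only dataset shape
record_deferred([DatasetInstance(), DatasetInstance()])  # now valid: multiple="true" inputs
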
+9 −0
@@ -3,6 +3,7 @@ import logging
import os
import shutil
from typing import (
    Dict,
    NamedTuple,
    Optional,
    Union,
@@ -88,6 +89,7 @@ class DatasetInstanceMaterializer:
        self._object_store_populator = object_store_populator
        self._file_sources = file_sources
        self._sa_session = sa_session
        self._previously_materialized: Dict[int, HistoryDatasetAssociation] = {}

    def ensure_materialized(
        self,
@@ -105,6 +107,12 @@ class DatasetInstanceMaterializer:
        if dataset.state != Dataset.states.DEFERRED and isinstance(dataset_instance, HistoryDatasetAssociation):
            return dataset_instance

        if dataset_instance.id in self._previously_materialized and isinstance(
            dataset_instance, HistoryDatasetAssociation
        ):
            # If we have already materialized this dataset, return the previously materialized instance.
            return self._previously_materialized[dataset_instance.id]

        materialized_dataset_hashes = [h.copy() for h in dataset.hashes]
        if in_place:
            materialized_dataset = dataset_instance.dataset
@@ -195,6 +203,7 @@ class DatasetInstanceMaterializer:
                metadata_tmp_files_dir = None
            materialized_dataset_instance.set_meta(metadata_tmp_files_dir=metadata_tmp_files_dir)
            materialized_dataset_instance.metadata_deferred = False
        self._previously_materialized[dataset_instance.id] = materialized_dataset_instance
        return materialized_dataset_instance

    def _stream_source(self, target_source: DatasetSource, datatype, dataset: Dataset) -> str:
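
The hunks above add a per-materializer cache, _previously_materialized, keyed by HistoryDatasetAssociation id, so the same deferred dataset referenced several times by one job (typical for multiple dataset parameters) is fetched and materialized only once. A condensed sketch of the same pattern, using stand-in classes rather than the real DatasetInstanceMaterializer:

from typing import Dict

class FakeHDA:
    """Stand-in for HistoryDatasetAssociation; only an integer id is modeled."""
    def __init__(self, id: int) -> None:
        self.id = id

class FakeMaterializer:
    def __init__(self) -> None:
        # Cache of already-materialized instances, keyed by instance id.
        self._previously_materialized: Dict[int, FakeHDA] = {}

    def ensure_materialized(self, dataset_instance: FakeHDA) -> FakeHDA:
        cached = self._previously_materialized.get(dataset_instance.id)
        if cached is not None:
            # Repeat occurrences of the same HDA reuse the first result.
            return cached
        materialized = self._download_and_materialize(dataset_instance)
        self._previously_materialized[dataset_instance.id] = materialized
        return materialized

    def _download_and_materialize(self, dataset_instance: FakeHDA) -> FakeHDA:
        # In Galaxy this is where the deferred source is streamed, hashed and
        # metadata is set; here it just returns a fresh stand-in object.
        return FakeHDA(dataset_instance.id)

materializer = FakeMaterializer()
hda = FakeHDA(42)
assert materializer.ensure_materialized(hda) is materializer.ensure_materialized(hda)
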
+55 −4
@@ -13,6 +13,7 @@ from typing import (
    List,
    Optional,
    TYPE_CHECKING,
    Union,
)

from packaging.version import Version
@@ -287,6 +288,30 @@ class ToolEvaluator:
                assert isinstance(value, (model.HistoryDatasetAssociation, model.LibraryDatasetDatasetAssociation))
                undeferred = dataset_materializer.ensure_materialized(value)
                undeferred_objects[key] = undeferred
            elif isinstance(value, list):
                undeferred_list: List[
                    Union[
                        model.DatasetInstance, model.HistoryDatasetCollectionAssociation, model.DatasetCollectionElement
                    ]
                ] = []
                for potentially_deferred in value:
                    if isinstance(potentially_deferred, model.DatasetInstance):
                        if potentially_deferred.state != model.Dataset.states.DEFERRED:
                            undeferred_list.append(potentially_deferred)
                        else:
                            assert isinstance(
                                potentially_deferred,
                                (model.HistoryDatasetAssociation, model.LibraryDatasetDatasetAssociation),
                            )
                            undeferred = dataset_materializer.ensure_materialized(potentially_deferred)
                            undeferred_list.append(undeferred)
                    elif isinstance(
                        potentially_deferred,
                        (model.HistoryDatasetCollectionAssociation, model.DatasetCollectionElement),
                    ):
                        undeferred_collection = materialize_collection_input(potentially_deferred, dataset_materializer)
                        undeferred_list.append(undeferred_collection)
                undeferred_objects[key] = undeferred_list
            else:
                undeferred_collection = materialize_collection_input(value, dataset_materializer)
                undeferred_objects[key] = undeferred_collection
@@ -348,10 +373,6 @@ class ToolEvaluator:
        Walk input datasets and collections and find inputs that need to be materialized.
        """
        deferred_objects: Dict[str, DeferrableObjectsT] = {}
        for key, value in input_datasets.items():
            if value is not None and value.state == model.Dataset.states.DEFERRED:
                if self._should_materialize_deferred_input(key, value):
                    deferred_objects[key] = value

        def find_deferred_collections(input, value, context, prefixed_name=None, **kwargs):
            if (
@@ -360,8 +381,38 @@ class ToolEvaluator:
            ):
                deferred_objects[prefixed_name] = value

        def find_deferred_datasets(input, value, context, prefixed_name=None, **kwargs):
            if isinstance(input, DataToolParameter):
                if isinstance(value, model.DatasetInstance) and value.state == model.Dataset.states.DEFERRED:
                    deferred_objects[prefixed_name] = value
                elif isinstance(value, list):
                    # handle single list reduction as a collection input
                    if (
                        value
                        and len(value) == 1
                        and isinstance(
                            value[0], (model.HistoryDatasetCollectionAssociation, model.DatasetCollectionElement)
                        )
                    ):
                        deferred_objects[prefixed_name] = value
                        return

                    for v in value:
                        if self._should_materialize_deferred_input(prefixed_name, v):
                            deferred_objects[prefixed_name] = value
                            break

        visit_input_values(self.tool.inputs, incoming, find_deferred_datasets)
        visit_input_values(self.tool.inputs, incoming, find_deferred_collections)

        # now place the inputX datasets hacked in for multiple inputs into the deferred
        # object dict also. This is so messy. I think in this case we only need these for
        # Pulsar staging, which uses the hackier input_datasets flat dict.
        for key, value in input_datasets.items():
            if key not in deferred_objects and value is not None and value.state == model.Dataset.states.DEFERRED:
                if self._should_materialize_deferred_input(key, value):
                    deferred_objects[key] = value

        return deferred_objects

    def _should_materialize_deferred_input(self, input_name: str, input_value: DeferrableObjectsT) -> bool:
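
In the evaluator changes above, the new find_deferred_datasets visitor records an entire list value as deferred as soon as any element of it is deferred (with single-element collection reductions handed back to the collection path), and the materialization loop then walks the list element by element. A condensed, self-contained sketch of that per-parameter decision, using stand-in objects rather than the real visit_input_values machinery:

from typing import Any, Dict

DEFERRED = "deferred"

class FakeDataset:
    """Stand-in for a DatasetInstance carrying only its state."""
    def __init__(self, state: str) -> None:
        self.state = state

def find_deferred(prefixed_name: str, value: Any, deferred_objects: Dict[str, Any]) -> None:
    if isinstance(value, FakeDataset):
        # Single dataset parameter value: record it only if deferred.
        if value.state == DEFERRED:
            deferred_objects[prefixed_name] = value
    elif isinstance(value, list):
        # multiple="true" parameter value: record the whole list if any
        # element still needs materialization.
        if any(isinstance(v, FakeDataset) and v.state == DEFERRED for v in value):
            deferred_objects[prefixed_name] = value

deferred: Dict[str, Any] = {}
find_deferred("input1", [FakeDataset("ok"), FakeDataset(DEFERRED)], deferred)
assert "input1" in deferred
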
+32 −0
@@ -613,3 +613,35 @@ def test_null_to_text_tool_with_validation(required_tool: RequiredTool, tool_inp
    required_tool.execute.with_inputs(tool_input_format.when.any({})).assert_fails()
    required_tool.execute.with_inputs(tool_input_format.when.any({"parameter": None})).assert_fails()
    required_tool.execute.with_inputs(tool_input_format.when.any({"parameter": ""})).assert_fails()


@requires_tool_id("cat|cat1")
def test_deferred_basic(required_tool: RequiredTool, target_history: TargetHistory):
    has_src_dict = target_history.with_deferred_dataset_for_test_file("1.bed", ext="bed")
    inputs = {
        "input1": has_src_dict.src_dict,
    }
    output = required_tool.execute.with_inputs(inputs).assert_has_single_job.with_single_output
    output.assert_contains("chr1	147962192	147962580	CCDS989.1_cds_0_0_chr1_147962193_r	0	-")


@requires_tool_id("metadata_bam")
def test_deferred_with_metadata_options_filter(required_tool: RequiredTool, target_history: TargetHistory):
    has_src_dict = target_history.with_deferred_dataset_for_test_file("1.bam", ext="bam")
    inputs = {
        "input_bam": has_src_dict.src_dict,
        "ref_names": "chrM",
    }
    required_tool.execute.with_inputs(inputs).assert_has_single_job.with_single_output.with_contents_stripped("chrM")


@requires_tool_id("cat_list")
def test_deferred_multi_input(required_tool: RequiredTool, target_history: TargetHistory):
    has_src_dict_bed = target_history.with_deferred_dataset_for_test_file("1.bed", ext="bed")
    has_src_dict_txt = target_history.with_deferred_dataset_for_test_file("1.txt", ext="txt")
    inputs = {
        "input1": [has_src_dict_bed.src_dict, has_src_dict_txt.src_dict],
    }
    output = required_tool.execute.with_inputs(inputs).assert_has_single_job.with_single_output
    output.assert_contains("chr1	147962192	147962580	CCDS989.1_cds_0_0_chr1_147962193_r	0	-")
    output.assert_contains("chr1    4225    19670")
+0 −32
@@ -2862,38 +2862,6 @@ class TestToolsApi(ApiTestCase, TestsTools):
        output_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output)
        assert output_content.strip() == "123\n456\n456\n0ab"

    @skip_without_tool("cat1")
    def test_run_deferred_dataset(self, history_id):
        details = self.dataset_populator.create_deferred_hda(
            history_id, "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bed", ext="bed"
        )
        inputs = {
            "input1": dataset_to_param(details),
        }
        outputs = self._cat1_outputs(history_id, inputs=inputs)
        output = outputs[0]
        details = self.dataset_populator.get_history_dataset_details(
            history_id, dataset=output, wait=True, assert_ok=True
        )
        assert details["state"] == "ok"
        output_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output)
        assert output_content.startswith("chr1	147962192	147962580	CCDS989.1_cds_0_0_chr1_147962193_r	0	-")

    @skip_without_tool("metadata_bam")
    def test_run_deferred_dataset_with_metadata_options_filter(self, history_id):
        details = self.dataset_populator.create_deferred_hda(
            history_id, "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bam", ext="bam"
        )
        inputs = {"input_bam": dataset_to_param(details), "ref_names": "chrM"}
        run_response = self.dataset_populator.run_tool(tool_id="metadata_bam", inputs=inputs, history_id=history_id)
        output = run_response["outputs"][0]
        output_details = self.dataset_populator.get_history_dataset_details(
            history_id, dataset=output, wait=True, assert_ok=True
        )
        assert output_details["state"] == "ok"
        output_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output)
        assert output_content.startswith("chrM")

    @skip_without_tool("pileup")
    def test_metadata_validator_on_deferred_input(self, history_id):
        deferred_bam_details = self.dataset_populator.create_deferred_hda(