Unverified commit 808d5f94, authored by John Chilton and committed by GitHub
Browse files

Merge pull request #19807 from mvdbeek/use_materialized_datasets_in_pulsar

[24.2] Use materialized datasets in pulsar job runner
parents ca61f83e 31aeefd6
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -8,6 +8,7 @@ from typing import (
    Dict,
)

from galaxy.job_execution.datasets import DeferrableObjectsT
from galaxy.job_execution.setup import JobIO
from galaxy.model import Job

@@ -22,6 +23,9 @@ class ComputeEnvironment(metaclass=ABCMeta):
    compute server.
    """

    def __init__(self):
        """Start the environment with an empty record of materialized objects."""
        # Filled in later by whoever materializes deferred inputs for the job.
        # NOTE(review): keys look like job input names (consumers look entries
        # up by the input association's ``name``) -- confirm against callers.
        self.materialized_objects: Dict[str, DeferrableObjectsT] = {}

    @abstractmethod
    def output_names(self):
        """Output unqualified filenames defined by job."""
+9 −0
Original line number Diff line number Diff line
@@ -7,6 +7,15 @@ from abc import (
    ABCMeta,
    abstractmethod,
)
from typing import Union

from galaxy.model import (
    DatasetCollectionElement,
    DatasetInstance,
    HistoryDatasetCollectionAssociation,
)

# Type alias for the model objects accepted wherever a "deferrable" input is
# expected: a dataset instance, a history dataset collection association, or a
# dataset collection element.
DeferrableObjectsT = Union[DatasetInstance, HistoryDatasetCollectionAssociation, DatasetCollectionElement]


def dataset_path_rewrites(dataset_paths):
+16 −7
Original line number Diff line number Diff line
@@ -22,6 +22,7 @@ from galaxy.files import (
from galaxy.job_execution.datasets import (
    DatasetPath,
    DatasetPathRewriter,
    DeferrableObjectsT,
    get_path_rewriter,
)
from galaxy.model import (
@@ -148,7 +149,7 @@ class JobIO(UsesDictVisibleKeys):
        self._dataset_path_rewriter: Optional[DatasetPathRewriter] = None

    @property
    def job(self) -> Job:
        """Return the ``Job`` identified by ``self.job_id``.

        The lookup goes through ``self.sa_session.get`` on every access; nothing
        is cached on this object.
        """
        return self.sa_session.get(Job, self.job_id)

    @classmethod
@@ -217,11 +218,19 @@ class JobIO(UsesDictVisibleKeys):
            filenames.append(ds.dataset.extra_files_path)
        return filenames

    def get_input_datasets(
        self, materialized_objects: Optional[Dict[str, DeferrableObjectsT]] = None
    ) -> List[DatasetInstance]:
        """Return the job's input datasets, preferring materialized replacements.

        Walks ``job.input_datasets + job.input_library_datasets`` in order. For
        each association:

        * if ``materialized_objects`` contains an entry under the association's
          ``name``, that entry is used instead of ``da.dataset`` -- but only
          when it is a ``DatasetInstance``; any other kind of materialized
          object contributes nothing for that input;
        * otherwise ``da.dataset`` is used when it is truthy.

        :param materialized_objects: optional mapping from input name to the
            materialized object; ``None`` or an empty mapping disables
            substitution.
        :returns: the selected datasets, in association order.
        """
        job = self.job
        datasets: List[DatasetInstance] = []
        # da is JobToInputDatasetAssociation object
        for da in job.input_datasets + job.input_library_datasets:
            if materialized_objects and da.name in materialized_objects:
                materialized_object = materialized_objects[da.name]
                # Only dataset instances can stand in for an input dataset here;
                # other materialized objects are deliberately not appended.
                if isinstance(materialized_object, DatasetInstance):
                    datasets.append(materialized_object)
            elif da.dataset:
                datasets.append(da.dataset)
        return datasets

    def get_input_fnames(self) -> List[str]:
        filenames = []
@@ -229,9 +238,9 @@ class JobIO(UsesDictVisibleKeys):
            filenames.extend(self.get_input_dataset_fnames(ds))
        return filenames

    def get_input_paths(
        self, materialized_objects: Optional[Dict[str, DeferrableObjectsT]] = None
    ) -> List[DatasetPath]:
        """Return one input path per input dataset of the job.

        The datasets come from ``get_input_datasets(materialized_objects)`` and
        each one is converted with ``get_input_path``.

        :param materialized_objects: optional mapping forwarded unchanged to
            ``get_input_datasets``. It defaults to ``None`` so callers that
            pass no argument keep working.
        :returns: the paths, in the same order as the datasets.
        """
        return [self.get_input_path(ds) for ds in self.get_input_datasets(materialized_objects)]

+3 −1
Original line number Diff line number Diff line
@@ -365,7 +365,9 @@ class PulsarJobRunner(AsynchronousJobRunner):
                output_names = compute_environment.output_names()

                client_inputs_list = []
                for input_dataset_wrapper in job_wrapper.job_io.get_input_paths():
                for input_dataset_wrapper in job_wrapper.job_io.get_input_paths(
                    compute_environment.materialized_objects
                ):
                    # str here to resolve false_path if set on a DatasetPath object.
                    path = str(input_dataset_wrapper)
                    object_store_ref = {
+2 −6
Original line number Diff line number Diff line
@@ -13,7 +13,6 @@ from typing import (
    List,
    Optional,
    TYPE_CHECKING,
    Union,
)

from packaging.version import Version
@@ -21,6 +20,7 @@ from packaging.version import Version
from galaxy import model
from galaxy.authnz.util import provider_name_to_backend
from galaxy.job_execution.compute_environment import ComputeEnvironment
from galaxy.job_execution.datasets import DeferrableObjectsT
from galaxy.job_execution.setup import ensure_configs_directory
from galaxy.model.deferred import (
    materialize_collection_input,
@@ -117,11 +117,6 @@ def global_tool_logs(func, config_file: str, action_str: str, tool: "Tool"):
        ) from e


DeferrableObjectsT = Union[
    model.DatasetInstance, model.HistoryDatasetCollectionAssociation, model.DatasetCollectionElement
]


class ToolEvaluator:
    """An abstraction linking together a tool and a job runtime to evaluate
    tool inputs in an isolated, testable manner.
@@ -168,6 +163,7 @@ class ToolEvaluator:

        # materialize deferred datasets
        materialized_objects = self._materialize_objects(deferred_objects, self.local_working_directory)
        self.compute_environment.materialized_objects = materialized_objects

        # replace materialized objects back into tool input parameters
        self._replaced_deferred_objects(inp_data, incoming, materialized_objects)
Loading