Commit a53e3cab authored by Cage, Gregory's avatar Cage, Gregory
Browse files

Streamline multiple dataset ingressing

parent 1ad6765e
Loading
Loading
Loading
Loading
Loading
+2 −15
Original line number Diff line number Diff line
@@ -5,12 +5,12 @@ as well as output data from Galaxy tools.
"""

from abc import ABC, abstractmethod
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING, Any, Optional, Union

from bioblend.galaxy.dataset_collections import DatasetCollectionClient
from bioblend.galaxy.datasets import DatasetClient
from bioblend.galaxy.tools import inputs

if TYPE_CHECKING:
    from .data_store import Datastore
@@ -18,14 +18,6 @@ if TYPE_CHECKING:
LOAD_NEUTRON_DATA_TOOL = "neutrons_load_data"


class DataState(Enum):
    """The state of a dataset in Galaxy.

    Used by ``Dataset`` to track where its backing file currently lives,
    so upload/ingest logic can pick the right transfer path.
    """

    # File is on the local machine (constructor default when remote_file is False).
    LOCAL = "local"
    # File lives on a remote filesystem reachable by Galaxy tools
    # (constructor sets this when remote_file is True).
    REMOTE = "remote"
    # Dataset has been registered in a Galaxy history (set after upload/ingest completes).
    IN_GALAXY = "in_galaxy"


class DatasetRegistrationError(Exception):
    """
    Exception raised when dataset registration fails.
@@ -95,10 +87,6 @@ class Dataset(AbstractData):
        self.store: Optional["Datastore"] = None
        self.file_type: str = Path(path).suffix
        self.remote_file = remote_file
        if remote_file:
            self.state = DataState.REMOTE
        else:
            self.state = DataState.LOCAL
        self.force_upload = force_upload
        self._content: Any = None

@@ -118,7 +106,7 @@ class Dataset(AbstractData):
        dataset_client = DatasetClient(galaxy_instance)
        history_id = galaxy_instance.histories.get_histories(name=store.name)[0]["id"]
        if self.remote_file:
            tool_inputs = store.nova_connection.galaxy_instance.tools.inputs.inputs()  # type: ignore
            tool_inputs = inputs.inputs()  # type: ignore
            tool_inputs.set_param("filepath", self.path)
            results = store.nova_connection.galaxy_instance.tools.run_tool(
                history_id=store.history_id, tool_id=LOAD_NEUTRON_DATA_TOOL, tool_inputs=tool_inputs
@@ -139,7 +127,6 @@ class Dataset(AbstractData):
            self.id = dataset_info["outputs"][0]["id"]
            self.store = store
        dataset_client.wait_for_dataset(self.id)
        self.state = DataState.IN_GALAXY

    def download(self, local_path: str) -> AbstractData:
        """Downloads this dataset to the local path given."""
+22 −11
Original line number Diff line number Diff line
@@ -11,10 +11,12 @@ if TYPE_CHECKING:
    from .data_store import Datastore
from nova.common.job import WorkState

from .dataset import LOAD_NEUTRON_DATA_TOOL, Dataset, DatasetCollection, DataState
from .dataset import Dataset, DatasetCollection
from .outputs import Outputs
from .parameters import Parameters

REGISTER_NEUTRON_DATA_TOOL = "neutrons_register"


class JobStatus:
    """Internal structure to hold job status info."""
@@ -144,6 +146,7 @@ class Job:
        galaxy_instance = self.store.nova_connection.galaxy_instance
        dataset_client = DatasetClient(galaxy_instance)
        dataset_ids: Dict[str, str] = {}
        datasets_to_ingress = {}
        for name, dataset in datasets.items():
            if self.status.state in [WorkState.STOPPING, WorkState.CANCELING]:
                self.cleanup_datasets(dataset_ids)
@@ -153,16 +156,15 @@ class Job:
                self._link_existing_dataset(dataset)
                if dataset.id:
                    dataset_ids[name] = dataset.id
                    dataset.state = DataState.IN_GALAXY
                    continue

            if dataset.remote_file:
                self._ingest_dataset(dataset)
                datasets_to_ingress[dataset.path] = dataset
            else:
                self._upload_single_dataset(dataset)
            if dataset.id:
                dataset_ids[name] = dataset.id
            dataset.state = DataState.IN_GALAXY

        self._ingest_datasets(datasets_to_ingress)
        for dataset_output in dataset_ids.values():
            if self.status.state in [WorkState.STOPPING, WorkState.CANCELING]:
                self.cleanup_datasets(dataset_ids)
@@ -178,13 +180,22 @@ class Job:
            dataset.id = existing_data[0]["id"]
            dataset.store = self.store

    def _ingest_dataset(self, dataset: Dataset) -> None:
    def _ingest_datasets(self, datasets: dict[str, Dataset]) -> None:
        dataset_client = DatasetClient(self.store.nova_connection.galaxy_instance)
        tool_inputs = galaxy.tools.inputs.inputs()
        tool_inputs.set_param("filepath", dataset.path)
        i = 0
        for d in datasets:
            tool_inputs.set_param(f"series_{i}|input", d)
            i += 1
        results = self.galaxy_instance.tools.run_tool(
            history_id=self.store.history_id, tool_id=LOAD_NEUTRON_DATA_TOOL, tool_inputs=tool_inputs
            history_id=self.store.history_id, tool_id=REGISTER_NEUTRON_DATA_TOOL, tool_inputs=tool_inputs
        )
        dataset.id = results["outputs"][0]["id"]
        for output in results["outputs"]:
            dataset_client.wait_for_dataset(dataset_id=output["id"])
            # If two datasets have the same path, it should not matter which entry we match here.
            dataset = datasets.get(output["name"], None)
            if dataset:
                dataset.id = output["id"]
                dataset.store = self.store

    def _upload_single_dataset(self, dataset: Dataset) -> None:
+2 −3
Original line number Diff line number Diff line
"""Tests for datasets."""

import pytest
from bioblend.galaxy import GalaxyInstance
from bioblend.galaxy.datasets import DatasetClient

from nova.galaxy.connection import Connection
from nova.galaxy.dataset import Dataset

REMOTE_FILE_PATH = ""
# If this test fails, the file may have been moved or may no longer exist.
REMOTE_FILE_PATH = "/HFIR/CG3/shared/Cycle509/IntermediateConfigNiQ_RC509.txt"


def test_dataset_upload(nova_instance: Connection) -> None:
@@ -30,7 +30,6 @@ def test_dataset_set_content_upload(nova_instance: Connection) -> None:
        assert input.get_content() is not None


@pytest.mark.skip
def test_remote_file_ingest(nova_instance: Connection, galaxy_instance: GalaxyInstance) -> None:
    with nova_instance.connect() as connection:
        store = connection.get_data_store(name="nova_galaxy_testing")
+19 −0
Original line number Diff line number Diff line
@@ -13,6 +13,9 @@ from nova.galaxy.tool import Tool

TEST_TOOL_ID = "neutrons_remote_command"
TEST_INT_TOOL_ID = "interactive_tool_generic_output"
# If these tests fail, the files may have been moved or may no longer exist.
REMOTE_FILE_PATH = "/HFIR/CG3/shared/Cycle509/IntermediateConfigNiQ_RC509.txt"
REMOTE_FILE_PATH_2 = "/HFIR/CG3/shared/Cycle509/Long6AConfigURBj_RC509.txt"


def test_run_tool(nova_instance: Connection) -> None:
@@ -189,3 +192,19 @@ def test_existing_dataset_as_parameter(nova_instance: Connection, galaxy_instanc
        history_content = galaxy_instance.histories.show_history(history_id=store.history_id, contents=True)
        # should only be 2 elements here (tool and the dataset passed as param), since dataset was manually uploaded
        assert len(history_content) == 2


def test_remote_dataset_as_parameter(nova_instance: Connection, galaxy_instance: GalaxyInstance) -> None:
    """Run a tool with two remote-file datasets as inputs and verify every history item finishes in the 'ok' state.

    Passing two remote datasets in a single run exercises ingest of
    multiple remote files at once (the batched path this change adds).
    """
    with nova_instance.connect() as connection:
        store = connection.get_data_store(name="nova_galaxy_testing")
        # Ensure the test history/store is removed after the test run.
        store.mark_for_cleanup()
        test_tool = Tool(TEST_TOOL_ID)
        # remote_file=True marks these as paths on a Galaxy-reachable filesystem,
        # not local files to upload.
        test_data = Dataset(path=REMOTE_FILE_PATH, remote_file=True)
        test_data_2 = Dataset(path=REMOTE_FILE_PATH_2, remote_file=True)
        params = Parameters()
        params.add_input("test", test_data)
        params.add_input("test2", test_data_2)
        test_tool.run(data_store=store, params=params)
        history_content = galaxy_instance.histories.show_history(history_id=store.history_id, contents=True)
        # Both ingested datasets and the tool output must all have completed successfully.
        for item in history_content:
            assert item["state"] == "ok"