Commit 302c7ddb authored by Cage, Gregory's avatar Cage, Gregory
Browse files

Merge branch '9-remote-dataset' into 'main'

Implement remote file locations for datasets and streamline dataset uploading.

Closes #9

See merge request ndip/public-packages/nova-galaxy!25
parents 1f532dbb 4045381c
Loading
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -9,7 +9,7 @@ variables:
  IMAGE_NAME: ${NDIP_DOCKER_REPOSITORY}/${CI_PROJECT_PATH}
  IMAGE_TAG: "0.1.0"
  GALAXY_URL: "https://calvera-test.ornl.gov"
  GALAXY_KEY: ${SERVICE_ACCOUNT_API_KEY}
  GALAXY_KEY: ${CALVERA_TEST_API_KEY}

before_script:
  - curl https://code.ornl.gov/rse-deployment/rse-sharables/raw/master/rse-bash-modules.sh -O
+11 −1
Original line number Diff line number Diff line
## Nova Galaxy 0.9.1
### Nova Galaxy 0.10.2
- Added ability to mark datasets as remote files, and Nova-Galaxy will attempt to ingress them when running tools (thanks to Gregory Cage). [Merge Request 25](https://code.ornl.gov/ndip/public-packages/nova-galaxy/-/merge_requests/25)
- Datasets can now be linked to existing datasets when uploaded as tool parameters using the `force_upload` parameter. This saves users from having to upload a dataset multiple times if not necessary (thanks to Gregory Cage). [Merge Request 25](https://code.ornl.gov/ndip/public-packages/nova-galaxy/-/merge_requests/25)

### Nova Galaxy 0.10.1
- Dependency update (thanks to Sergey Yakubov). [Commit](https://code.ornl.gov/ndip/public-packages/nova-galaxy/-/commit/1f532dbbd5c6603c7e358101c0b3830fb2b36f5a)

### Nova Galaxy 0.10.0
- Added ToolRunner class to facilitate an event driven running of tools (thanks to Sergey Yakubov). [Merge Request 24](https://code.ornl.gov/ndip/public-packages/nova-galaxy/-/merge_requests/24)

### Nova Galaxy 0.9.1
- Added `get_full_status` method to tool in order to get detailed messages mostly for error states (thanks to Gregory Cage). [Merge Request 23](https://code.ornl.gov/ndip/public-packages/nova-galaxy/-/merge_requests/23)

### Nova Galaxy 0.9.0
+1 −1
Original line number Diff line number Diff line
[tool.poetry]
name = "nova-galaxy"
version = "0.10.1"
version = "0.10.2"
description = "Utilities for accessing the ORNL Galaxy instance"
authors = ["Greg Watson <watsongr@ornl.gov>", "Gregory Cage <cagege@ornl.gov>", "Sergey Yakubov <yakubovs@ornl.gov>"]
readme = "README.md"
+39 −20
Original line number Diff line number Diff line
@@ -5,23 +5,17 @@ as well as output data from Galaxy tools.
"""

from abc import ABC, abstractmethod
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING, Any, Optional, Union

from bioblend.galaxy.dataset_collections import DatasetCollectionClient
from bioblend.galaxy.datasets import DatasetClient
from bioblend.galaxy.tools import inputs

if TYPE_CHECKING:
    from .data_store import Datastore


class DataState(Enum):
    """The state of a dataset in Galaxy."""

    NONE = 1
    IN_GALAXY = 2
    UPLOADING = 3
LOAD_NEUTRON_DATA_TOOL = "neutrons_register"


class DatasetRegistrationError(Exception):
@@ -41,7 +35,7 @@ class DatasetRegistrationError(Exception):


class AbstractData(ABC):
    """Encapsulates data for use in Galaxy toools."""
    """Encapsulates data for use in Galaxy tools."""

    def __init__(self) -> None:
        super().__init__()
@@ -70,14 +64,30 @@ class Dataset(AbstractData):
    """Singular file that can be uploaded and used in a Galaxy tool.

    If needing to change the path of the Dataset, it is recommended to create a new Dataset instead.

    Parameters
    ----------
        path: str
            The path to the file that this dataset is representing. Can be left blank if manually providing content.
        name: Optional[str]
            The name of this dataset. Defaults to the filename from the path if provided.
        remote_file: bool
            Whether this file is a remote file that upstream has access to. Defaults to False (local file).
        force_upload: bool
            Whether to explicitly upload this dataset every time despite another dataset with the same name existing
            upstream. If False, Nova Galaxy will attempt to link this dataset with an upstream copy. Defaults to True.
    """

    def __init__(self, path: str = "", name: Optional[str] = None):
    def __init__(
        self, path: str = "", name: Optional[str] = None, remote_file: bool = False, force_upload: bool = True
    ) -> None:
        """Initialize the dataset; see the class docstring for parameter details."""
        self.path = path
        self.name = name or Path(path).name  # default to the filename component of path
        self.id: str = ""  # Galaxy dataset id, populated once uploaded
        self.store: Optional["Datastore"] = None  # datastore this dataset belongs to after upload
        self.file_type: str = Path(path).suffix
        self.remote_file = remote_file
        self.force_upload = force_upload
        self._content: Any = None  # in-memory content used instead of a file on disk when set
    def upload(self, store: "Datastore", name: Optional[str] = None) -> None:
@@ -95,16 +105,25 @@ class Dataset(AbstractData):
        galaxy_instance = store.nova_connection.galaxy_instance
        dataset_client = DatasetClient(galaxy_instance)
        history_id = galaxy_instance.histories.get_histories(name=store.name)[0]["id"]
        if name:
            file_name = name
        if self.remote_file:
            tool_inputs = inputs.inputs()  # type: ignore
            tool_inputs.set_param("series_0|input", self.path)
            results = store.nova_connection.galaxy_instance.tools.run_tool(
                history_id=store.history_id, tool_id=LOAD_NEUTRON_DATA_TOOL, tool_inputs=tool_inputs
            )
            self.id = results["outputs"][0]["id"]
            self.store = self.store

        else:
            file_name = self.name
            file_name = name if name else self.name
            if self._content:
                dataset_info = galaxy_instance.tools.paste_content(
                    content=self._content, history_id=history_id, file_name=file_name
                )
            else:
            dataset_info = galaxy_instance.tools.upload_file(path=self.path, history_id=history_id, file_name=file_name)
                dataset_info = galaxy_instance.tools.upload_file(
                    path=self.path, history_id=history_id, file_name=file_name
                )
            self.id = dataset_info["outputs"][0]["id"]
            self.store = store
        dataset_client.wait_for_dataset(self.id)
+53 −9
Original line number Diff line number Diff line
@@ -15,6 +15,8 @@ from .dataset import Dataset, DatasetCollection
from .outputs import Outputs
from .parameters import Parameters

REGISTER_NEUTRON_DATA_TOOL = "neutrons_register"


class JobStatus:
    """Internal structure to hold job status info."""
@@ -143,21 +145,26 @@ class Job:
        """Helper method to upload multiple datasets or collections in parallel."""
        galaxy_instance = self.store.nova_connection.galaxy_instance
        dataset_client = DatasetClient(galaxy_instance)
        history_id = galaxy_instance.histories.get_histories(name=self.store.name)[0]["id"]
        dataset_ids: Dict[str, str] = {}
        datasets_to_ingress = {}
        for name, dataset in datasets.items():
            if self.status.state in [WorkState.STOPPING, WorkState.CANCELING]:
                self.cleanup_datasets(dataset_ids)
                return None
            if len(dataset.path) < 1 and dataset.get_content():
                dataset_info = galaxy_instance.tools.paste_content(
                    content=str(dataset.get_content()), history_id=history_id, file_name=dataset.name
                )

            if not dataset.force_upload:
                self._link_existing_dataset(dataset)
                if dataset.id:
                    dataset_ids[name] = dataset.id
                    continue

            if dataset.remote_file:
                datasets_to_ingress[dataset.path] = dataset
            else:
                dataset_info = galaxy_instance.tools.upload_file(path=dataset.path, history_id=history_id)
            dataset_ids[name] = dataset_info["outputs"][0]["id"]
            dataset.id = dataset_info["outputs"][0]["id"]
            dataset.store = self.store
                self._upload_single_dataset(dataset)
            if dataset.id:
                dataset_ids[name] = dataset.id
        self._ingest_datasets(datasets_to_ingress)
        for dataset_output in dataset_ids.values():
            if self.status.state in [WorkState.STOPPING, WorkState.CANCELING]:
                self.cleanup_datasets(dataset_ids)
@@ -165,6 +172,43 @@ class Job:
            dataset_client.wait_for_dataset(dataset_output)
        return dataset_ids

    def _link_existing_dataset(self, dataset: Dataset) -> None:
        """Point `dataset` at an upstream copy with the same name, if one exists.

        Searches the store's history for datasets named `dataset.name`; when at
        least one match is found, the first match's id is adopted and the store
        is recorded on the dataset. A dataset with no match is left untouched.
        """
        client = DatasetClient(self.store.nova_connection.galaxy_instance)
        matches = client.get_datasets(history_id=self.store.history_id, name=dataset.name)
        if matches:
            dataset.id = matches[0]["id"]
            dataset.store = self.store

    def _ingest_datasets(self, datasets: Dict[str, Dataset]) -> None:
        """Ingress remote datasets upstream via a single run of the registration tool.

        Parameters
        ----------
            datasets: Dict[str, Dataset]
                Remote datasets keyed by their path. Every path is registered in one
                tool run; each tool output is matched back to its dataset by name
                and the dataset's id and store are updated.
        """
        galaxy_instance = self.store.nova_connection.galaxy_instance
        dataset_client = DatasetClient(galaxy_instance)
        # NOTE(review): assumes `galaxy.tools.inputs` is importable in this module;
        # dataset.py uses `from bioblend.galaxy.tools import inputs` instead — confirm.
        tool_inputs = galaxy.tools.inputs.inputs()
        for index, path in enumerate(datasets):
            tool_inputs.set_param(f"series_{index}|input", path)
        # Use the store's connection, matching _link_existing_dataset and
        # _upload_single_dataset (self.galaxy_instance is not set on Job).
        results = galaxy_instance.tools.run_tool(
            history_id=self.store.history_id, tool_id=REGISTER_NEUTRON_DATA_TOOL, tool_inputs=tool_inputs
        )
        for output in results["outputs"]:
            dataset_client.wait_for_dataset(dataset_id=output["id"])
            # If two datasets have the same path, then shouldn't matter
            dataset = datasets.get(output["name"])
            if dataset:
                dataset.id = output["id"]
                dataset.store = self.store

    def _upload_single_dataset(self, dataset: Dataset) -> None:
        """Push one local dataset to the store's history and record its new id.

        In-memory content (a dataset with an empty path and content set) is
        pasted directly; otherwise the file at `dataset.path` is uploaded.
        """
        tools = self.store.nova_connection.galaxy_instance.tools
        history = self.store.history_id
        if len(dataset.path) < 1 and dataset.get_content():
            # No file on disk — paste the in-memory content under the dataset's name.
            info = tools.paste_content(
                content=str(dataset.get_content()), history_id=history, file_name=dataset.name
            )
        else:
            info = tools.upload_file(path=dataset.path, history_id=history)
        dataset.id = info["outputs"][0]["id"]
        dataset.store = self.store

    def cleanup_datasets(self, datasets: Dict[str, str]) -> None:
        galaxy_instance = self.store.nova_connection.galaxy_instance
        history_id = galaxy_instance.histories.get_histories(name=self.store.name)[0]["id"]
Loading