Commit 78789cba authored by Gregory Cage

Add support for remote files in datasets. Also add an option to link datasets to existing files upstream.
parent 1f532dbb
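
For orientation, a minimal usage sketch of the two new options introduced in this commit (the import path and file paths are illustrative assumptions; remote_file and force_upload are the new Dataset constructor parameters added below):

    from nova.galaxy.dataset import Dataset  # assumed module path

    # Default: a local file that is uploaded to Galaxy as before.
    local = Dataset(path="/tmp/run.nxs")

    # Remote file: ingested server-side via the neutrons_load_data tool
    # instead of being uploaded from this machine.
    remote = Dataset(path="/data/run.nxs", remote_file=True)

    # Reuse an existing history dataset with the same name, if one exists,
    # instead of uploading again.
    linked = Dataset(path="/tmp/run.nxs", force_upload=False)
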
pyproject.toml  +1 −1
 [tool.poetry]
 name = "nova-galaxy"
-version = "0.10.1"
+version = "0.10.2"
 description = "Utilties for accessing the ORNL Galaxy instance"
 authors = ["Greg Watson <watsongr@ornl.gov>", "Gregory Cage <cagege@ornl.gov>", "Sergey Yakubov <yakubovs@ornl.gov>"]
 readme = "README.md"
dataset.py  +34 −14
@@ -15,13 +15,15 @@ from bioblend.galaxy.datasets import DatasetClient
 if TYPE_CHECKING:
     from .data_store import Datastore
 
+LOAD_NEUTRON_DATA_TOOL = "neutrons_load_data"
+
 
 class DataState(Enum):
     """The state of a dataset in Galaxy."""
 
-    NONE = 1
-    IN_GALAXY = 2
-    UPLOADING = 3
+    LOCAL = "local"
+    REMOTE = "remote"
+    IN_GALAXY = "in_galaxy"
 
 
 class DatasetRegistrationError(Exception):
@@ -72,12 +74,20 @@ class Dataset(AbstractData):
     If needing to change the path of the Dataset, it is recommended to create a new Dataset instead.
     """
 
-    def __init__(self, path: str = "", name: Optional[str] = None):
+    def __init__(
+        self, path: str = "", name: Optional[str] = None, remote_file: bool = False, force_upload: bool = True
+    ):
         self.path = path
         self.name = name or Path(path).name
         self.id: str = ""
         self.store: Optional["Datastore"] = None
         self.file_type: str = Path(path).suffix
+        self.remote_file = remote_file
+        if remote_file:
+            self.state = DataState.REMOTE
+        else:
+            self.state = DataState.LOCAL
+        self.force_upload = force_upload
         self._content: Any = None
 
     def upload(self, store: "Datastore", name: Optional[str] = None) -> None:
@@ -95,19 +105,29 @@ class Dataset(AbstractData):
         galaxy_instance = store.nova_connection.galaxy_instance
         dataset_client = DatasetClient(galaxy_instance)
         history_id = galaxy_instance.histories.get_histories(name=store.name)[0]["id"]
-        if name:
-            file_name = name
-        else:
-            file_name = self.name
-        if self._content:
-            dataset_info = galaxy_instance.tools.paste_content(
-                content=self._content, history_id=history_id, file_name=file_name
-            )
+        if self.remote_file:
+            tool_inputs = store.nova_connection.galaxy_instance.tools.inputs.inputs()  # type: ignore
+            tool_inputs.set_param("filepath", self.path)
+            results = store.nova_connection.galaxy_instance.tools.run_tool(
+                history_id=store.history_id, tool_id=LOAD_NEUTRON_DATA_TOOL, tool_inputs=tool_inputs
+            )
+            self.id = results["outputs"][0]["id"]
+            self.store = store
+
         else:
-            dataset_info = galaxy_instance.tools.upload_file(path=self.path, history_id=history_id, file_name=file_name)
+            file_name = name if name else self.name
+            if self._content:
+                dataset_info = galaxy_instance.tools.paste_content(
+                    content=self._content, history_id=history_id, file_name=file_name
+                )
+            else:
+                dataset_info = galaxy_instance.tools.upload_file(
+                    path=self.path, history_id=history_id, file_name=file_name
+                )
+            self.id = dataset_info["outputs"][0]["id"]
+            self.store = store
         dataset_client.wait_for_dataset(self.id)
         self.state = DataState.IN_GALAXY
 
     def download(self, local_path: str) -> AbstractData:
         """Downloads this dataset to the local path given."""
job.py  +43 −10
@@ -11,7 +11,7 @@ if TYPE_CHECKING:
     from .data_store import Datastore
 from nova.common.job import WorkState
 
-from .dataset import Dataset, DatasetCollection
+from .dataset import LOAD_NEUTRON_DATA_TOOL, Dataset, DatasetCollection, DataState
 from .outputs import Outputs
 from .parameters import Parameters
 
@@ -143,21 +143,26 @@ class Job:
         """Helper method to upload multiple datasets or collections in parallel."""
         galaxy_instance = self.store.nova_connection.galaxy_instance
         dataset_client = DatasetClient(galaxy_instance)
-        history_id = galaxy_instance.histories.get_histories(name=self.store.name)[0]["id"]
         dataset_ids: Dict[str, str] = {}
         for name, dataset in datasets.items():
             if self.status.state in [WorkState.STOPPING, WorkState.CANCELING]:
                 self.cleanup_datasets(dataset_ids)
                 return None
-            if len(dataset.path) < 1 and dataset.get_content():
-                dataset_info = galaxy_instance.tools.paste_content(
-                    content=str(dataset.get_content()), history_id=history_id, file_name=dataset.name
-                )
+
+            if not dataset.force_upload:
+                self._link_existing_dataset(dataset)
+                if dataset.id:
+                    dataset_ids[name] = dataset.id
+                    dataset.state = DataState.IN_GALAXY
+                    continue
+
+            if dataset.remote_file:
+                self._ingest_dataset(dataset)
             else:
-                dataset_info = galaxy_instance.tools.upload_file(path=dataset.path, history_id=history_id)
-            dataset_ids[name] = dataset_info["outputs"][0]["id"]
-            dataset.id = dataset_info["outputs"][0]["id"]
-            dataset.store = self.store
+                self._upload_single_dataset(dataset)
+            dataset_ids[name] = dataset.id
+            dataset.state = DataState.IN_GALAXY
+
         for dataset_output in dataset_ids.values():
             if self.status.state in [WorkState.STOPPING, WorkState.CANCELING]:
                 self.cleanup_datasets(dataset_ids)
@@ -165,6 +170,34 @@
             dataset_client.wait_for_dataset(dataset_output)
         return dataset_ids
 
+    def _link_existing_dataset(self, dataset: Dataset) -> None:
+        galaxy_instance = self.store.nova_connection.galaxy_instance
+        dataset_client = DatasetClient(galaxy_instance)
+        existing_data = dataset_client.get_datasets(history_id=self.store.history_id, name=dataset.name)
+        if len(existing_data) > 0:
+            dataset.id = existing_data[0]["id"]
+            dataset.store = self.store
+
+    def _ingest_dataset(self, dataset: Dataset) -> None:
+        tool_inputs = self.store.nova_connection.galaxy_instance.tools.inputs.inputs()  # type: ignore
+        tool_inputs.set_param("filepath", dataset.path)
+        results = self.store.nova_connection.galaxy_instance.tools.run_tool(
+            history_id=self.store.history_id, tool_id=LOAD_NEUTRON_DATA_TOOL, tool_inputs=tool_inputs
+        )
+        dataset.id = results["outputs"][0]["id"]
+        dataset.store = self.store
+
+    def _upload_single_dataset(self, dataset: Dataset) -> None:
+        galaxy_instance = self.store.nova_connection.galaxy_instance
+        if len(dataset.path) < 1 and dataset.get_content():
+            dataset_info = galaxy_instance.tools.paste_content(
+                content=str(dataset.get_content()), history_id=self.store.history_id, file_name=dataset.name
+            )
+        else:
+            dataset_info = galaxy_instance.tools.upload_file(path=dataset.path, history_id=self.store.history_id)
+        dataset.id = dataset_info["outputs"][0]["id"]
+        dataset.store = self.store
+
     def cleanup_datasets(self, datasets: Dict[str, str]) -> None:
         galaxy_instance = self.store.nova_connection.galaxy_instance
         history_id = galaxy_instance.histories.get_histories(name=self.store.name)[0]["id"]
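
One design note on the linking path: _link_existing_dataset matches by dataset name only and takes the first hit, so reusing a name across runs links to whichever match Galaxy returns first rather than a specific version. A sketch of the intended round trip (the Job and Datastore wiring is assumed from the existing nova-galaxy API; the file path is illustrative):

    calib = Dataset(path="/tmp/calib.dat", force_upload=False)
    # First run: no "calib.dat" in the history yet, so it is uploaded
    # (or ingested via the neutrons_load_data tool when remote_file=True).
    # Later runs: the upload loop calls _link_existing_dataset, finds
    # "calib.dat" by name, reuses its Galaxy id, and skips the upload.
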