Commit c43a9a68 authored by Cage, Gregory's avatar Cage, Gregory
Browse files

Merge branch '22-tool-management' into 'main'

Add recovery for data stores and propagate ids to tools

Closes #22

See merge request ndip/public-packages/nova-galaxy!10
parents 9f8b7043 2cf660d5
Loading
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
[tool.poetry]
name = "nova-galaxy"
version = "0.5.0"
version = "0.6.0"
description = "Utilities for accessing the ORNL Galaxy instance"
authors = ["Greg Watson <watsongr@ornl.gov>", "Gregory Cage <cagege@ornl.gov>"]
readme = "README.md"
+28 −1
Original line number Diff line number Diff line
"""DataStore is used to configure Galaxy to group outputs of a tool together."""

from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, List

if TYPE_CHECKING:
    from .nova import NovaConnection  # Only imports for type checking

from .tool import Tool


class Datastore:
    """Groups tool outputs together.
@@ -19,4 +21,29 @@ class Datastore:
        self.persist_store = False

    def persist(self) -> None:
        """Flag this store to be kept after the nova connection is closed.

        Use with care: tools in a persisted store keep running even once this object has been garbage
        collected. Call recover_tools() on a data store with the same name to get those tools back.
        """
        self.persist_store = True

    def recover_tools(self) -> List["Tool"]:
        """Recover the tools whose jobs produced the contents of this data store.

        Mainly used to pick up the tools of this data store, or of a past persisted data store that used
        the same name. The non-deleted contents of the backing history are inspected and one Tool is
        rebuilt per distinct creating job. No filtering on job state is done here; each recovered tool
        has its URL and status refreshed before being returned.

        Returns:
            One Tool per distinct creating job, in the order the jobs first appear in the history.
            An empty list is returned when the history has no items with a creating job.
        """
        galaxy_instance = self.nova_connection.galaxy_instance
        history_contents = galaxy_instance.histories.show_history(
            self.history_id, contents=True, deleted=False, details="all"
        )
        tools = []
        seen_job_ids = set()
        for item in history_contents:
            # Some history items may not report a creating job; skip them instead of raising KeyError.
            job_id = item.get("creating_job")
            # A single job can create several datasets; recover its tool only once.
            if not job_id or job_id in seen_job_ids:
                continue
            seen_job_ids.add(job_id)
            tool_id = galaxy_instance.jobs.show_job(job_id)["tool_id"]
            t = Tool(tool_id)
            t.assign_id(job_id, self)
            # Refresh the recovered tool so callers see its current URL and state.
            t.get_url()
            t.get_status()
            tools.append(t)
        return tools
+10 −9
Original line number Diff line number Diff line
@@ -7,11 +7,12 @@ as well as output data from Galaxy tools.
from abc import ABC, abstractmethod
from enum import Enum
from pathlib import Path
from typing import Any, Dict, Optional, Union
from typing import TYPE_CHECKING, Any, Dict, Optional, Union

from bioblend.galaxy.dataset_collections import DatasetCollectionClient
from bioblend.galaxy.datasets import DatasetClient

if TYPE_CHECKING:
    from .data_store import Datastore


@@ -46,10 +47,10 @@ class AbstractData(ABC):
        super().__init__()
        self.path: str = ""
        self.id: Union[str, None] = ""
        self.store: Union[None, Datastore] = None
        self.store: Union[None, "Datastore"] = None

    @abstractmethod
    def upload(self, store: Datastore) -> None:
    def upload(self, store: "Datastore") -> None:
        raise NotImplementedError()

    @abstractmethod
@@ -71,9 +72,9 @@ class Dataset(AbstractData):
        self.path = path
        self.name = name or Path(path).name
        self.id: str
        self.store: Datastore
        self.store: "Datastore"

    def upload(self, store: Datastore) -> None:
    def upload(self, store: "Datastore") -> None:
        galaxy_instance = store.nova_connection.galaxy_instance
        dataset_client = DatasetClient(galaxy_instance)
        history_id = galaxy_instance.histories.get_histories(name=store.name)[0]["id"]
@@ -106,9 +107,9 @@ class DatasetCollection(AbstractData):
        self.path = path
        self.name = name or Path(path).name
        self.id: str
        self.store: Datastore
        self.store: "Datastore"

    def upload(self, store: Datastore) -> None:
    def upload(self, store: "Datastore") -> None:
        """Will need to handle this differently than single datasets."""
        raise NotImplementedError

@@ -133,7 +134,7 @@ class DatasetCollection(AbstractData):
            raise Exception("Dataset collection is not present in Galaxy.")


def upload_datasets(store: Datastore, datasets: Dict[str, AbstractData]) -> Dict[str, str]:
def upload_datasets(store: "Datastore", datasets: Dict[str, AbstractData]) -> Dict[str, str]:
    """Helper method to upload multiple datasets or collections in parallel."""
    galaxy_instance = store.nova_connection.galaxy_instance
    dataset_client = DatasetClient(galaxy_instance)
+17 −15
Original line number Diff line number Diff line
@@ -3,10 +3,11 @@
import sys
import time
from threading import Thread
from typing import Dict, Optional
from typing import TYPE_CHECKING, Dict, Optional

from bioblend import galaxy

if TYPE_CHECKING:
    from .data_store import Datastore
from .dataset import AbstractData, Dataset, DatasetCollection, upload_datasets
from .outputs import Outputs
@@ -26,7 +27,7 @@ class JobStatus:
class Job:
    """Internal class managing Galaxy job execution. Should not be used by end users."""

    def __init__(self, tool_id: str, data_store: Datastore) -> None:
    def __init__(self, tool_id: str, data_store: "Datastore") -> None:
        self.id = ""
        self.datasets = None
        self.collections = None
@@ -36,7 +37,7 @@ class Job:
        self.status = JobStatus()
        self.url: Optional[str] = None

    def _run_and_wait(self, params: Parameters) -> None:
    def _run_and_wait(self, params: Optional[Parameters]) -> None:
        """Runs tools and waits for result."""
        self.submit(params)
        try:
@@ -47,7 +48,7 @@ class Job:

        self.status.state = WorkState.FINISHED

    def run(self, params: Parameters, wait: bool) -> Optional[Outputs]:
    def run(self, params: Optional[Parameters], wait: bool) -> Optional[Outputs]:
        """Runs a job in Galaxy."""
        if self.status.state in [WorkState.NOT_STARTED, WorkState.FINISHED, WorkState.ERROR]:
            thread = Thread(target=self._run_and_wait, args=(params,))
@@ -60,7 +61,7 @@ class Job:
            raise Exception(f"Tool {self.tool} (id: {self.id}) is already running.")

    def run_interactive(
        self, params: Parameters, wait: bool, max_tries: int = 100, check_url: bool = True
        self, params: Optional[Parameters], wait: bool, max_tries: int = 100, check_url: bool = True
    ) -> Optional[str]:
        """Runs an interactive tool in Galaxy and returns a link to the tool."""
        self.run(params, False)
@@ -80,7 +81,7 @@ class Job:
        else:
            raise Exception("Interactive tool was stopped unexpectedly.")

    def submit(self, params: Parameters) -> None:
    def submit(self, params: Optional[Parameters]) -> None:
        """Handles uploading inputs and submitting job."""
        self.status.state = WorkState.UPLOADING_DATA
        self.url = None
@@ -88,6 +89,7 @@ class Job:

        # Set Tool Inputs
        tool_inputs = galaxy.tools.inputs.inputs()
        if params:
            for param, val in params.inputs.items():
                if isinstance(val, AbstractData):
                    datasets_to_upload[param] = val
+26 −6
Original line number Diff line number Diff line
"""Contains classes to run tools in Galaxy via Nova."""

from typing import List, Optional, Union
from typing import TYPE_CHECKING, List, Optional, Union

if TYPE_CHECKING:
    from .data_store import Datastore  # Only imports for type checking

from .data_store import Datastore
from .dataset import AbstractData
from .job import Job
from .outputs import Outputs
@@ -22,7 +24,7 @@ class AbstractWork:
    def get_inputs(self) -> List[Parameters]:
        return []

    def run(self, data_store: Datastore, params: Parameters, wait: bool) -> Union[Outputs, None]:
    def run(self, data_store: "Datastore", params: Parameters, wait: bool) -> Union[Outputs, None]:
        return None


@@ -38,7 +40,7 @@ class Tool(AbstractWork):
        super().__init__(id)
        self._job: Optional[Job] = None

    def run(self, data_store: Datastore, params: Parameters, wait: bool = True) -> Optional[Outputs]:
    def run(self, data_store: "Datastore", params: Optional[Parameters] = None, wait: bool = True) -> Optional[Outputs]:
        """Run this tool.

        By default, will be run in a blocking manner, unless `wait` is set to False. Will return the
@@ -64,7 +66,12 @@ class Tool(AbstractWork):
        return self._job.run(params, wait)

    def run_interactive(
        self, data_store: Datastore, params: Parameters, wait: bool = True, max_tries: int = 100, check_url: bool = True
        self,
        data_store: "Datastore",
        params: Optional[Parameters] = None,
        wait: bool = True,
        max_tries: int = 100,
        check_url: bool = True,
    ) -> Optional[str]:
        """Run tool interactively.

@@ -140,8 +147,21 @@ class Tool(AbstractWork):
            return self._job.get_url()
        return None

    def get_uid(self) -> Optional[str]:
        """Return the unique ID of the job attached to this tool.

        The ID is only available once a job has been attached, i.e. after Tool.run() has been invoked
        successfully; otherwise None is returned.
        """
        job = self._job
        return job.id if job else None

    def assign_id(self, new_id: str, data_store: "Datastore") -> None:
        """Attach an already existing job to this tool. Not meant to be called directly.

        A fresh Job is created for this tool's id in the given data store, given `new_id` as its id,
        and marked as queued.

        Raises:
            Exception: if this tool already has a job attached.
        """
        if self._job:
            raise Exception("Tool cannot be currently assigned an ID. Do not directly call this method.")
        recovered_job = Job(self.id, data_store)
        self._job = recovered_job
        recovered_job.id = new_id
        recovered_job.status.state = WorkState.QUEUED


def stop_all_tools_in_store(data_store: Datastore) -> None:
def stop_all_tools_in_store(data_store: "Datastore") -> None:
    """Stops all the tools from running in a particular store."""
    galaxy_instance = data_store.nova_connection.galaxy_instance
    jobs = galaxy_instance.jobs.get_jobs(history_id=data_store.history_id)
Loading