Commit e168ff23 authored by Cage, Gregory's avatar Cage, Gregory
Browse files

Rework stopping and canceling jobs

parent 793db9fe
Loading
Loading
Loading
Loading
Loading
+1 −4
Original line number Diff line number Diff line
@@ -7,7 +7,7 @@ This example shows how to run a tool that takes multiple datasets as input.

.. code-block:: python

   from nova.galaxy import Nova, Dataset, Tool, Parameters, upload_datasets
   from nova.galaxy import Nova, Dataset, Tool, Parameters

   galaxy_url = "your_galaxy_url"
   galaxy_key = "your_galaxy_api_key"
@@ -20,9 +20,6 @@ This example shows how to run a tool that takes multiple datasets as input.
       dataset1 = Dataset("path/to/file1.txt", name="File 1")
       dataset2 = Dataset("path/to/file2.txt", name="File 2")

       # Upload multiple datasets in parallel
       upload_datasets(data_store, {"input1": dataset1, "input2": dataset2})

       # Define parameters, using the uploaded datasets
       params = Parameters()
       params.add_input("input1", dataset1)
+1 −2
Original line number Diff line number Diff line
@@ -2,7 +2,7 @@ import importlib.metadata

from .connection import Connection
from .data_store import Datastore
from .dataset import Dataset, DatasetCollection, upload_datasets
from .dataset import Dataset, DatasetCollection
from .outputs import Outputs
from .parameters import Parameters
from .tool import Tool
@@ -13,7 +13,6 @@ __all__ = [
    "Datastore",
    "Dataset",
    "DatasetCollection",
    "upload_datasets",
    "Outputs",
    "Parameters",
    "Tool",
+2 −22
Original line number Diff line number Diff line
@@ -7,7 +7,7 @@ as well as output data from Galaxy tools.
from abc import ABC, abstractmethod
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Optional, Union
from typing import TYPE_CHECKING, Any, Optional, Union

from bioblend.galaxy.dataset_collections import DatasetCollectionClient
from bioblend.galaxy.datasets import DatasetClient
@@ -47,6 +47,7 @@ class AbstractData(ABC):
        super().__init__()
        self.path: str = ""
        self.id: Union[str, None] = ""
        self.name: str = ""
        self.store: Union[None, "Datastore"] = None

    @abstractmethod
@@ -181,24 +182,3 @@ class DatasetCollection(AbstractData):
            return info["elements"]
        else:
            raise Exception("Dataset collection is not present in Galaxy.")


def upload_datasets(store: "Datastore", datasets: Dict[str, AbstractData]) -> Dict[str, str]:
    """Helper method to upload multiple datasets or collections in parallel."""
    galaxy_instance = store.nova_connection.galaxy_instance
    dataset_client = DatasetClient(galaxy_instance)
    history_id = galaxy_instance.histories.get_histories(name=store.name)[0]["id"]
    dataset_ids = {}
    for name, dataset in datasets.items():
        if len(dataset.path) < 1 and dataset.get_content():
            dataset_info = galaxy_instance.tools.paste_content(
                content=str(dataset.get_content()), history_id=history_id
            )
        else:
            dataset_info = galaxy_instance.tools.upload_file(path=dataset.path, history_id=history_id)
        dataset_ids[name] = dataset_info["outputs"][0]["id"]
        dataset.id = dataset_info["outputs"][0]["id"]
        dataset.store = store
    for dataset_output in dataset_ids.values():
        dataset_client.wait_for_dataset(dataset_output)
    return dataset_ids
+56 −14
Original line number Diff line number Diff line
@@ -5,10 +5,11 @@ from threading import Thread
from typing import TYPE_CHECKING, Dict, Optional

from bioblend import galaxy
from bioblend.galaxy.datasets import DatasetClient

if TYPE_CHECKING:
    from .data_store import Datastore
from .dataset import AbstractData, Dataset, DatasetCollection, upload_datasets
from .dataset import Dataset, DatasetCollection
from .outputs import Outputs
from .parameters import Parameters
from .util import WorkState
@@ -44,6 +45,9 @@ class Job:
            self.wait_for_results()
        except Exception as e:
            self.url = None
            if self.status.state == WorkState.STOPPING or self.status.state == WorkState.CANCELED:
                self.status.state = WorkState.CANCELED
                return
            self.status.state = WorkState.ERROR
            self.status.error_msg = str(e)
            return
@@ -93,14 +97,18 @@ class Job:
        tool_inputs = galaxy.tools.inputs.inputs()
        if params:
            for param, val in params.inputs.items():
                if isinstance(val, AbstractData):
                if isinstance(val, Dataset):
                    datasets_to_upload[param] = val
                else:
                    tool_inputs.set_param(param, val)
            ids = upload_datasets(store=self.store, datasets=datasets_to_upload)
            ids = self.upload_datasets(datasets=datasets_to_upload)
            if ids:
                for param, val in ids.items():
                    tool_inputs.set_dataset_param(param, val)

        if self.status.state == WorkState.STOPPING:
            self.status.state = WorkState.CANCELED
            return
        # Run tool and wait for job to finish
        self.status.state = WorkState.QUEUED
        results = self.galaxy_instance.tools.run_tool(
@@ -110,22 +118,56 @@ class Job:
        self.datasets = results["outputs"]
        self.collections = results["output_collections"]

    def upload_datasets(self, datasets: Dict[str, Dataset]) -> Optional[Dict[str, str]]:
        """Helper method to upload multiple datasets or collections in parallel."""
        galaxy_instance = self.store.nova_connection.galaxy_instance
        dataset_client = DatasetClient(galaxy_instance)
        history_id = galaxy_instance.histories.get_histories(name=self.store.name)[0]["id"]
        dataset_ids: Dict[str, str] = {}
        for name, dataset in datasets.items():
            if self.status.state == WorkState.STOPPING:
                self.cleanup_datasets(dataset_ids)
                return None
            if len(dataset.path) < 1 and dataset.get_content():
                dataset_info = galaxy_instance.tools.paste_content(
                    content=str(dataset.get_content()), history_id=history_id, file_name=dataset.name
                )
            else:
                dataset_info = galaxy_instance.tools.upload_file(path=dataset.path, history_id=history_id)
            dataset_ids[name] = dataset_info["outputs"][0]["id"]
            dataset.id = dataset_info["outputs"][0]["id"]
            dataset.store = self.store
        for dataset_output in dataset_ids.values():
            if self.status.state == WorkState.STOPPING:
                self.cleanup_datasets(dataset_ids)
                return None
            dataset_client.wait_for_dataset(dataset_output)
        return dataset_ids

    def cleanup_datasets(self, datasets: Dict[str, str]) -> None:
        galaxy_instance = self.store.nova_connection.galaxy_instance
        history_id = galaxy_instance.histories.get_histories(name=self.store.name)[0]["id"]
        for dataset_id in datasets.values():
            galaxy_instance.histories.delete_dataset(history_id=history_id, dataset_id=dataset_id, purge=True)

    def cancel(self, check_results: bool = False) -> bool:
        """Cancels or stops a job in Galaxy."""
        self.url = None
        self.status.state = WorkState.STOPPING
        if check_results:
            response = self.galaxy_instance.make_get_request(
                f"{self.store.nova_connection.galaxy_url}/api/jobs{self.id}/finish"
            response = self.galaxy_instance.make_put_request(
                f"{self.store.nova_connection.galaxy_url}/api/jobs/{self.id}/finish"
            )
            if response.status_code == 200:
                self.status.state = WorkState.FINISHED
            if response:
                return True
            else:
                self.status.state = WorkState.FINISHED
                self.status.error_msg = response.text
                self.status.error_msg = "could not stop job"
                return False
        try:
            success = self.galaxy_instance.jobs.cancel_job(self.id)
            return success
        except Exception:
            return False
        self.status.state = WorkState.ERROR
        return self.galaxy_instance.jobs.cancel_job(self.id)

    def join_job_thread(self) -> None:
        if self.thread:
@@ -133,7 +175,7 @@ class Job:

    def wait_for_results(self, timeout: float = 12000) -> None:
        """Wait for job to finish."""
        self.galaxy_instance.jobs.wait_for_job(self.id, maxwait=timeout)
        self.galaxy_instance.jobs.wait_for_job(self.id, maxwait=timeout, check=True)

    def get_state(self) -> JobStatus:
        """Returns current state of job."""
+2 −0
Original line number Diff line number Diff line
@@ -13,3 +13,5 @@ class WorkState(Enum):
    FINISHED = "finished"
    ERROR = "error"
    DELETED = "deleted"
    CANCELED = "canceled"
    STOPPING = "stopping"
Loading