Commit 9110af71 authored by Cage, Gregory
Browse files

Implement backend job management and refactor Tool class and tests

parent 5f5c43a3
Loading
Loading
Loading
Loading
Loading
+158 −22
Original line number Diff line number Diff line
"""
Job
"""
"""Internal job related classes and functions."""

import sys
import time
from threading import Thread
from typing import Dict, Optional

from bioblend import galaxy

from .data_store import Datastore
from .dataset import AbstractData, Dataset, DatasetCollection, upload_datasets
from .outputs import Outputs
from .parameters import Parameters
from .tool import AbstractWork
from .util import WorkState


class JobStatus:
    """Mutable record of a job's progress, kept on the client side.

    A fresh record starts in ``WorkState.NOT_STARTED`` with both text fields empty.
    """

    def __init__(self) -> None:
        # Coarse lifecycle state of the work.
        self.state: WorkState = WorkState.NOT_STARTED
        # Free-form extra information; empty until something is recorded.
        self.details: str = ""
        # Text describing a failure; empty until something is recorded.
        self.error_msg: str = ""


class Job:
    def __init__(self, work: AbstractWork):
    """Internal class managing Galaxy job execution. Should not be used by end users."""

    def __init__(self, tool_id: str, data_store: Datastore) -> None:
        self.id = ""
        pass
        self.datasets = None
        self.collections = None
        self.tool = tool_id
        self.store = data_store
        self.galaxy_instance = self.store.nova_connection.galaxy_instance
        self.status = JobStatus()

    def submit_job(self, datastore: Datastore, params: Parameters) -> None:
        pass
    def _run_and_wait(self, params: Parameters) -> None:
        """Submit the job and wait for it, recording the outcome on ``self.status``.

        Used as the thread target in `run`. Failures are not re-raised: a failure
        while submitting or while waiting marks the status as ``WorkState.ERROR``
        and stores the exception text in ``self.status.error_msg``. On success the
        status becomes ``WorkState.FINISHED``.

        Parameters
        ----------
        params: Parameters
            The input parameters forwarded to `submit`.

        """
        try:
            # Submitting is inside the guard as well, so a failed upload or submit
            # does not leave the status stuck in an "in progress" state.
            self.submit(params)
            self.wait_for_results()
        except Exception as err:
            self.status.state = WorkState.ERROR
            self.status.error_msg = str(err)
        else:
            self.status.state = WorkState.FINISHED

    def cancel_job(self) -> None:
        pass
        self.status.state = WorkState.FINISHED

    def wait_for_results(self) -> None:
        pass
    def run(self, params: Parameters, wait: bool) -> Optional[Outputs]:
        """Start the job on a worker thread and optionally block for its outputs.

        Parameters
        ----------
        params: Parameters
            The input parameters handed to the worker thread.
        wait: bool
            When True, join the worker thread and return the value of `get_results`.
            When False, return None right after the thread has been started.

        Raises
        ------
        Exception
            If the current state is not one of NOT_STARTED, FINISHED or ERROR.

        """
        startable_states = (WorkState.NOT_STARTED, WorkState.FINISHED, WorkState.ERROR)
        if self.status.state not in startable_states:
            raise Exception(f"Tool {self.tool} (id: {self.id}) is already running.")

        worker = Thread(target=self._run_and_wait, args=(params,))
        worker.start()
        if not wait:
            return None
        worker.join()
        return self.get_results()

    def get_state(self) -> None:
        pass
    def run_interactive(
        self, params: Parameters, wait: bool, max_tries: int = 100, check_url: bool = True
    ) -> Optional[str]:
        """Launch an interactive tool and, if requested, poll for its entry point URL.

        The job is always started without blocking. When `wait` is False this
        returns None immediately. Otherwise the entry points API is polled up to
        `max_tries` times, sleeping one second after every attempt, until an entry
        point for this job exposes a target.

        Parameters
        ----------
        params: Parameters
            The input parameters for the tool.
        wait: bool
            Whether to poll for the entry point before returning.
        max_tries: int
            Number of polling attempts before giving up.
        check_url: bool
            When True, the URL is only returned once a GET on it answers with status 200.

        Raises
        ------
        Exception
            If no URL was obtained within `max_tries` attempts; the job is cancelled first.

        """
        self.run(params, False)
        if not wait:
            return None

        attempts_left = max_tries
        while attempts_left > 0:
            try:
                listing = self.galaxy_instance.make_get_request(
                    f"{self.store.nova_connection.galaxy_url}/api/entry_points?job_id={self.id}"
                )
                for entry in listing.json():
                    if entry["job_id"] != self.id or not entry.get("target"):
                        continue
                    tool_url = f"{self.store.nova_connection.galaxy_url}{entry['target']}"
                    probe = self.galaxy_instance.make_get_request(url=tool_url)
                    if probe.status_code == 200 or not check_url:
                        return tool_url
            except Exception:
                # Any failure during an attempt is ignored and the next attempt is made.
                pass
            finally:
                # Runs after every attempt, including the one that returns the URL.
                attempts_left -= 1
                time.sleep(1)

        was_cancelled = self.cancel()
        # if status is false, the job has been in a terminal state already, indicating an error somewhere in execution
        if not was_cancelled:
            raise Exception("Interactive tool was stopped unexpectedly.")
        raise Exception("Unable to fetch the URL for interactive tool.")

    def submit(self, params: Parameters) -> None:
        """Upload dataset inputs, then ask Galaxy to run the tool.

        The status is set to UPLOADING_DATA before inputs are processed and to QUEUED
        just before the tool is run. Afterwards ``self.id`` holds the id of the first
        job in the response, and ``self.datasets`` / ``self.collections`` hold the
        response's ``outputs`` and ``output_collections`` entries.

        Parameters
        ----------
        params: Parameters
            The inputs; values that are `AbstractData` instances are uploaded, all
            other values are passed as plain tool parameters.

        """
        self.status.state = WorkState.UPLOADING_DATA

        # Split inputs: plain values go straight in, data objects are uploaded first.
        tool_inputs = galaxy.tools.inputs.inputs()
        pending_uploads = {}
        for name, value in params.inputs.items():
            if not isinstance(value, AbstractData):
                tool_inputs.set_param(name, value)
                continue
            pending_uploads[name] = value

        uploaded_ids = upload_datasets(store=self.store, datasets=pending_uploads)
        for name, dataset_id in uploaded_ids.items():
            tool_inputs.set_dataset_param(name, dataset_id)

        # Hand the tool to Galaxy and remember what it created.
        self.status.state = WorkState.QUEUED
        response = self.galaxy_instance.tools.run_tool(
            history_id=self.store.history_id, tool_id=self.tool, tool_inputs=tool_inputs
        )
        first_job = response["jobs"][0]
        self.id = first_job["id"]
        self.datasets = response["outputs"]
        self.collections = response["output_collections"]

    def cancel(self, check_results: bool = False) -> bool:
        """Cancel or stop this job in Galaxy.

        Parameters
        ----------
        check_results: bool
            When True, a GET request is sent to the job's ``finish`` endpoint and the
            local state becomes FINISHED regardless of the answer. When False, the
            local state becomes ERROR and the job is cancelled through the jobs client.

        Returns
        -------
        bool
            With `check_results`, True if the ``finish`` request answered with status
            200; otherwise False, with the response text stored in
            ``self.status.error_msg``. Without `check_results`, the value returned by
            the jobs client's ``cancel_job``.

        """
        if check_results:
            # The job id is its own path segment, as in the other job endpoints.
            response = self.galaxy_instance.make_get_request(
                f"{self.store.nova_connection.galaxy_url}/api/jobs/{self.id}/finish"
            )
            # The job is considered finished locally whether or not the request succeeded.
            self.status.state = WorkState.FINISHED
            if response.status_code == 200:
                return True
            self.status.error_msg = response.text
            return False
        self.status.state = WorkState.ERROR
        return self.galaxy_instance.jobs.cancel_job(self.id)

    def wait_for_results(self) -> None:
        """Wait on this job by delegating to the jobs client's ``wait_for_job`` with ``self.id``."""
        jobs_client = self.galaxy_instance.jobs
        jobs_client.wait_for_job(self.id)

    def get_outputs(self) -> None:
        pass
    def get_state(self) -> JobStatus:
        """Return the status record, refreshing it from Galaxy while the job is queued.

        Galaxy is only queried when the local state is QUEUED. If it reports the
        job as ``"running"``, the local state is advanced to RUNNING. In every
        other state the record is returned unchanged.
        """
        if self.status.state != WorkState.QUEUED:
            return self.status

        remote_job = self.galaxy_instance.jobs.show_job(self.id)
        if remote_job["state"] == "running":
            self.status.state = WorkState.RUNNING
        return self.status

    def get_stdout(self) -> str:
        pass
    def get_results(self) -> Outputs:
        """Return results from finished job."""
        if self.status.state == WorkState.FINISHED:
            outputs = Outputs()
            if self.datasets:
                for dataset in self.datasets:
                    d = Dataset(dataset["output_name"])
                    d.id = dataset["id"]
                    d.store = self.store
                    outputs.add_output(d)
            if self.collections:
                for collection in self.collections:
                    dc = DatasetCollection(collection["output_name"])
                    dc.id = collection["id"]
                    dc.store = self.store
                    outputs.add_output(dc)

    def get_stderr(self) -> str:
        pass
            return outputs
        else:
            raise Exception(f"Job {self.id} has not finished running.")

    def get_results(self) -> None:
        pass
    def get_console_output(self) -> Dict[str, str]:
        """Fetch the job's console output from position 0 onwards.

        Requests the job's ``console_output`` endpoint for both stdout and stderr,
        each with a length of ``sys.maxsize - 1``, and returns the decoded JSON
        body. ``raise_for_status`` is called on the response before decoding.
        """
        read_length = sys.maxsize - 1
        query = (
            f"stdout_position=0&stdout_length={read_length}"
            f"&stderr_position=0&stderr_length={read_length}"
        )
        endpoint = f"{self.store.nova_connection.galaxy_url}/api/jobs/{self.id}/console_output?{query}"
        response = self.galaxy_instance.make_get_request(endpoint)
        response.raise_for_status()
        return response.json()
+0 −3
Original line number Diff line number Diff line
@@ -51,9 +51,6 @@ class NovaConnection:
        history = self.galaxy_instance.histories.get_histories(name=store.name)[0]["id"]
        self.galaxy_instance.histories.delete_history(history_id=history, purge=True)

    def get_status(self, tool):
        pass


class Nova:
    """
+99 −119
Original line number Diff line number Diff line
"""Contains classes to run tools in Galaxy via Nova."""

import time
from enum import Enum
from typing import List, Optional, Union

from bioblend import galaxy

from .data_store import Datastore
from .dataset import AbstractData, Dataset, DatasetCollection, upload_datasets
from .dataset import AbstractData
from .job import Job
from .outputs import Outputs
from .parameters import Parameters


class WorkState(Enum):
    """Lifecycle states for a unit of work (such as a tool run) in Galaxy."""

    # Nothing has been started yet.
    NOT_STARTED = 1

    # In-progress states. The numeric values do not follow declaration order.
    QUEUED = 2
    UPLOADING_DATA = 6
    RUNNING = 3

    # End states.
    FINISHED = 4
    ERROR = 5
from .util import WorkState


class AbstractWork:
@@ -35,128 +22,121 @@ class AbstractWork:
    def get_inputs(self) -> List[Parameters]:
        return []

    def run(self, data_store: Datastore, params: Parameters) -> Union[Outputs, None]:
    def run(self, data_store: Datastore, params: Parameters, wait: bool) -> Union[Outputs, None]:
        return None


class Tool(AbstractWork):
    """Represents a tool from Galaxy that can be run."""
    """Represents a tool from Galaxy that can be run.

    It's recommended to create a new Tool object every time you want to run a tool to prevent results from being
    overridden.

    """

    def __init__(self, id: str):
        super().__init__(id)
        self._job = None
        self._job: Optional[Job] = None

    def run_async(self, data_store: Datastore, params: Parameters, async_execution: bool):
        # self.thread  = threading.new_thread(run)
    def run(self, data_store: Datastore, params: Parameters, wait: bool = True) -> Optional[Outputs]:
        """Run this tool.

        pass
        By default, will be run in a blocking manner, unless `wait` is set to False. Will return the
        results as an instance of the `Outputs` class from nova.galaxy.outputs if run in a blocking way. Otherwise, will
        return None, and the user will be responsible for getting results by calling `get_results`.

    def run(self, data_store: Datastore, params: Parameters, async_execution: bool) -> Outputs:
        """Runs this tool in a blocking manner and returns a map of the output datasets and collections."""
        # TODO Most of this logic will be moved to job class
        outputs = Outputs()
        galaxy_instance = data_store.nova_connection.galaxy_instance
        datasets_to_upload = {}
        Parameters
        ----------
        data_store: Datastore
            The data store to run this tool in.
        params: Parameters
            The input parameters for this tool.
        wait: bool
            Whether to run this tool in a blocking manner (True) or not (False). Default is True.

        # Set Tool Inputs
        tool_inputs = galaxy.tools.inputs.inputs()
        for param, val in params.inputs.items():
            if isinstance(val, AbstractData):
                datasets_to_upload[param] = val
            else:
                tool_inputs.set_param(param, val)
        self.state = get_state()
        self._job.submit()
        ids = upload_datasets(store=data_store, datasets=datasets_to_upload)
        for param, val in ids.items():
            tool_inputs.set_dataset_param(param, val)

        # Run tool and wait for job to finish
        results = galaxy_instance.tools.run_tool(
            history_id=data_store.history_id, tool_id=self.id, tool_inputs=tool_inputs
        )

        for job in results["jobs"]:
            galaxy_instance.jobs.wait_for_job(job_id=job["id"])

        # Collect output datasets and dataset collections
        result_datasets = results["outputs"]
        result_collections = results["output_collections"]
        if result_datasets:
            for dataset in result_datasets:
                d = Dataset(dataset["output_name"])
                d.id = dataset["id"]
                d.store = data_store
                outputs.add_output(d)
        if result_collections:
            for collection in result_collections:
                dc = DatasetCollection(collection["output_name"])
                dc.id = collection["id"]
                dc.store = data_store
                outputs.add_output(dc)

        return outputs
        Returns
        -------
        Optional[Outputs]
            If run in a blocking manner, returns the Outputs once the tool is finished running. Otherwise, returns None.

        """
        self._job = Job(self.id, data_store)
        return self._job.run(params, wait)

    def run_interactive(
        self, data_store: Datastore, params: Parameters, max_tries: int = 100, check_url: bool = True
        self, data_store: Datastore, params: Parameters, wait: bool = True, max_tries: int = 100, check_url: bool = True
    ) -> Optional[str]:
        galaxy_instance = data_store.nova_connection.galaxy_instance
        datasets_to_upload = {}
        # Set Tool Inputs
        tool_inputs = galaxy.tools.inputs.inputs()
        for param, val in params.inputs.items():
            if isinstance(val, AbstractData):
                datasets_to_upload[param] = val
        """Run tool interactively.

        Interactive Tools typically are run exclusively with this method. Can poll for
        the interactive tool endpoint before returning, ensuring that the tool is reachable.

        Parameters
        ----------
        data_store: Datastore
            The data store to run this tool in.
        params: Parameters
            The input parameters for this tool.
        wait: bool
            Whether to wait for the interactive tool to start up before returning.
        max_tries: int
            Timeout for how long to poll for the interactive tool endpoint.
        check_url:
            Whether to check if the interactive tool endpoint is reachable before returning.

        Returns
        -------
        Optional[str]
            Will return None, if not waiting for interactive tool to startup with `wait` parameter. Will return
            the URL to the interactive tool otherwise.

        """
        self._job = Job(self.id, data_store)
        return self._job.run_interactive(params, wait=wait, max_tries=max_tries, check_url=check_url)

    def get_status(self) -> WorkState:
        """Returns the current status of the tool."""
        if self._job:
            return self._job.get_state().state
        else:
                tool_inputs.set_param(param, val)

        ids = upload_datasets(store=data_store, datasets=datasets_to_upload)
        for param, val in ids.items():
            tool_inputs.set_dataset_param(param, val)

        # Run tool and wait for job to finish
        results = galaxy_instance.tools.run_tool(
            history_id=data_store.history_id, tool_id=self.id, tool_inputs=tool_inputs
        )
        job_id = results["jobs"][0]["id"]

        timer = max_tries
        while timer > 0:
            entry_points = galaxy_instance.make_get_request(
                f"{data_store.nova_connection.galaxy_url}/api/entry_points?job_id={job_id}"
            )
            for ep in entry_points.json():
                if ep["job_id"] == job_id and ep.get("target", None):
                    url = f"{data_store.nova_connection.galaxy_url}{ep['target']}"
                    response = galaxy_instance.make_get_request(url)
                    if response.status_code == 200 or not check_url:
                        return url
            timer -= 1
            time.sleep(1)
        status = galaxy_instance.jobs.cancel_job(job_id)
        # if status is false, the job has been in a terminal state already, indicating an error somewhere in execution
        if status:
            raise Exception("Unable to fetch the URL for interactive tool.")
        else:
            raise Exception("Interactive tool was stopped unexpectedly.")
            return WorkState.NOT_STARTED

    def get_results(self) -> Optional[Outputs]:
        """Returns the results from running this tool.

    def get_status(self):
        return self._job.get_status()
        Throws an Exception if the tool has not finished yet. Will be
        overridden if this tool is run again.

    def get_results(self):
        pass
        """
        if self._job:
            return self._job.get_results()
        return None

    def stop(self) -> None:
        """Stop the tool while keeping any existing results; does nothing if no job exists."""
        if not self._job:
            return
        self._job.cancel(check_results=True)

    def stop(self):
        pass
    def cancel(self) -> None:
        """Cancel the tool execution and discard collected results; does nothing if no job exists."""
        if not self._job:
            return
        self._job.cancel(check_results=False)

    def cancel(self):
        pass
    def get_stdout(self) -> Optional[str]:
        """Return the current STDOUT of the most recent run, or None if the tool has no job.

        The value is overridden every time this tool is run.
        """
        if not self._job:
            return None
        console = self._job.get_console_output()
        return console["stdout"]

    def get_stdout(self):
        pass
    def get_stderr(self) -> Optional[str]:
        """Return the current STDERR of the most recent run, or None if the tool has no job.

        The value is overridden every time this tool is run.
        """
        if not self._job:
            return None
        console = self._job.get_console_output()
        return console["stderr"]


def stop_all_tools_in_store(data_store: Datastore) -> None:
    """Stops all the tools from running in a particular store."""
    galaxy_instance = data_store.nova_connection.galaxy_instance
    jobs = galaxy_instance.jobs.get_jobs(history_id=data_store.history_id)
    for job in jobs:
+14 −0
Original line number Diff line number Diff line
"""Utilities."""

from enum import Enum


class WorkState(Enum):
    """Lifecycle states of a tool in Galaxy, numbered in declaration order."""

    # Before anything has been submitted.
    NOT_STARTED = 1

    # In-progress states.
    UPLOADING_DATA = 2
    QUEUED = 3
    RUNNING = 4

    # End states.
    FINISHED = 5
    ERROR = 6

tests/test_job.py

deleted100644 → 0
+0 −43
Original line number Diff line number Diff line
from nova.galaxy.parameters import Parameters
from nova.galaxy.tool import Tool, WorkState

TEST_INT_TOOL_ID = "interactive_tool_generic_output"


def test_status(nova_instance, galaxy_instance):
    """Check the status reported before launch, after launch and after stopping an interactive tool."""
    with nova_instance.connect() as connection:
        store = connection.create_data_store(name="nova_galaxy_testing")
        test_tool = Tool(TEST_INT_TOOL_ID)
        params = Parameters()
        # Nothing has been run yet.
        state = test_tool.get_status()
        assert state == WorkState.NOT_STARTED
        # The returned link is not used by this test, so it is not bound to a name.
        test_tool.run_interactive(data_store=store, params=params, check_url=False)
        state = test_tool.get_status()
        # NOTE(review): called while the tool is expected to still be running; confirm this is intended.
        test_tool.get_results()
        assert state == WorkState.RUNNING
        test_tool.stop()
        state = test_tool.get_status()
        assert state == WorkState.FINISHED


def test_cancel_tool(nova_instance, galaxy_instance):
    """Check that cancelling a launched interactive tool leaves it in the ERROR state."""
    with nova_instance.connect() as connection:
        store = connection.create_data_store(name="nova_galaxy_testing")
        test_tool = Tool(TEST_INT_TOOL_ID)
        params = Parameters()
        # The returned link is not used by this test, so it is not bound to a name.
        test_tool.run_interactive(data_store=store, params=params, check_url=False)
        test_tool.cancel()
        state = test_tool.get_status()
        assert state == WorkState.ERROR


def test_get_tool_stdout(nova_instance):
    """Check that STDOUT can be fetched from a running interactive tool."""
    with nova_instance.connect() as connection:
        store = connection.create_data_store(name="nova_galaxy_testing")
        test_tool = Tool(TEST_INT_TOOL_ID)
        params = Parameters()
        # The returned link is not used by this test, so it is not bound to a name.
        test_tool.run_interactive(data_store=store, params=params, check_url=False)
        state = test_tool.get_status()
        assert state == WorkState.RUNNING
        stdout = test_tool.get_stdout()
        assert stdout is not None  # TODO maybe check specific stdout here
Loading