Commit c4ad84c4 authored by Cage, Gregory's avatar Cage, Gregory
Browse files

Merge branch '18-job-status' into 'main'

Implement back end infrastructure for managing tool execution.

Closes #18

See merge request ndip/public-packages/nova-galaxy!8
parents 098203ce e6a405b2
Loading
Loading
Loading
Loading
Loading

src/nova/galaxy/job.py

0 → 100644
+191 −0
Original line number Diff line number Diff line
"""Internal job related classes and functions."""

import sys
import time
from threading import Thread
from typing import Dict, Optional

from bioblend import galaxy

from .data_store import Datastore
from .dataset import AbstractData, Dataset, DatasetCollection, upload_datasets
from .outputs import Outputs
from .parameters import Parameters
from .util import WorkState


class JobStatus:
    """Mutable status record attached to a job.

    A fresh record starts in ``WorkState.NOT_STARTED`` with empty ``details`` and ``error_msg`` strings.
    """

    def __init__(self) -> None:
        self.state = WorkState.NOT_STARTED
        # Both text fields start out empty.
        self.details = self.error_msg = ""


class Job:
    """Internal class managing Galaxy job execution. Should not be used by end users.

    A Job uploads the inputs of one tool, submits the tool to Galaxy through the connection of the given
    data store, and records progress in ``self.status``.
    """

    def __init__(self, tool_id: str, data_store: Datastore) -> None:
        """Create a job for ``tool_id`` that will run in the history of ``data_store``."""
        self.id = ""
        self.datasets = None
        self.collections = None
        self.tool = tool_id
        self.store = data_store
        self.galaxy_instance = self.store.nova_connection.galaxy_instance
        self.status = JobStatus()
        self.url: Optional[str] = None

    def _run_and_wait(self, params: Parameters) -> None:
        """Submit the tool and block until Galaxy reports a result (thread target of `run`).

        The outcome is recorded in ``self.status``: FINISHED on success, ERROR (with the exception text in
        ``error_msg``) when uploading, submitting or waiting fails. Nothing is raised, because an exception
        escaping a worker thread would be lost and leave the job stuck in a non-terminal state.
        """
        settled = (WorkState.FINISHED, WorkState.ERROR)
        try:
            self.submit(params)
            self.wait_for_results()
        except Exception as err:
            # cancel()/stop() may already have settled the state while we were waiting; keep their verdict.
            if self.status.state not in settled:
                self.url = None
                self.status.error_msg = str(err)
                self.status.state = WorkState.ERROR
        else:
            if self.status.state not in settled:
                self.status.state = WorkState.FINISHED

    def run(self, params: Parameters, wait: bool) -> Optional[Outputs]:
        """Runs a job in Galaxy.

        Parameters
        ----------
        params: Parameters
            The tool inputs.
        wait: bool
            If True, block until the job is done and return its outputs. If False, return None immediately.

        Raises
        ------
        Exception
            If the job is already in progress, or (when waiting) if it did not finish successfully.
        """
        if self.status.state not in (WorkState.NOT_STARTED, WorkState.FINISHED, WorkState.ERROR):
            raise Exception(f"Tool {self.tool} (id: {self.id}) is already running.")

        thread = Thread(target=self._run_and_wait, args=(params,))
        thread.start()
        if not wait:
            return None
        thread.join()
        return self.get_results()

    def run_interactive(
        self, params: Parameters, wait: bool, max_tries: int = 100, check_url: bool = True
    ) -> Optional[str]:
        """Runs an interactive tool in Galaxy and returns a link to the tool.

        The tool is always started in the background. When ``wait`` is False, None is returned right away;
        otherwise the entry point is polled (see `get_url`) and its URL returned.

        Raises
        ------
        Exception
            If no URL could be obtained within ``max_tries`` attempts. The job is cancelled first.
        """
        self.run(params, False)
        if not wait:
            return None
        successful_url = self.get_url(max_tries=max_tries, check_url=check_url)
        if successful_url:
            return successful_url
        # If successful_url is None, then there was an issue starting the interactive tool.
        status = self.cancel()
        # if status is false, the job has been in a terminal state already, indicating an error somewhere in execution.
        if status:
            raise Exception(
                "Unable to fetch the URL for interactive tool. This could be due to needing to pull the docker image. "
                "Try again with a larger 'max_tries' value."
            )
        raise Exception("Interactive tool was stopped unexpectedly.")

    def submit(self, params: Parameters) -> None:
        """Handles uploading inputs and submitting job.

        Sets the state to UPLOADING_DATA while inputs are uploaded and to QUEUED right before the tool is
        submitted, then stores the job id and the output descriptions returned by Galaxy.
        """
        self.status.state = WorkState.UPLOADING_DATA
        self.url = None
        datasets_to_upload = {}

        # Set Tool Inputs: data inputs must be uploaded first, everything else is passed through as-is.
        tool_inputs = galaxy.tools.inputs.inputs()
        for param, val in params.inputs.items():
            if isinstance(val, AbstractData):
                datasets_to_upload[param] = val
            else:
                tool_inputs.set_param(param, val)
        ids = upload_datasets(store=self.store, datasets=datasets_to_upload)
        for param, val in ids.items():
            tool_inputs.set_dataset_param(param, val)

        # Submit the tool; waiting for completion is done separately in wait_for_results().
        self.status.state = WorkState.QUEUED
        results = self.galaxy_instance.tools.run_tool(
            history_id=self.store.history_id, tool_id=self.tool, tool_inputs=tool_inputs
        )
        self.id = results["jobs"][0]["id"]
        self.datasets = results["outputs"]
        self.collections = results["output_collections"]

    def cancel(self, check_results: bool = False) -> bool:
        """Cancels or stops a job in Galaxy.

        Parameters
        ----------
        check_results: bool
            If True, ask Galaxy to finish the job (state becomes FINISHED; a non-200 response is stored in
            ``error_msg``). If False, cancel the job outright (state becomes ERROR).

        Returns
        -------
        bool
            For ``check_results=True``, whether the finish request returned HTTP 200. Otherwise the return
            value of bioblend's ``cancel_job``.
        """
        self.url = None
        if check_results:
            # NOTE(review): same "/api/jobs/<id>/..." layout as get_console_output(); the original was
            # missing the slash before the job id. Confirm the endpoint and HTTP verb against the server.
            response = self.galaxy_instance.make_get_request(
                f"{self.store.nova_connection.galaxy_url}/api/jobs/{self.id}/finish"
            )
            self.status.state = WorkState.FINISHED
            if response.status_code == 200:
                return True
            self.status.error_msg = response.text
            return False
        self.status.state = WorkState.ERROR
        return self.galaxy_instance.jobs.cancel_job(self.id)

    def wait_for_results(self) -> None:
        """Wait for job to finish."""
        self.galaxy_instance.jobs.wait_for_job(self.id)

    def get_state(self) -> JobStatus:
        """Returns current state of job.

        While the job is QUEUED, Galaxy is asked for the job state so that QUEUED can be promoted to RUNNING.
        """
        if self.status.state == WorkState.QUEUED:
            job = self.galaxy_instance.jobs.show_job(self.id)
            if job["state"] == "running":
                self.status.state = WorkState.RUNNING
        return self.status

    def get_results(self) -> Outputs:
        """Return results from finished job.

        Raises
        ------
        Exception
            If the job is not in the FINISHED state.
        """
        if self.status.state != WorkState.FINISHED:
            raise Exception(f"Job {self.id} has not finished running.")

        outputs = Outputs()
        for dataset in self.datasets or []:
            d = Dataset(dataset["output_name"])
            d.id = dataset["id"]
            d.store = self.store
            outputs.add_output(d)
        for collection in self.collections or []:
            dc = DatasetCollection(collection["output_name"])
            dc.id = collection["id"]
            dc.store = self.store
            outputs.add_output(dc)
        return outputs

    def get_url(self, max_tries: int = 100, check_url: bool = True) -> Optional[str]:
        """Get the URL or endpoint for this tool.

        Polls Galaxy's entry points about once per second, at most ``max_tries`` times.

        Parameters
        ----------
        max_tries: int
            Maximum number of polling attempts.
        check_url: bool
            If True, only return the URL once a GET request to it answers with HTTP 200.

        Returns
        -------
        Optional[str]
            The (cached) URL, or None if none was found within ``max_tries`` attempts.
        """
        if self.url:
            return self.url

        base_url = self.store.nova_connection.galaxy_url
        for _ in range(max_tries):
            try:
                entry_points = self.galaxy_instance.make_get_request(
                    f"{base_url}/api/entry_points?job_id={self.id}"
                )
                for ep in entry_points.json():
                    if ep["job_id"] != self.id or not ep.get("target", None):
                        continue
                    url = f"{base_url}{ep['target']}"
                    if not check_url or self.galaxy_instance.make_get_request(url).status_code == 200:
                        # Only cache a URL we actually hand out, so an unreachable one is never returned later.
                        self.url = url
                        return url
            except Exception:
                # Polling is best effort: the tool may still be starting up, so just try again.
                pass
            time.sleep(1)
        return None

    def get_console_output(self) -> Dict[str, str]:
        """Get all the current console output.

        Requests stdout and stderr from position 0 with the largest length the API call can express and
        returns the decoded JSON response. Raises if the request returns an HTTP error status.
        """
        out = self.galaxy_instance.make_get_request(
            f"{self.store.nova_connection.galaxy_url}/api/jobs/"
            f"{self.id}/console_output?stdout_position=0&stdout_length="
            f"{sys.maxsize - 1}&stderr_position=0&stderr_length={sys.maxsize - 1}"
        )
        out.raise_for_status()
        return out.json()
+109 −88
Original line number Diff line number Diff line
"""Contains classes to run tools in Galaxy via Nova."""

import time
from typing import List, Optional, Union

from bioblend import galaxy

from .data_store import Datastore
from .dataset import AbstractData, Dataset, DatasetCollection, upload_datasets
from .dataset import AbstractData
from .job import Job
from .outputs import Outputs
from .parameters import Parameters
from .util import WorkState


class AbstractWork:
@@ -23,105 +22,127 @@ class AbstractWork:
    def get_inputs(self) -> List[Parameters]:
        return []

    def run(self, data_store: Datastore, params: Parameters) -> Union[Outputs, None]:
    def run(self, data_store: Datastore, params: Parameters, wait: bool) -> Union[Outputs, None]:
        return None


class Tool(AbstractWork):
    """Represents a tool from Galaxy that can be run."""
    """Represents a tool from Galaxy that can be run.

    It's recommended to create a new Tool object every time you want to run a tool to prevent results from being
    overridden.

    """

    def __init__(self, id: str):
        super().__init__(id)
        self._job: Optional[Job] = None

    def run(self, data_store: Datastore, params: Parameters) -> Outputs:
        """Runs this tool in a blocking manner and returns a map of the output datasets and collections."""
        outputs = Outputs()
        galaxy_instance = data_store.nova_connection.galaxy_instance
        datasets_to_upload = {}
    def run(self, data_store: Datastore, params: Parameters, wait: bool = True) -> Optional[Outputs]:
        """Run this tool.

        # Set Tool Inputs
        tool_inputs = galaxy.tools.inputs.inputs()
        for param, val in params.inputs.items():
            if isinstance(val, AbstractData):
                datasets_to_upload[param] = val
            else:
                tool_inputs.set_param(param, val)

        ids = upload_datasets(store=data_store, datasets=datasets_to_upload)
        for param, val in ids.items():
            tool_inputs.set_dataset_param(param, val)

        # Run tool and wait for job to finish
        results = galaxy_instance.tools.run_tool(
            history_id=data_store.history_id, tool_id=self.id, tool_inputs=tool_inputs
        )

        for job in results["jobs"]:
            galaxy_instance.jobs.wait_for_job(job_id=job["id"])

        # Collect output datasets and dataset collections
        result_datasets = results["outputs"]
        result_collections = results["output_collections"]
        if result_datasets:
            for dataset in result_datasets:
                d = Dataset(dataset["output_name"])
                d.id = dataset["id"]
                d.store = data_store
                outputs.add_output(d)
        if result_collections:
            for collection in result_collections:
                dc = DatasetCollection(collection["output_name"])
                dc.id = collection["id"]
                dc.store = data_store
                outputs.add_output(dc)

        return outputs
        By default, will be run in a blocking manner, unless `wait` is set to False. Will return the
        results as an instance of the `Outputs` class from nova.galaxy.outputs if run in a blocking way. Otherwise, will
        return None, and the user will be responsible for getting results by calling `get_results`.

        Parameters
        ----------
        data_store: Datastore
            The data store to run this tool in.
        params: Parameters
            The input parameters for this tool.
        wait: bool
            Whether to run this tool in a blocking manner (True) or not (False). Default is True.

        Returns
        -------
        Optional[Outputs]
            If run in a blocking manner, returns the Outputs once the tool is finished running. Otherwise, returns None.

        """
        self._job = Job(self.id, data_store)
        return self._job.run(params, wait)

    def run_interactive(
        self, data_store: Datastore, params: Parameters, max_tries: int = 100, check_url: bool = True
        self, data_store: Datastore, params: Parameters, wait: bool = True, max_tries: int = 100, check_url: bool = True
    ) -> Optional[str]:
        galaxy_instance = data_store.nova_connection.galaxy_instance
        datasets_to_upload = {}
        # Set Tool Inputs
        tool_inputs = galaxy.tools.inputs.inputs()
        for param, val in params.inputs.items():
            if isinstance(val, AbstractData):
                datasets_to_upload[param] = val
        """Run tool interactively.

        Interactive Tools typically are run exclusively with this method. Can poll for
        the interactive tool endpoint before returning, ensuring that the tool is reachable.

        Parameters
        ----------
        data_store: Datastore
            The data store to run this tool in.
        params: Parameters
            The input parameters for this tool.
        wait: bool
            Whether to wait for the interactive tool to start up before returning.
        max_tries: int
            Timeout for how long to poll for the interactive tool endpoint.
        check_url: bool
            Whether to check if the interactive tool endpoint is reachable before returning.

        Returns
        -------
        Optional[str]
            Will return None, if not waiting for interactive tool to startup with `wait` parameter. Will return
            the URL to the interactive tool otherwise.

        """
        self._job = Job(self.id, data_store)
        return self._job.run_interactive(params, wait=wait, max_tries=max_tries, check_url=check_url)

    def get_status(self) -> WorkState:
        """Returns the current status of the tool."""
        if self._job:
            return self._job.get_state().state
        else:
                tool_inputs.set_param(param, val)

        ids = upload_datasets(store=data_store, datasets=datasets_to_upload)
        for param, val in ids.items():
            tool_inputs.set_dataset_param(param, val)

        # Run tool and wait for job to finish
        results = galaxy_instance.tools.run_tool(
            history_id=data_store.history_id, tool_id=self.id, tool_inputs=tool_inputs
        )
        job_id = results["jobs"][0]["id"]

        timer = max_tries
        while timer > 0:
            entry_points = galaxy_instance.make_get_request(
                f"{data_store.nova_connection.galaxy_url}/api/entry_points?job_id={job_id}"
            )
            for ep in entry_points.json():
                if ep["job_id"] == job_id and ep.get("target", None):
                    url = f"{data_store.nova_connection.galaxy_url}{ep['target']}"
                    response = galaxy_instance.make_get_request(url)
                    if response.status_code == 200 or not check_url:
                        return url
            timer -= 1
            time.sleep(1)
        status = galaxy_instance.jobs.cancel_job(job_id)
        # if status is false, the job has been in a terminal state already, indicating an error somewhere in execution
        if status:
            raise Exception("Unable to fetch the URL for interactive tool.")
        else:
            raise Exception("Interactive tool was stopped unexpectedly.")
            return WorkState.NOT_STARTED

    def get_results(self) -> Optional[Outputs]:
        """Returns the results from running this tool.

        Throws an Exception if the tool has not finished yet. Will be
        overridden if this tool is run again.

        """
        if self._job:
            return self._job.get_results()
        return None

    def stop(self) -> None:
        """Stop the tool, but keep any existing results."""
        if self._job:
            self._job.cancel(check_results=True)

    def cancel(self) -> None:
        """Cancels the tool execution and gets rid of any results collected."""
        if self._job:
            self._job.cancel(check_results=False)

    def get_stdout(self) -> Optional[str]:
        """Get the current STDOUT for a tool. Will be overridden everytime this tool is run."""
        if self._job:
            return self._job.get_console_output()["stdout"]
        return None

    def get_stderr(self) -> Optional[str]:
        """Get the current STDERR for a tool. Will be overridden everytime this tool is run."""
        if self._job:
            return self._job.get_console_output()["stderr"]
        return None

    def get_url(self) -> Optional[str]:
        """Get the URL for this tool. If this is an interactive tool, then will return the endpoint to the tool."""
        if self._job:
            return self._job.get_url()
        return None


def stop_all_tools_in_store(data_store: Datastore) -> None:
    """Stops all the tools from running in a particular store."""
    galaxy_instance = data_store.nova_connection.galaxy_instance
    jobs = galaxy_instance.jobs.get_jobs(history_id=data_store.history_id)
    for job in jobs:
+14 −0
Original line number Diff line number Diff line
"""Utilities."""

from enum import Enum


class WorkState(Enum):
    """The state of a tool in Galaxy.

    The numeric values are arbitrary identifiers; compare members directly rather than relying on the numbers.
    """

    # Nothing has been submitted yet (initial state of a job status record).
    NOT_STARTED = 1
    # Tool inputs are being uploaded prior to submission.
    UPLOADING_DATA = 2
    # The tool has been handed to Galaxy but is not reported as running yet.
    QUEUED = 3
    # Galaxy reports the job as running.
    RUNNING = 4
    # The job completed (or was stopped while keeping its results).
    FINISHED = 5
    # The job failed or was cancelled.
    ERROR = 6
+44 −0
Original line number Diff line number Diff line
"""Tests for tools."""

import time

from bioblend.galaxy import GalaxyInstance
from bioblend.galaxy.datasets import DatasetClient

@@ -7,6 +9,7 @@ from nova.galaxy.dataset import Dataset
from nova.galaxy.nova import Nova
from nova.galaxy.parameters import Parameters
from nova.galaxy.tool import Tool
from nova.galaxy.util import WorkState

TEST_TOOL_ID = "neutrons_remote_command"
TEST_INT_TOOL_ID = "interactive_tool_generic_output"
@@ -33,6 +36,7 @@ def test_run_tool_interactive(nova_instance: Nova, galaxy_instance: GalaxyInstan
        params.add_input("run_it", True)
        link = test_tool.run_interactive(data_store=store, params=params, check_url=False)
        assert link is not None
        assert test_tool.get_url() is not None
        entry_points = galaxy_instance.make_get_request(
            f"{store.nova_connection.galaxy_url}/api/entry_points?running=true"
        )
@@ -53,3 +57,43 @@ def test_run_tool_interactive(nova_instance: Nova, galaxy_instance: GalaxyInstan
                    assert test_text == "this is a test"
                    return
        raise Exception("Did not find interactive tool while testing.")


def test_status(nova_instance: Nova) -> None:
    """Check the reported state before starting, while running, and after stopping an interactive tool."""
    with nova_instance.connect() as connection:
        store = connection.create_data_store(name="nova_galaxy_testing")
        test_tool = Tool(TEST_INT_TOOL_ID)
        params = Parameters()
        state = test_tool.get_status()
        assert state == WorkState.NOT_STARTED
        test_tool.run_interactive(data_store=store, params=params, check_url=False)
        try:
            state = test_tool.get_status()
            assert state == WorkState.RUNNING
        finally:
            # Always stop the tool, even if the assertion above fails, so it is not left running on the server.
            test_tool.stop()
        state = test_tool.get_status()
        assert state == WorkState.FINISHED


def test_cancel_tool(nova_instance: Nova) -> None:
    """Cancelling a started interactive tool must leave it in the ERROR state."""
    with nova_instance.connect() as connection:
        tool = Tool(TEST_INT_TOOL_ID)
        data_store = connection.create_data_store(name="nova_galaxy_testing")
        tool.run_interactive(data_store=data_store, params=Parameters(), check_url=False)
        tool.cancel()
        assert tool.get_status() == WorkState.ERROR


def test_get_tool_stdout(nova_instance: Nova) -> None:
    """A running interactive tool must expose its standard output."""
    with nova_instance.connect() as connection:
        store = connection.create_data_store(name="nova_galaxy_testing")
        test_tool = Tool(TEST_INT_TOOL_ID)
        params = Parameters()
        test_tool.run_interactive(data_store=store, params=params, check_url=False)
        try:
            state = test_tool.get_status()
            assert state == WorkState.RUNNING
            time.sleep(10)  # Tool takes a moment to produce stdout
            stdout = test_tool.get_stdout()
            assert stdout is not None
        finally:
            # Cancel even when an assertion fails so the interactive tool is not left running on the server.
            test_tool.cancel()