Commit 3dbcb509 authored by Cage, Gregory's avatar Cage, Gregory
Browse files

Add interactive tool execution

parent 117cb5ce
Loading
Loading
Loading
Loading
Loading
+5 −2
Original line number Diff line number Diff line
@@ -6,6 +6,7 @@ from typing import Generator, List, Optional
from bioblend import galaxy

from .data_store import Datastore
from .tool import stop_all_tools_in_store


class GalaxyConnectionError(Exception):
@@ -28,8 +29,9 @@ class NovaConnection:
    be automatically purged after connection is closed, unless Datastore.persist() is called for that store.
    """

    def __init__(self, galaxy_instance: galaxy.GalaxyInstance):
    def __init__(self, galaxy_instance: galaxy.GalaxyInstance, galaxy_url: str):
        self.galaxy_instance = galaxy_instance
        self.galaxy_url = galaxy_url
        self.datastores: List[Datastore] = []

    def create_data_store(self, name: str) -> Datastore:
@@ -93,9 +95,10 @@ class Nova:
            raise ValueError("Galaxy URL must be a string")
        self.galaxy_instance = galaxy.GalaxyInstance(url=self.galaxy_url, key=self.galaxy_api_key)
        self.galaxy_instance.config.get_version()
        conn = NovaConnection(self.galaxy_instance)
        conn = NovaConnection(self.galaxy_instance, self.galaxy_url)
        yield conn
        # Remove all data stores after execution
        for store in conn.datastores:
            stop_all_tools_in_store(store)
            if not store.persist_store:
                conn.remove_data_store(store)
+46 −0
Original line number Diff line number Diff line
"""Contains classes to run tools in Galaxy via Nova."""

import time
from typing import List, Union

from bioblend import galaxy
@@ -75,3 +76,48 @@ class Tool(AbstractWork):
                outputs.add_output(dc)

        return outputs

    def run_interactive(self, data_store: Datastore, params: Parameters, max_tries: int = 100) -> str:
        galaxy_instance = data_store.nova_connection.galaxy_instance
        datasets_to_upload = {}
        # Set Tool Inputs
        tool_inputs = galaxy.tools.inputs.inputs()
        for param, val in params.inputs.items():
            if isinstance(val, AbstractData):
                datasets_to_upload[param] = val
            else:
                tool_inputs.set_param(param, val)

        ids = upload_datasets(store=data_store, datasets=datasets_to_upload)
        for param, val in ids.items():
            tool_inputs.set_dataset_param(param, val)

        # Run tool and wait for job to finish
        results = galaxy_instance.tools.run_tool(
            history_id=data_store.history_id, tool_id=self.id, tool_inputs=tool_inputs
        )
        job_id = results["jobs"][0]["id"]

        timer = max_tries
        while timer > 0:
            entry_points = galaxy_instance.make_get_request(
                f"{data_store.nova_connection.galaxy_url}/api/entry_points?job_id={job_id}"
            )
            for ep in entry_points.json():
                if ep["job_id"] == job_id and ep.get("target", None):
                    return f"{data_store.nova_connection.galaxy_url}{ep['target']}"
            timer -= 1
            time.sleep(1)
        status = galaxy_instance.jobs.cancel_job(job_id)
        # if status is false, the job has been in a terminal state already, indicating an error somewhere in execution
        if status:
            raise Exception("Unable to fetch the URL for interactive tool.")
        else:
            raise Exception("Interactive tool was stopped unexpectedly.")


def stop_all_tools_in_store(data_store: Datastore) -> None:
    galaxy_instance = data_store.nova_connection.galaxy_instance
    jobs = galaxy_instance.jobs.get_jobs(history_id=data_store.history_id)
    for job in jobs:
        galaxy_instance.jobs.cancel_job(job["id"])