Unverified Commit fd71fbeb authored by Gregory Cage's avatar Gregory Cage Committed by GitHub
Browse files

Merge pull request #41 from nova-model/process-stop-signal

Add global cleanup and job fetching as well as signals to invoke them
parents 1c8ed34c 03dc752a
Loading
Loading
Loading
Loading
+12 −10
Original line number Diff line number Diff line
# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.

[[package]]
name = "aiohappyeyeballs"
@@ -149,7 +149,7 @@ version = "0.7.0"
description = "Reusable constraint types to use with typing.Annotated"
optional = false
python-versions = ">=3.8"
groups = ["dev"]
groups = ["main", "dev"]
files = [
    {file = "annotated_types-0.7.0-py3-none-any.whl", hash = "sha256:1f02e8b43a8fbbc3f3e0d4f0f4bfc8131bcb4eebe8849b8e5c773f3a1c582a53"},
    {file = "annotated_types-0.7.0.tar.gz", hash = "sha256:aff07c09a53a08bc8cfccb9c85b05f1aa9a2a6f23728d790723543408344ce89"},
@@ -162,7 +162,7 @@ description = "Timeout context manager for asyncio programs"
optional = false
python-versions = ">=3.8"
groups = ["main"]
markers = "python_version == \"3.10\""
markers = "python_version < \"3.11\""
files = [
    {file = "async_timeout-5.0.1-py3-none-any.whl", hash = "sha256:39e3809566ff85354557ec2398b55e096c8364bacac9405a7a1fa429e77fe76c"},
    {file = "async_timeout-5.0.1.tar.gz", hash = "sha256:d9321a7a3d5a6a5e187e824d2fa0793ce379a202935782d555d6e9d2735677d3"},
@@ -543,7 +543,7 @@ description = "Backport of PEP 654 (exception groups)"
optional = false
python-versions = ">=3.7"
groups = ["main", "dev"]
markers = "python_version == \"3.10\""
markers = "python_version < \"3.11\""
files = [
    {file = "exceptiongroup-1.3.0-py3-none-any.whl", hash = "sha256:4d111e6e0c13d0644cad6ddaa7ed0261a0b36971f6d23e7ec9b4b9097da78a10"},
    {file = "exceptiongroup-1.3.0.tar.gz", hash = "sha256:b241f5885f560bc56a59ee63ca4c6a8bfa46ae4ad651af316d4e81817bb9fd88"},
@@ -1057,15 +1057,18 @@ files = [

[[package]]
name = "nova-common"
version = "0.2.0"
version = "0.2.3"
description = "NOVA Common Project"
optional = false
python-versions = "<4.0,>=3.10"
groups = ["main"]
files = [
    {file = "nova_common-0.2.0-py3-none-any.whl", hash = "sha256:7b73d14eeff5a7f22844578eeaf6b30a5f0c5197bd0908e92d5cb1dc425bbc6d"},
    {file = "nova_common-0.2.3-py3-none-any.whl", hash = "sha256:68089eee23e1dd292639d017bdd46aa556ee11793cff9aef8f11c470b46ff683"},
]

[package.dependencies]
pydantic = ">=2.11.4,<3.0.0"

[[package]]
name = "packaging"
version = "25.0"
@@ -1307,7 +1310,7 @@ version = "2.11.4"
description = "Data validation using Python type hints"
optional = false
python-versions = ">=3.9"
groups = ["dev"]
groups = ["main", "dev"]
files = [
    {file = "pydantic-2.11.4-py3-none-any.whl", hash = "sha256:d9615eaa9ac5a063471da949c8fc16376a84afb5024688b3ff885693506764eb"},
    {file = "pydantic-2.11.4.tar.gz", hash = "sha256:32738d19d63a226a52eed76645a98ee07c1f410ee41d93b4afbfa85ed8111c2d"},
@@ -1329,7 +1332,7 @@ version = "2.33.2"
description = "Core functionality for Pydantic validation and serialization"
optional = false
python-versions = ">=3.9"
groups = ["dev"]
groups = ["main", "dev"]
files = [
    {file = "pydantic_core-2.33.2-cp310-cp310-macosx_10_12_x86_64.whl", hash = "sha256:2b3d326aaef0c0399d9afffeb6367d5e26ddc24d351dbc9c636840ac355dc5d8"},
    {file = "pydantic_core-2.33.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:0e5b2671f05ba48b94cb90ce55d8bdcaaedb8ba00cc5359f6810fc918713983d"},
@@ -1959,7 +1962,6 @@ files = [
    {file = "typing_extensions-4.13.2-py3-none-any.whl", hash = "sha256:a439e7c04b49fec3e5d3e2beaa21755cadbbdc391694e28ccdd36ca4a1408f8c"},
    {file = "typing_extensions-4.13.2.tar.gz", hash = "sha256:e6c81219bd689f51865d9e372991c540bda33a0379d5573cddb9a3a23f7caaef"},
]
markers = {main = "python_version == \"3.10\""}

[[package]]
name = "typing-inspection"
@@ -1967,7 +1969,7 @@ version = "0.4.0"
description = "Runtime typing introspection tools"
optional = false
python-versions = ">=3.9"
groups = ["dev"]
groups = ["main", "dev"]
files = [
    {file = "typing_inspection-0.4.0-py3-none-any.whl", hash = "sha256:50e72559fcd2a6367a19f7a7e610e6afcb9fac940c650290eed893d61386832f"},
    {file = "typing_inspection-0.4.0.tar.gz", hash = "sha256:9765c87de36671694a67904bf2c96e395be9c6439bb6c87b5142569dcdd65122"},
+36 −3
Original line number Diff line number Diff line
@@ -8,6 +8,26 @@ from deprecated import deprecated
from .data_store import Datastore
from .tool import stop_all_tools_in_store

_global_connections: List = []


def global_cleanup(*args: Any, **kwargs: Any) -> None:
    """Stop all tools in all data stores."""
    global _global_connections
    for conn in _global_connections.copy():
        conn.close(force_stop=True)
    _global_connections = []


def global_get_running_tools(*args: Any, **kwargs: Any) -> List:
    """Get all running tools in all data stores."""
    global _global_connections
    tools: List = []
    for conn in _global_connections:
        for store in conn.datastores:
            tools += store.recover_tools()
    return tools


class GalaxyConnectionError(Exception):
    """Exception raised for errors in the connection.
@@ -89,15 +109,26 @@ class ConnectionHelper:
        store: Datastore
            The data store to remove from this connection.
        """
        if not store.persist_store:
            store.cleanup()
        self.datastores.remove(store)

    def close(self) -> None:
    def close(self, force_stop: bool = False) -> None:
        """Closes the connection and stops all jobs in non-persisted data stores.

        Parameters
        ----------
        force_stop: bool
            Force data stores to stop currently running jobs even persisted stores. Will not delete persisted stores.

        """
        global _global_connections
        # Remove all data stores after execution
        for store in self.datastores:
            if not store.persist_store:
            if not store.persist_store or force_stop:
                stop_all_tools_in_store(store)
                self.remove_data_store(store)
        _global_connections.remove(self)


class Connection:
@@ -144,6 +175,8 @@ class Connection:
        ------
            ValueError: If the Galaxy URL or API key is not provided.
        """
        global _global_connections
        self._init_galaxy_instance()
        conn = ConnectionHelper(self.galaxy_instance, self.galaxy_url)
        _global_connections.append(conn)
        return conn
+2 −2
Original line number Diff line number Diff line
"""Abstract interfaces and type definitions."""

from abc import ABC, abstractmethod
from typing import Tuple
from typing import Optional, Tuple

from nova.galaxy.data_store import Datastore
from nova.galaxy.parameters import Parameters
@@ -22,7 +22,7 @@ class BasicTool(ABC):
        self.store = store

    @abstractmethod
    def prepare_tool(self) -> Tuple[Tool, Parameters]:
    def prepare_tool(self) -> Tuple[Tool, Optional[Parameters]]:
        """Prepare tool to run."""
        raise Exception("Please implement in a concrete class")

+1 −1
Original line number Diff line number Diff line
@@ -149,7 +149,7 @@ class Job:

        # Set Tool Inputs
        tool_inputs = galaxy.tools.inputs.inputs()
        if params:
        if params and len(params.inputs) > 0:
            for param, val in params.inputs.items():
                if isinstance(val, Dataset):
                    datasets_to_upload[param] = val
+5 −0
Original line number Diff line number Diff line
@@ -11,6 +11,7 @@ from blinker import signal
from nova.common.job import ToolOutputs, WorkState
from nova.common.signals import Signal, ToolCommand, get_signal_id
from nova.galaxy import Connection, Tool
from nova.galaxy.connection import global_cleanup, global_get_running_tools
from nova.galaxy.interfaces import BasicTool
from nova.galaxy.job import JobStatus

@@ -65,6 +66,10 @@ class ToolRunner:
        self.error_message_signal = signal(get_signal_id(id, Signal.ERROR_MESSAGE))
        self.execution_signal = signal(get_signal_id(id, Signal.TOOL_COMMAND))
        self.outputs_signal = signal(get_signal_id(id, Signal.OUTPUTS))
        self.kill_on_exit_signal = signal(Signal.EXIT_SIGNAL)
        self.kill_on_exit_signal.connect(global_cleanup, weak=False)
        self.fetch_all_jobs_signal = signal(Signal.GET_ALL_TOOLS)
        self.fetch_all_jobs_signal.connect(global_get_running_tools, weak=False)

        self.error: str = ""
        self.loop: Optional[asyncio.AbstractEventLoop] = None
Loading