Loading src/nova/galaxy/job.py 0 → 100644 +36 −0 Original line number Diff line number Diff line """ Job """ from .data_store import Datastore from .parameters import Parameters from .tool import AbstractWork class Job: def __init__(self, work: AbstractWork): pass def submit_job(self, datastore: Datastore, params: Parameters) -> None: pass def cancel_job(self) -> None: pass def wait_for_job(self) -> None: pass def get_state(self) -> None: pass def get_outputs(self) -> None: pass def get_stdout(self) -> str: pass def get_stderr(self) -> str: pass def get_results(self) -> None: pass src/nova/galaxy/nova.py +3 −0 Original line number Diff line number Diff line Loading @@ -51,6 +51,9 @@ class NovaConnection: history = self.galaxy_instance.histories.get_histories(name=store.name)[0]["id"] self.galaxy_instance.histories.delete_history(history_id=history, purge=True) def get_status(self, tool): pass class Nova: """ Loading src/nova/galaxy/tool.py +11 −0 Original line number Diff line number Diff line """Contains classes to run tools in Galaxy via Nova.""" import time from enum import Enum from typing import List, Optional, Union from bioblend import galaxy Loading @@ -11,6 +12,16 @@ from .outputs import Outputs from .parameters import Parameters class WorkState(Enum): """The state of a dataset in Galaxy.""" NOT_STARTED = 1 QUEUED = 2 RUNNING = 3 FINISHED = 4 ERROR = 5 class AbstractWork: """Abstraction for a runnable object in Galaxy such as a tool or workflow.""" Loading tests/test_job.py 0 → 100644 +49 −0 Original line number Diff line number Diff line from nova.galaxy.tool import Tool, WorkState from nova.galaxy.parameters import Parameters TEST_INT_TOOL_ID = "interactive_tool_generic_output" def test_status(nova_instance, galaxy_instance): with nova_instance.connect() as connection: store = connection.create_data_store(name="nova_galaxy_testing") test_tool = Tool(TEST_INT_TOOL_ID) params = Parameters() state = test_tool.status() assert state == WorkState.NOT_STARTED link = test_tool.run_interactive(data_store=store, params=params, check_url=False) state = test_tool.status() # state = connection.get_status(test_tool) assert state == WorkState.RUNNING # TODO: Implement job canceling/stopping, then replace this with that stuff test_tool.stop() # galaxy_instance.make_get_request(f"{store.nova_connection.galaxy_url}/api/jobs/{test_tool.job_id}/finish") # galaxy_instance.wait_for_job(test_tool.job_id) state = connection.get_status(test_tool) assert state == WorkState.FINISHED def test_cancel_tool(nova_instance, galaxy_instance): with nova_instance.connect() as connection: store = connection.create_data_store(name="nova_galaxy_testing") test_tool = Tool(TEST_INT_TOOL_ID) params = Parameters() link = test_tool.run_interactive(data_store=store, params=params, check_url=False) # TODO: Implement job canceling/stopping, then replace this with that stuff test_tool.cancel() # state = connection.get_status(test_tool) state = test_tool.status() assert state == WorkState.ERROR def test_get_tool_stdout(nova_instance): with nova_instance.connect() as connection: store = connection.create_data_store(name="nova_galaxy_testing") test_tool = Tool(TEST_INT_TOOL_ID) params = Parameters() link = test_tool.run_interactive(data_store=store, params=params, check_url=False) # TODO: Implement job canceling/stopping, then replace this with that stuff state = test_tool.status() # state = connection.get_status(test_tool) assert state == WorkState.RUNNING stdout = test_tool.get_stdout() assert stdout is not None # TODO maybe check specific stdout here Loading
src/nova/galaxy/job.py 0 → 100644 +36 −0 Original line number Diff line number Diff line """ Job """ from .data_store import Datastore from .parameters import Parameters from .tool import AbstractWork class Job: def __init__(self, work: AbstractWork): pass def submit_job(self, datastore: Datastore, params: Parameters) -> None: pass def cancel_job(self) -> None: pass def wait_for_job(self) -> None: pass def get_state(self) -> None: pass def get_outputs(self) -> None: pass def get_stdout(self) -> str: pass def get_stderr(self) -> str: pass def get_results(self) -> None: pass
src/nova/galaxy/nova.py +3 −0 Original line number Diff line number Diff line Loading @@ -51,6 +51,9 @@ class NovaConnection: history = self.galaxy_instance.histories.get_histories(name=store.name)[0]["id"] self.galaxy_instance.histories.delete_history(history_id=history, purge=True) def get_status(self, tool): pass class Nova: """ Loading
src/nova/galaxy/tool.py +11 −0 Original line number Diff line number Diff line """Contains classes to run tools in Galaxy via Nova.""" import time from enum import Enum from typing import List, Optional, Union from bioblend import galaxy Loading @@ -11,6 +12,16 @@ from .outputs import Outputs from .parameters import Parameters class WorkState(Enum): """The state of a dataset in Galaxy.""" NOT_STARTED = 1 QUEUED = 2 RUNNING = 3 FINISHED = 4 ERROR = 5 class AbstractWork: """Abstraction for a runnable object in Galaxy such as a tool or workflow.""" Loading
tests/test_job.py 0 → 100644 +49 −0 Original line number Diff line number Diff line from nova.galaxy.tool import Tool, WorkState from nova.galaxy.parameters import Parameters TEST_INT_TOOL_ID = "interactive_tool_generic_output" def test_status(nova_instance, galaxy_instance): with nova_instance.connect() as connection: store = connection.create_data_store(name="nova_galaxy_testing") test_tool = Tool(TEST_INT_TOOL_ID) params = Parameters() state = test_tool.status() assert state == WorkState.NOT_STARTED link = test_tool.run_interactive(data_store=store, params=params, check_url=False) state = test_tool.status() # state = connection.get_status(test_tool) assert state == WorkState.RUNNING # TODO: Implement job canceling/stopping, then replace this with that stuff test_tool.stop() # galaxy_instance.make_get_request(f"{store.nova_connection.galaxy_url}/api/jobs/{test_tool.job_id}/finish") # galaxy_instance.wait_for_job(test_tool.job_id) state = connection.get_status(test_tool) assert state == WorkState.FINISHED def test_cancel_tool(nova_instance, galaxy_instance): with nova_instance.connect() as connection: store = connection.create_data_store(name="nova_galaxy_testing") test_tool = Tool(TEST_INT_TOOL_ID) params = Parameters() link = test_tool.run_interactive(data_store=store, params=params, check_url=False) # TODO: Implement job canceling/stopping, then replace this with that stuff test_tool.cancel() # state = connection.get_status(test_tool) state = test_tool.status() assert state == WorkState.ERROR def test_get_tool_stdout(nova_instance): with nova_instance.connect() as connection: store = connection.create_data_store(name="nova_galaxy_testing") test_tool = Tool(TEST_INT_TOOL_ID) params = Parameters() link = test_tool.run_interactive(data_store=store, params=params, check_url=False) # TODO: Implement job canceling/stopping, then replace this with that stuff state = test_tool.status() # state = connection.get_status(test_tool) assert state == WorkState.RUNNING stdout = test_tool.get_stdout() assert stdout is not None # TODO maybe check specific stdout here