Loading src/nova/galaxy/job.py +2 −1 Original line number Diff line number Diff line Loading @@ -9,6 +9,7 @@ from .tool import AbstractWork class Job: def __init__(self, work: AbstractWork): self.id = "" pass def submit_job(self, datastore: Datastore, params: Parameters) -> None: Loading @@ -17,7 +18,7 @@ class Job: def cancel_job(self) -> None: pass def wait_for_job(self) -> None: def wait_for_results(self) -> None: pass def get_state(self) -> None: Loading src/nova/galaxy/tool.py +26 −2 Original line number Diff line number Diff line Loading @@ -17,6 +17,7 @@ class WorkState(Enum): NOT_STARTED = 1 QUEUED = 2 UPLOADING_DATA = 6 RUNNING = 3 FINISHED = 4 ERROR = 5 Loading @@ -43,9 +44,16 @@ class Tool(AbstractWork): def __init__(self, id: str): super().__init__(id) self._job = None def run(self, data_store: Datastore, params: Parameters) -> Outputs: def run_async(self, data_store: Datastore, params: Parameters, async_execution: bool): # self.thread = threading.new_thread(run) pass def run(self, data_store: Datastore, params: Parameters, async_execution: bool) -> Outputs: """Runs this tool in a blocking manner and returns a map of the output datasets and collections.""" # TODO Most of this logic will be moved to job class outputs = Outputs() galaxy_instance = data_store.nova_connection.galaxy_instance datasets_to_upload = {} Loading @@ -57,7 +65,8 @@ class Tool(AbstractWork): datasets_to_upload[param] = val else: tool_inputs.set_param(param, val) self.state = get_state() self._job.submit() ids = upload_datasets(store=data_store, datasets=datasets_to_upload) for param, val in ids.items(): tool_inputs.set_dataset_param(param, val) Loading Loading @@ -131,6 +140,21 @@ class Tool(AbstractWork): else: raise Exception("Interactive tool was stopped unexpectedly.") def get_status(self): return self._job.get_status() def get_results(self): pass def stop(self): pass def cancel(self): pass def get_stdout(self): pass def stop_all_tools_in_store(data_store: Datastore) -> None: galaxy_instance = data_store.nova_connection.galaxy_instance Loading tests/test_job.py +10 −16 Original line number Diff line number Diff line from nova.galaxy.tool import Tool, WorkState from nova.galaxy.parameters import Parameters from nova.galaxy.tool import Tool, WorkState TEST_INT_TOOL_ID = "interactive_tool_generic_output" def test_status(nova_instance, galaxy_instance): with nova_instance.connect() as connection: store = connection.create_data_store(name="nova_galaxy_testing") test_tool = Tool(TEST_INT_TOOL_ID) params = Parameters() state = test_tool.status() state = test_tool.get_status() assert state == WorkState.NOT_STARTED link = test_tool.run_interactive(data_store=store, params=params, check_url=False) state = test_tool.status() # state = connection.get_status(test_tool) state = test_tool.get_status() test_tool.get_results() assert state == WorkState.RUNNING # TODO: Implement job canceling/stopping, then replace this with that stuff test_tool.stop() # galaxy_instance.make_get_request(f"{store.nova_connection.galaxy_url}/api/jobs/{test_tool.job_id}/finish") # galaxy_instance.wait_for_job(test_tool.job_id) state = connection.get_status(test_tool) state = test_tool.get_status() assert state == WorkState.FINISHED Loading @@ -27,10 +26,8 @@ def test_cancel_tool(nova_instance, galaxy_instance): test_tool = Tool(TEST_INT_TOOL_ID) params = Parameters() link = test_tool.run_interactive(data_store=store, params=params, check_url=False) # TODO: Implement job canceling/stopping, then replace this with that stuff test_tool.cancel() # state = connection.get_status(test_tool) state = test_tool.status() state = test_tool.get_status() assert state == WorkState.ERROR Loading @@ -40,10 +37,7 @@ def test_get_tool_stdout(nova_instance): test_tool = Tool(TEST_INT_TOOL_ID) params = Parameters() link = test_tool.run_interactive(data_store=store, params=params, check_url=False) # TODO: Implement job canceling/stopping, then replace this with that stuff state = test_tool.status() # state = connection.get_status(test_tool) state = test_tool.get_status() assert state == WorkState.RUNNING stdout = test_tool.get_stdout() assert stdout is not None # TODO maybe check specific stdout here Loading
src/nova/galaxy/job.py +2 −1 Original line number Diff line number Diff line Loading @@ -9,6 +9,7 @@ from .tool import AbstractWork class Job: def __init__(self, work: AbstractWork): self.id = "" pass def submit_job(self, datastore: Datastore, params: Parameters) -> None: Loading @@ -17,7 +18,7 @@ class Job: def cancel_job(self) -> None: pass def wait_for_job(self) -> None: def wait_for_results(self) -> None: pass def get_state(self) -> None: Loading
src/nova/galaxy/tool.py +26 −2 Original line number Diff line number Diff line Loading @@ -17,6 +17,7 @@ class WorkState(Enum): NOT_STARTED = 1 QUEUED = 2 UPLOADING_DATA = 6 RUNNING = 3 FINISHED = 4 ERROR = 5 Loading @@ -43,9 +44,16 @@ class Tool(AbstractWork): def __init__(self, id: str): super().__init__(id) self._job = None def run(self, data_store: Datastore, params: Parameters) -> Outputs: def run_async(self, data_store: Datastore, params: Parameters, async_execution: bool): # self.thread = threading.new_thread(run) pass def run(self, data_store: Datastore, params: Parameters, async_execution: bool) -> Outputs: """Runs this tool in a blocking manner and returns a map of the output datasets and collections.""" # TODO Most of this logic will be moved to job class outputs = Outputs() galaxy_instance = data_store.nova_connection.galaxy_instance datasets_to_upload = {} Loading @@ -57,7 +65,8 @@ class Tool(AbstractWork): datasets_to_upload[param] = val else: tool_inputs.set_param(param, val) self.state = get_state() self._job.submit() ids = upload_datasets(store=data_store, datasets=datasets_to_upload) for param, val in ids.items(): tool_inputs.set_dataset_param(param, val) Loading Loading @@ -131,6 +140,21 @@ class Tool(AbstractWork): else: raise Exception("Interactive tool was stopped unexpectedly.") def get_status(self): return self._job.get_status() def get_results(self): pass def stop(self): pass def cancel(self): pass def get_stdout(self): pass def stop_all_tools_in_store(data_store: Datastore) -> None: galaxy_instance = data_store.nova_connection.galaxy_instance Loading
tests/test_job.py +10 −16 Original line number Diff line number Diff line from nova.galaxy.tool import Tool, WorkState from nova.galaxy.parameters import Parameters from nova.galaxy.tool import Tool, WorkState TEST_INT_TOOL_ID = "interactive_tool_generic_output" def test_status(nova_instance, galaxy_instance): with nova_instance.connect() as connection: store = connection.create_data_store(name="nova_galaxy_testing") test_tool = Tool(TEST_INT_TOOL_ID) params = Parameters() state = test_tool.status() state = test_tool.get_status() assert state == WorkState.NOT_STARTED link = test_tool.run_interactive(data_store=store, params=params, check_url=False) state = test_tool.status() # state = connection.get_status(test_tool) state = test_tool.get_status() test_tool.get_results() assert state == WorkState.RUNNING # TODO: Implement job canceling/stopping, then replace this with that stuff test_tool.stop() # galaxy_instance.make_get_request(f"{store.nova_connection.galaxy_url}/api/jobs/{test_tool.job_id}/finish") # galaxy_instance.wait_for_job(test_tool.job_id) state = connection.get_status(test_tool) state = test_tool.get_status() assert state == WorkState.FINISHED Loading @@ -27,10 +26,8 @@ def test_cancel_tool(nova_instance, galaxy_instance): test_tool = Tool(TEST_INT_TOOL_ID) params = Parameters() link = test_tool.run_interactive(data_store=store, params=params, check_url=False) # TODO: Implement job canceling/stopping, then replace this with that stuff test_tool.cancel() # state = connection.get_status(test_tool) state = test_tool.status() state = test_tool.get_status() assert state == WorkState.ERROR Loading @@ -40,10 +37,7 @@ def test_get_tool_stdout(nova_instance): test_tool = Tool(TEST_INT_TOOL_ID) params = Parameters() link = test_tool.run_interactive(data_store=store, params=params, check_url=False) # TODO: Implement job canceling/stopping, then replace this with that stuff state = test_tool.status() # state = connection.get_status(test_tool) state = test_tool.get_status() assert state == WorkState.RUNNING stdout = test_tool.get_stdout() assert stdout is not None # TODO maybe check specific stdout here