Loading src/nova/galaxy/workflow.py +22 −5 Original line number Diff line number Diff line Loading @@ -67,7 +67,7 @@ class Invocation: "running": WorkState.RUNNING, "ok": WorkState.FINISHED, "error": WorkState.ERROR, "paused": WorkState.PAUSED, "paused": WorkState.QUEUED, "canceled": WorkState.CANCELED, # Invocation-specific states Loading Loading @@ -176,8 +176,6 @@ class Invocation: bioblend_params[step_id] = value self.status.state = WorkState.QUEUED print(bioblend_inputs) print(bioblend_params) invocation_info = self.galaxy_instance.workflows.invoke_workflow( workflow_id=self.workflow_id, inputs=bioblend_inputs, Loading @@ -187,7 +185,6 @@ class Invocation: ) self.invocation_id = invocation_info['id'] self.status.state = self._map_galaxy_state_to_workstate(invocation_info['state']) except Exception as e: self.status.state = WorkState.ERROR self.status.details = f"Failed to prepare or submit workflow invocation: {str(e)}" Loading @@ -198,7 +195,6 @@ class Invocation: """Waits for the workflow invocation to complete.""" if not self.invocation_id: raise Exception("Cannot wait for results, invocation ID is not set.") # TODO: Consider adding optional polling with status updates for long workflows self.galaxy_instance.invocations.wait_for_invocation(self.invocation_id) Loading Loading @@ -465,3 +461,24 @@ class Workflow(AbstractWork): return self._invocation.get_step_jobs() return [] def get_active_step(self) -> Optional["Job"]: """Gets the currently active (running) step in the workflow invocation. This method iterates through all jobs associated with the workflow steps and returns the first one found to be in the 'RUNNING' state. Returns ------- Optional["Job"] The Job instance representing the currently running step. Returns None if no step is currently running, if the workflow hasn't been run yet, or if step jobs cannot be retrieved. """ if not self._invocation: return None step_jobs = self._invocation.get_step_jobs() for job in step_jobs: if job.status.state == WorkState.RUNNING: return job return None No newline at end of file tests/test_workflow.py +36 −84 Original line number Diff line number Diff line Loading @@ -15,98 +15,51 @@ GALAXY_URL = os.environ.get("NOVA_GALAXY_TEST_GALAXY_URL", "https://calvera-test GALAXY_API_KEY = os.environ.get("NOVA_GALAXY_TEST_GALAXY_KEY", "") PLACEHOLDER_WORKFLOW_ID = "test_workflow_id_for_nova_galaxy_placeholder" WORKFLOW_NAME = "Simple_test_workflow" TEST_HISTORY_NAME_WF = "nova_galaxy_workflow_test_history" @pytest.fixture(scope="module") def nova_galaxy_connection() -> Connection: """Provides a Connection instance for the tests.""" if not GALAXY_API_KEY: pytest.skip("NOVA_GALAXY_TEST_GALAXY_KEY is not set. Skipping integration tests.") conn = Connection(galaxy_url=GALAXY_URL, api_key=GALAXY_API_KEY) return conn @pytest.fixture def test_datastore(nova_galaxy_connection: Connection) -> Datastore: """Creates a new history for testing and yields the Datastore.""" ds = nova_galaxy_connection.create_data_store(name=TEST_HISTORY_NAME_WF) yield ds try: if not ds.history_id: histories = nova_galaxy_connection.galaxy_instance.histories.get_histories(name=TEST_HISTORY_NAME_WF) if histories: ds.history_id = histories[0]['id'] if ds.history_id: nova_galaxy_connection.galaxy_instance.histories.delete_history(ds.history_id, purge=True) print(f"Cleaned up history: {TEST_HISTORY_NAME_WF} (ID: {ds.history_id})") except Exception as e: print(f"Error during history cleanup: {e}") @pytest.mark.integration def test_workflow_lifecycle_with_placeholder_id( nova_galaxy_connection: Connection, test_datastore: Datastore ): def test_workflow_lifecycle_with_placeholder_id(nova_instance: Connection): """ Tests the Workflow class lifecycle methods when using a placeholder workflow ID. This test expects failures when trying to run the workflow, as the ID is a placeholder. """ workflow = Workflow(id=PLACEHOLDER_WORKFLOW_ID) with nova_instance.connect() as connection: ds = connection.get_data_store(name=TEST_HISTORY_NAME_WF) workflows = connection.galaxy_instance.workflows.get_workflows(name=WORKFLOW_NAME, published=True) workflow_id = workflows[0]['id'] params = Parameters() assert workflow.id == PLACEHOLDER_WORKFLOW_ID assert workflow.get_status() == WorkState.NOT_STARTED assert workflow.get_invocation_id() is None with pytest.raises(Exception) as excinfo_run_wait: workflow.run(data_store=test_datastore, params=params, wait=True) print(f"Exception from run(wait=True): {excinfo_run_wait.value}") workflow = Workflow(id=workflow_id) assert workflow.get_status() == WorkState.ERROR, \ f"Expected ERROR state after failed run, got {workflow.get_status()}" outputs= workflow.run(data_store=ds, params=params, wait=True) assert outputs is None full_status_after_fail_wait = workflow.get_full_status() assert full_status_after_fail_wait is not None assert full_status_after_fail_wait.state == WorkState.ERROR assert full_status_after_fail_wait.details is not None and full_status_after_fail_wait.details != "" status = workflow.get_status() invocation_id = workflow.get_invocation_id() full_status = workflow.get_full_status() workflow = Workflow(id=PLACEHOLDER_WORKFLOW_ID) print(f"Status after run(wait=True): {status}") print(f"Invocation ID after run(wait=True): {invocation_id}") print(f"Full status details: {full_status.details if full_status else 'N/A'}") outputs_no_wait = workflow.run(data_store=test_datastore, params=params, wait=False) assert outputs_no_wait is None assert status in [WorkState.ERROR, WorkState.QUEUED], \ f"Expected ERROR or QUEUED state after run(wait=False), got {status}" time.sleep(2) status_no_wait = workflow.get_status() invocation_id_no_wait = workflow.get_invocation_id() full_status_no_wait = workflow.get_full_status() print(f"Status after run(wait=False): {status_no_wait}") print(f"Invocation ID after run(wait=False): {invocation_id_no_wait}") print(f"Full status details: {full_status_no_wait.details if full_status_no_wait else 'N/A'}") assert status_no_wait in [WorkState.ERROR, WorkState.QUEUED], \ f"Expected ERROR or QUEUED state after run(wait=False), got {status_no_wait}" if status_no_wait == WorkState.ERROR: assert full_status_no_wait is not None assert full_status_no_wait.state == WorkState.ERROR assert full_status_no_wait.details is not None and full_status_no_wait.details != "" if status == WorkState.ERROR: assert full_status is not None assert full_status.state == WorkState.ERROR assert full_status.details is not None and full_status.details != "" results = workflow.get_results() assert results is None, f"Expected no results for a failed/incomplete workflow, got {results}" cancel_result = workflow.cancel() print(f"Cancel result: {cancel_result}") if invocation_id_no_wait: if invocation_id: pass else: assert not cancel_result, "Cancel should return False if no invocation ID was set" step_jobs = workflow.get_step_jobs() assert isinstance(step_jobs, list) assert len(step_jobs) == 0, "Expected no step jobs for a placeholder/failed workflow" Loading @@ -115,7 +68,6 @@ def test_workflow_lifecycle_with_placeholder_id( assert final_status in [WorkState.ERROR, WorkState.CANCELED, WorkState.QUEUED], \ f"Unexpected final state: {final_status}" @pytest.mark.integration def test_workflow_initial_state(): """Tests the initial state of a Workflow object before any run.""" workflow = Workflow(id="another_placeholder_id") Loading Loading
src/nova/galaxy/workflow.py +22 −5 Original line number Diff line number Diff line Loading @@ -67,7 +67,7 @@ class Invocation: "running": WorkState.RUNNING, "ok": WorkState.FINISHED, "error": WorkState.ERROR, "paused": WorkState.PAUSED, "paused": WorkState.QUEUED, "canceled": WorkState.CANCELED, # Invocation-specific states Loading Loading @@ -176,8 +176,6 @@ class Invocation: bioblend_params[step_id] = value self.status.state = WorkState.QUEUED print(bioblend_inputs) print(bioblend_params) invocation_info = self.galaxy_instance.workflows.invoke_workflow( workflow_id=self.workflow_id, inputs=bioblend_inputs, Loading @@ -187,7 +185,6 @@ class Invocation: ) self.invocation_id = invocation_info['id'] self.status.state = self._map_galaxy_state_to_workstate(invocation_info['state']) except Exception as e: self.status.state = WorkState.ERROR self.status.details = f"Failed to prepare or submit workflow invocation: {str(e)}" Loading @@ -198,7 +195,6 @@ class Invocation: """Waits for the workflow invocation to complete.""" if not self.invocation_id: raise Exception("Cannot wait for results, invocation ID is not set.") # TODO: Consider adding optional polling with status updates for long workflows self.galaxy_instance.invocations.wait_for_invocation(self.invocation_id) Loading Loading @@ -465,3 +461,24 @@ class Workflow(AbstractWork): return self._invocation.get_step_jobs() return [] def get_active_step(self) -> Optional["Job"]: """Gets the currently active (running) step in the workflow invocation. This method iterates through all jobs associated with the workflow steps and returns the first one found to be in the 'RUNNING' state. Returns ------- Optional["Job"] The Job instance representing the currently running step. Returns None if no step is currently running, if the workflow hasn't been run yet, or if step jobs cannot be retrieved. """ if not self._invocation: return None step_jobs = self._invocation.get_step_jobs() for job in step_jobs: if job.status.state == WorkState.RUNNING: return job return None No newline at end of file
tests/test_workflow.py +36 −84 Original line number Diff line number Diff line Loading @@ -15,98 +15,51 @@ GALAXY_URL = os.environ.get("NOVA_GALAXY_TEST_GALAXY_URL", "https://calvera-test GALAXY_API_KEY = os.environ.get("NOVA_GALAXY_TEST_GALAXY_KEY", "") PLACEHOLDER_WORKFLOW_ID = "test_workflow_id_for_nova_galaxy_placeholder" WORKFLOW_NAME = "Simple_test_workflow" TEST_HISTORY_NAME_WF = "nova_galaxy_workflow_test_history" @pytest.fixture(scope="module") def nova_galaxy_connection() -> Connection: """Provides a Connection instance for the tests.""" if not GALAXY_API_KEY: pytest.skip("NOVA_GALAXY_TEST_GALAXY_KEY is not set. Skipping integration tests.") conn = Connection(galaxy_url=GALAXY_URL, api_key=GALAXY_API_KEY) return conn @pytest.fixture def test_datastore(nova_galaxy_connection: Connection) -> Datastore: """Creates a new history for testing and yields the Datastore.""" ds = nova_galaxy_connection.create_data_store(name=TEST_HISTORY_NAME_WF) yield ds try: if not ds.history_id: histories = nova_galaxy_connection.galaxy_instance.histories.get_histories(name=TEST_HISTORY_NAME_WF) if histories: ds.history_id = histories[0]['id'] if ds.history_id: nova_galaxy_connection.galaxy_instance.histories.delete_history(ds.history_id, purge=True) print(f"Cleaned up history: {TEST_HISTORY_NAME_WF} (ID: {ds.history_id})") except Exception as e: print(f"Error during history cleanup: {e}") @pytest.mark.integration def test_workflow_lifecycle_with_placeholder_id( nova_galaxy_connection: Connection, test_datastore: Datastore ): def test_workflow_lifecycle_with_placeholder_id(nova_instance: Connection): """ Tests the Workflow class lifecycle methods when using a placeholder workflow ID. This test expects failures when trying to run the workflow, as the ID is a placeholder. """ workflow = Workflow(id=PLACEHOLDER_WORKFLOW_ID) with nova_instance.connect() as connection: ds = connection.get_data_store(name=TEST_HISTORY_NAME_WF) workflows = connection.galaxy_instance.workflows.get_workflows(name=WORKFLOW_NAME, published=True) workflow_id = workflows[0]['id'] params = Parameters() assert workflow.id == PLACEHOLDER_WORKFLOW_ID assert workflow.get_status() == WorkState.NOT_STARTED assert workflow.get_invocation_id() is None with pytest.raises(Exception) as excinfo_run_wait: workflow.run(data_store=test_datastore, params=params, wait=True) print(f"Exception from run(wait=True): {excinfo_run_wait.value}") workflow = Workflow(id=workflow_id) assert workflow.get_status() == WorkState.ERROR, \ f"Expected ERROR state after failed run, got {workflow.get_status()}" outputs= workflow.run(data_store=ds, params=params, wait=True) assert outputs is None full_status_after_fail_wait = workflow.get_full_status() assert full_status_after_fail_wait is not None assert full_status_after_fail_wait.state == WorkState.ERROR assert full_status_after_fail_wait.details is not None and full_status_after_fail_wait.details != "" status = workflow.get_status() invocation_id = workflow.get_invocation_id() full_status = workflow.get_full_status() workflow = Workflow(id=PLACEHOLDER_WORKFLOW_ID) print(f"Status after run(wait=True): {status}") print(f"Invocation ID after run(wait=True): {invocation_id}") print(f"Full status details: {full_status.details if full_status else 'N/A'}") outputs_no_wait = workflow.run(data_store=test_datastore, params=params, wait=False) assert outputs_no_wait is None assert status in [WorkState.ERROR, WorkState.QUEUED], \ f"Expected ERROR or QUEUED state after run(wait=False), got {status}" time.sleep(2) status_no_wait = workflow.get_status() invocation_id_no_wait = workflow.get_invocation_id() full_status_no_wait = workflow.get_full_status() print(f"Status after run(wait=False): {status_no_wait}") print(f"Invocation ID after run(wait=False): {invocation_id_no_wait}") print(f"Full status details: {full_status_no_wait.details if full_status_no_wait else 'N/A'}") assert status_no_wait in [WorkState.ERROR, WorkState.QUEUED], \ f"Expected ERROR or QUEUED state after run(wait=False), got {status_no_wait}" if status_no_wait == WorkState.ERROR: assert full_status_no_wait is not None assert full_status_no_wait.state == WorkState.ERROR assert full_status_no_wait.details is not None and full_status_no_wait.details != "" if status == WorkState.ERROR: assert full_status is not None assert full_status.state == WorkState.ERROR assert full_status.details is not None and full_status.details != "" results = workflow.get_results() assert results is None, f"Expected no results for a failed/incomplete workflow, got {results}" cancel_result = workflow.cancel() print(f"Cancel result: {cancel_result}") if invocation_id_no_wait: if invocation_id: pass else: assert not cancel_result, "Cancel should return False if no invocation ID was set" step_jobs = workflow.get_step_jobs() assert isinstance(step_jobs, list) assert len(step_jobs) == 0, "Expected no step jobs for a placeholder/failed workflow" Loading @@ -115,7 +68,6 @@ def test_workflow_lifecycle_with_placeholder_id( assert final_status in [WorkState.ERROR, WorkState.CANCELED, WorkState.QUEUED], \ f"Unexpected final state: {final_status}" @pytest.mark.integration def test_workflow_initial_state(): """Tests the initial state of a Workflow object before any run.""" workflow = Workflow(id="another_placeholder_id") Loading