Loading tests/test_workflow.py +9 −7 Original line number Diff line number Diff line Loading @@ -11,7 +11,8 @@ GALAXY_URL = os.environ.get("NOVA_GALAXY_TEST_GALAXY_URL", "https://calvera-test GALAXY_API_KEY = os.environ.get("NOVA_GALAXY_TEST_GALAXY_KEY", "") WORKFLOW_NAME = "simple_test_workflow_with_dataset" WORKFLOW_NAME = "TestWorkflow" WORKFLOW_INPUT = "/HFIR/CG3/IPTS-24666/nexus/CG3_28275.nxs.h5" # This must be a public run. TEST_HISTORY_NAME_WF = "nova_galaxy_workflow_test_history" Loading @@ -26,6 +27,7 @@ def test_workflow_lifecycle_with_placeholder_id(nova_instance: Connection) -> No workflows = connection.galaxy_instance.workflows.get_workflows(name=WORKFLOW_NAME, published=True) workflow_id = workflows[0]["id"] params = WorkflowParameters() params.add_workflow_input("1", WORKFLOW_INPUT) workflow = Workflow(id=workflow_id) Loading Loading @@ -80,9 +82,9 @@ def test_simple_test_workflow_with_dataset(nova_instance: Connection) -> None: name="simple_test_workflow_with_dataset", published=True ) assert len(workflows) > 0, ( "'simple_test_workflow_with_dataset' not found. Please ensure it's published in Galaxy." ) assert ( len(workflows) > 0 ), "'simple_test_workflow_with_dataset' not found. Please ensure it's published in Galaxy." workflow_id = workflows[0]["id"] params = WorkflowParameters() Loading @@ -99,9 +101,9 @@ def test_simple_test_workflow_with_dataset(nova_instance: Connection) -> None: workflow.run(data_store=ds, params=params, wait=True) # Assertions for successful completion assert workflow.get_status() == WorkState.FINISHED, ( f"Workflow did not finish successfully. Current status: {workflow.get_status()}" ) assert ( workflow.get_status() == WorkState.FINISHED ), f"Workflow did not finish successfully. Current status: {workflow.get_status()}" def test_workflow_initial_state() -> None: Loading Loading
tests/test_workflow.py +9 −7 Original line number Diff line number Diff line Loading @@ -11,7 +11,8 @@ GALAXY_URL = os.environ.get("NOVA_GALAXY_TEST_GALAXY_URL", "https://calvera-test GALAXY_API_KEY = os.environ.get("NOVA_GALAXY_TEST_GALAXY_KEY", "") WORKFLOW_NAME = "simple_test_workflow_with_dataset" WORKFLOW_NAME = "TestWorkflow" WORKFLOW_INPUT = "/HFIR/CG3/IPTS-24666/nexus/CG3_28275.nxs.h5" # This must be a public run. TEST_HISTORY_NAME_WF = "nova_galaxy_workflow_test_history" Loading @@ -26,6 +27,7 @@ def test_workflow_lifecycle_with_placeholder_id(nova_instance: Connection) -> No workflows = connection.galaxy_instance.workflows.get_workflows(name=WORKFLOW_NAME, published=True) workflow_id = workflows[0]["id"] params = WorkflowParameters() params.add_workflow_input("1", WORKFLOW_INPUT) workflow = Workflow(id=workflow_id) Loading Loading @@ -80,9 +82,9 @@ def test_simple_test_workflow_with_dataset(nova_instance: Connection) -> None: name="simple_test_workflow_with_dataset", published=True ) assert len(workflows) > 0, ( "'simple_test_workflow_with_dataset' not found. Please ensure it's published in Galaxy." ) assert ( len(workflows) > 0 ), "'simple_test_workflow_with_dataset' not found. Please ensure it's published in Galaxy." workflow_id = workflows[0]["id"] params = WorkflowParameters() Loading @@ -99,9 +101,9 @@ def test_simple_test_workflow_with_dataset(nova_instance: Connection) -> None: workflow.run(data_store=ds, params=params, wait=True) # Assertions for successful completion assert workflow.get_status() == WorkState.FINISHED, ( f"Workflow did not finish successfully. Current status: {workflow.get_status()}" ) assert ( workflow.get_status() == WorkState.FINISHED ), f"Workflow did not finish successfully. Current status: {workflow.get_status()}" def test_workflow_initial_state() -> None: Loading