Loading tests/test_workflow.py +2 −9 Original line number Diff line number Diff line Loading @@ -11,8 +11,6 @@ GALAXY_URL = os.environ.get("NOVA_GALAXY_TEST_GALAXY_URL", "https://calvera-test GALAXY_API_KEY = os.environ.get("NOVA_GALAXY_TEST_GALAXY_KEY", "") WORKFLOW_NAME = "TestWorkflow" WORKFLOW_INPUT = "/HFIR/CG3/IPTS-24666/nexus/CG3_28275.nxs.h5" # This must be a public run. TEST_HISTORY_NAME_WF = "nova_galaxy_workflow_test_history" Loading @@ -24,14 +22,9 @@ def test_workflow_lifecycle_with_placeholder_id(nova_instance: Connection) -> No """ with nova_instance.connect() as connection: ds = connection.get_data_store(name=TEST_HISTORY_NAME_WF) workflows = connection.galaxy_instance.workflows.get_workflows(name=WORKFLOW_NAME, published=True) workflow_id = workflows[0]["id"] params = WorkflowParameters() params.add_workflow_input("1", WORKFLOW_INPUT) workflow = Workflow(id=workflow_id) workflow = Workflow(id="placeholder_id") outputs = workflow.run(data_store=ds, params=params, wait=True) outputs = workflow.run(data_store=ds, wait=False) assert outputs is None status = workflow.get_status() Loading Loading
tests/test_workflow.py +2 −9 Original line number Diff line number Diff line Loading @@ -11,8 +11,6 @@ GALAXY_URL = os.environ.get("NOVA_GALAXY_TEST_GALAXY_URL", "https://calvera-test GALAXY_API_KEY = os.environ.get("NOVA_GALAXY_TEST_GALAXY_KEY", "") WORKFLOW_NAME = "TestWorkflow" WORKFLOW_INPUT = "/HFIR/CG3/IPTS-24666/nexus/CG3_28275.nxs.h5" # This must be a public run. TEST_HISTORY_NAME_WF = "nova_galaxy_workflow_test_history" Loading @@ -24,14 +22,9 @@ def test_workflow_lifecycle_with_placeholder_id(nova_instance: Connection) -> No """ with nova_instance.connect() as connection: ds = connection.get_data_store(name=TEST_HISTORY_NAME_WF) workflows = connection.galaxy_instance.workflows.get_workflows(name=WORKFLOW_NAME, published=True) workflow_id = workflows[0]["id"] params = WorkflowParameters() params.add_workflow_input("1", WORKFLOW_INPUT) workflow = Workflow(id=workflow_id) workflow = Workflow(id="placeholder_id") outputs = workflow.run(data_store=ds, params=params, wait=True) outputs = workflow.run(data_store=ds, wait=False) assert outputs is None status = workflow.get_status() Loading