Commit 07e79a75 authored by Your Name's avatar Your Name
Browse files

Tests and docs

parent 028185dc
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -12,6 +12,7 @@ nova-galaxy revolves around several key concepts that facilitate interaction wit
   datasets
   tools
   interactive_tools
   workflows
   outputs
   parameters
   tool_runner
+175 −0
Original line number Diff line number Diff line
.. _workflows:

Workflows
=========

The ``nova-galaxy`` library provides a ``Workflow`` class to interact with and run Galaxy workflows. This allows you to programmatically execute complex bioinformatic pipelines defined in Galaxy.

Key Concepts
------------

*   **Workflow ID**: Each workflow in Galaxy has a unique ID. You'll need this ID to instantiate a ``Workflow`` object. You can typically find this ID through the Galaxy UI or API (e.g., using ``galaxy_instance.workflows.get_workflows()``).
*   **Datastore**: Workflows are run within a specific Galaxy history, which is represented by a :ref:`Datastore <datastores>` object in ``nova-galaxy``.
*   **Parameters**: Workflows often require input datasets and various parameters to control their execution. These are provided via a :ref:`Parameters <parameters>` object.
*   **Invocation**: Each run of a workflow is called an "invocation". The library manages the state and results of these invocations.
*   **Outputs**: Upon successful completion, a workflow produces output datasets, which can be accessed via an :ref:`Outputs <outputs>` object.

Using the ``Workflow`` Class
----------------------------

The primary class for interacting with workflows is ``nova.galaxy.workflow.Workflow``.

Initializing a Workflow
~~~~~~~~~~~~~~~~~~~~~~~

To start, you need the ID of the Galaxy workflow you want to run.

.. code-block:: python

    from nova.galaxy.workflow import Workflow

    # Replace 'your_workflow_id' with the actual ID from Galaxy
    workflow_id = "your_workflow_id"
    my_workflow = Workflow(id=workflow_id)

Running a Workflow
~~~~~~~~~~~~~~~~~~

To run the workflow, you use the ``run()`` method. This method requires a ``Datastore`` (representing the Galaxy history) and optionally a ``Parameters`` object for inputs.

.. code-block:: python

    from nova.galaxy.data_store import Datastore
    from nova.galaxy.parameters import Parameters
    from nova.galaxy.dataset import Dataset # Assuming you have an input dataset

    # Assume 'galaxy_connection' is an established Connection object
    # Assume 'history_id' is the ID of the target Galaxy history
    data_store = Datastore(galaxy_connection, history_id=history_id)

    # Prepare parameters (if any)
    params = Parameters()
    # Example: Adding an input dataset. 'input_dataset_label' is the label
    # of the workflow input as defined in Galaxy.
    # 'input_ds_id' is the Galaxy ID of an existing dataset in the history.
    input_dataset = Dataset(name="My Input Data", id="input_ds_id")
    input_dataset.store = data_store # Associate dataset with the datastore
    params.add_input("input_dataset_label", input_dataset)

    # Example: Setting a tool parameter within the workflow.
    # 'workflow_step_label' is the label of the step in Galaxy.
    # 'parameter_name' is the name of the parameter for that tool.
    params.add_parameter("workflow_step_label", {"parameter_name": "parameter_value"})


    # Run the workflow and wait for completion (default behavior)
    try:
        outputs = my_workflow.run(data_store=data_store, params=params, wait=True)
        if outputs:
            print("Workflow completed successfully!")
    except Exception as e:
        print(f"Workflow execution failed: {e}")

Non-Blocking Execution
^^^^^^^^^^^^^^^^^^^^^^

If you don't want to wait for the workflow to complete, set ``wait=False``.

.. code-block:: python

    my_workflow.run(data_store=data_store, params=params, wait=False)
    print(f"Workflow started with invocation ID: {my_workflow.get_invocation_id()}")
    # You'll need to check the status periodically

Checking Workflow Status
~~~~~~~~~~~~~~~~~~~~~~~~

You can check the status of the last workflow invocation using ``get_status()`` or ``get_full_status()``.

.. code-block:: python

    from nova.common.job import WorkState

    status = my_workflow.get_status()
    print(f"Current workflow status: {status}")

    if status == WorkState.RUNNING:
        print("Workflow is still running.")
    elif status == WorkState.FINISHED:
        print("Workflow finished successfully.")
    elif status == WorkState.ERROR:
        full_status = my_workflow.get_full_status()
        print(f"Workflow failed. Details: {full_status.details if full_status else 'N/A'}")

The ``get_status()`` method returns a ``WorkState`` enum member (e.g., ``WorkState.QUEUED``, ``WorkState.RUNNING``, ``WorkState.FINISHED``, ``WorkState.ERROR``).

The ``get_full_status()`` method returns an ``InvocationStatus`` object which contains both the ``state`` and a ``details`` string (useful for error messages).

Getting Workflow Results
~~~~~~~~~~~~~~~~~~~~~~~~

Once a workflow has completed successfully (``get_status() == WorkState.FINISHED``), you can retrieve its outputs using ``get_results()``.

.. code-block:: python

    if my_workflow.get_status() == WorkState.FINISHED:
        outputs = my_workflow.get_results()
        if outputs:
            for output_name, dataset_or_collection in outputs.items():
                print(f"Output '{output_name}': ID {dataset_or_collection.id}")
        else:
            print("No outputs found, or an issue retrieving them.")

The ``get_results()`` method returns an ``Outputs`` object, which is a dictionary-like structure mapping output names (as defined in the workflow) to ``Dataset`` or ``DatasetCollection`` objects.

Cancelling a Workflow
~~~~~~~~~~~~~~~~~~~~~

If a workflow is running, you can attempt to cancel it using ``cancel()`` or its alias ``stop()``.

.. code-block:: python

    if my_workflow.get_status() == WorkState.RUNNING:
        was_cancelled = my_workflow.cancel()
        if was_cancelled:
            print("Workflow cancellation requested.")
        else:
            print("Failed to request workflow cancellation.")

Getting Invocation ID
~~~~~~~~~~~~~~~~~~~~~

Each workflow run (invocation) has a unique ID in Galaxy. You can retrieve this ID:

.. code-block:: python

    invocation_id = my_workflow.get_invocation_id()
    if invocation_id:
        print(f"Galaxy Invocation ID: {invocation_id}")

Accessing Step-Level Jobs
~~~~~~~~~~~~~~~~~~~~~~~~~

Workflows are composed of individual tool executions (jobs). You can access these as ``Job`` objects using ``get_step_jobs()``. This is useful for monitoring progress at a finer grain or retrieving logs from specific steps.

.. code-block:: python

    from nova.galaxy.job import Job

    step_jobs: List[Job] = my_workflow.get_step_jobs()
    for job in step_jobs:
        print(f"Step Tool ID: {job.tool_id}, Status: {job.get_status()}")
        if job.get_status() == WorkState.ERROR:
            full_job_status = job.get_full_status()
            print(f"  Job Error Details: {full_job_status.details if full_job_status else 'N/A'}")


Important Notes
---------------

*   **Workflow Definition**: The structure of your ``Parameters`` object (input labels, step labels for parameters) must match how the workflow is defined in Galaxy. Use the Galaxy UI or API to inspect your workflow's inputs and step details.
*   **Dataset IDs**: When providing ``Dataset`` or ``DatasetCollection`` objects as inputs, they must already exist in the Galaxy history and have their ``id`` attribute populated.
*   **Error Handling**: Always wrap ``run()`` calls (especially with ``wait=True``) in try-except blocks to handle potential exceptions during workflow execution. Check ``get_full_status().details`` for more information on errors.
*   **State Management**: The ``Workflow`` object primarily manages the state of its *last* invocation. If you need to manage multiple concurrent runs of the same workflow definition, instantiate a new ``Workflow`` object for each run.

This guide provides an overview of using the ``Workflow`` class. For more detailed information on specific classes like ``Datastore``, ``Parameters``, ``Dataset``, and ``Outputs``, please refer to their respective documentation pages.
 No newline at end of file

tests/test_workflow.py

0 → 100644
+128 −0
Original line number Diff line number Diff line
"""Integration tests for Workflow functionality."""

import os
import time

import pytest

from nova.common.job import WorkState
from nova.galaxy.connection import Connection
from nova.galaxy.data_store import Datastore
from nova.galaxy.parameters import Parameters
from nova.galaxy.workflow import Workflow

GALAXY_URL = os.environ.get("NOVA_GALAXY_TEST_GALAXY_URL", "https://calvera-test.ornl.gov")
GALAXY_API_KEY = os.environ.get("NOVA_GALAXY_TEST_GALAXY_KEY", "")


PLACEHOLDER_WORKFLOW_ID = "test_workflow_id_for_nova_galaxy_placeholder"
TEST_HISTORY_NAME_WF = "nova_galaxy_workflow_test_history"


@pytest.fixture(scope="module")
def nova_galaxy_connection() -> Connection:
    """Provides a Connection instance for the tests."""
    if not GALAXY_API_KEY:
        pytest.skip("NOVA_GALAXY_TEST_GALAXY_KEY is not set. Skipping integration tests.")
    conn = Connection(galaxy_url=GALAXY_URL, api_key=GALAXY_API_KEY)
    return conn

@pytest.fixture
def test_datastore(nova_galaxy_connection: Connection) -> Datastore:
    """Creates a new history for testing and yields the Datastore."""
    ds = nova_galaxy_connection.create_data_store(name=TEST_HISTORY_NAME_WF)
    yield ds
    try:
        if not ds.history_id:
            histories = nova_galaxy_connection.galaxy_instance.histories.get_histories(name=TEST_HISTORY_NAME_WF)
            if histories:
                ds.history_id = histories[0]['id']
        
        if ds.history_id:
            nova_galaxy_connection.galaxy_instance.histories.delete_history(ds.history_id, purge=True)
            print(f"Cleaned up history: {TEST_HISTORY_NAME_WF} (ID: {ds.history_id})")
    except Exception as e:
        print(f"Error during history cleanup: {e}")


@pytest.mark.integration
def test_workflow_lifecycle_with_placeholder_id(
    nova_galaxy_connection: Connection, test_datastore: Datastore
):
    """
    Tests the Workflow class lifecycle methods when using a placeholder workflow ID.
    This test expects failures when trying to run the workflow, as the ID is a placeholder.
    """
    workflow = Workflow(id=PLACEHOLDER_WORKFLOW_ID)
    params = Parameters()

    assert workflow.id == PLACEHOLDER_WORKFLOW_ID
    assert workflow.get_status() == WorkState.NOT_STARTED
    assert workflow.get_invocation_id() is None

    with pytest.raises(Exception) as excinfo_run_wait:
        workflow.run(data_store=test_datastore, params=params, wait=True)
    
    print(f"Exception from run(wait=True): {excinfo_run_wait.value}")

    assert workflow.get_status() == WorkState.ERROR, \
        f"Expected ERROR state after failed run, got {workflow.get_status()}"
    
    full_status_after_fail_wait = workflow.get_full_status()
    assert full_status_after_fail_wait is not None
    assert full_status_after_fail_wait.state == WorkState.ERROR
    assert full_status_after_fail_wait.details is not None and full_status_after_fail_wait.details != ""

    workflow = Workflow(id=PLACEHOLDER_WORKFLOW_ID)

    outputs_no_wait = workflow.run(data_store=test_datastore, params=params, wait=False)
    assert outputs_no_wait is None

    time.sleep(2)

    status_no_wait = workflow.get_status()
    invocation_id_no_wait = workflow.get_invocation_id()
    full_status_no_wait = workflow.get_full_status()

    print(f"Status after run(wait=False): {status_no_wait}")
    print(f"Invocation ID after run(wait=False): {invocation_id_no_wait}")
    print(f"Full status details: {full_status_no_wait.details if full_status_no_wait else 'N/A'}")

    assert status_no_wait in [WorkState.ERROR, WorkState.QUEUED], \
        f"Expected ERROR or QUEUED state after run(wait=False), got {status_no_wait}"

    if status_no_wait == WorkState.ERROR:
        assert full_status_no_wait is not None
        assert full_status_no_wait.state == WorkState.ERROR
        assert full_status_no_wait.details is not None and full_status_no_wait.details != ""
    
    results = workflow.get_results()
    assert results is None, f"Expected no results for a failed/incomplete workflow, got {results}"

    cancel_result = workflow.cancel()
    print(f"Cancel result: {cancel_result}")
    if invocation_id_no_wait:
        pass
    else:
        assert not cancel_result, "Cancel should return False if no invocation ID was set"


    step_jobs = workflow.get_step_jobs()
    assert isinstance(step_jobs, list)
    assert len(step_jobs) == 0, "Expected no step jobs for a placeholder/failed workflow"

    final_status = workflow.get_status()
    assert final_status in [WorkState.ERROR, WorkState.CANCELED, WorkState.QUEUED], \
         f"Unexpected final state: {final_status}"

@pytest.mark.integration
def test_workflow_initial_state():
    """Tests the initial state of a Workflow object before any run."""
    workflow = Workflow(id="another_placeholder_id")
    assert workflow.id == "another_placeholder_id"
    assert workflow.get_status() == WorkState.NOT_STARTED
    assert workflow.get_invocation_id() is None
    assert workflow.get_results() is None
    assert workflow.get_full_status() is None
    assert not workflow.cancel()
    assert workflow.get_step_jobs() == []
 No newline at end of file