Unverified commit 1c8ed34c, authored by Andrew Ayres, committed by GitHub

Merge pull request #3 from andrewfayres/adding_workfows

Adding workfows
parents ab827580 374cc164
+1 −0
@@ -12,6 +12,7 @@ nova-galaxy revolves around several key concepts that facilitate interaction wit
   datasets
   tools
   interactive_tools
   workflows
   outputs
   parameters
   tool_runner
+175 −0
.. _workflows:

Workflows
=========

The ``nova-galaxy`` library provides a ``Workflow`` class to interact with and run Galaxy workflows. This allows you to programmatically execute complex bioinformatic pipelines defined in Galaxy.

Key Concepts
------------

*   **Workflow ID**: Each workflow in Galaxy has a unique ID. You'll need this ID to instantiate a ``Workflow`` object. You can typically find this ID through the Galaxy UI or API (e.g., using ``galaxy_instance.workflows.get_workflows()``).
*   **Datastore**: Workflows are run within a specific Galaxy history, which is represented by a :ref:`Datastore <datastores>` object in ``nova-galaxy``.
*   **Parameters**: Workflows often require input datasets and various parameters to control their execution. These are provided via a :ref:`Parameters <parameters>` object.
*   **Invocation**: Each run of a workflow is called an "invocation". The library manages the state and results of these invocations.
*   **Outputs**: Upon successful completion, a workflow produces output datasets, which can be accessed via an :ref:`Outputs <outputs>` object.

Using the ``Workflow`` Class
----------------------------

The primary class for interacting with workflows is ``nova.galaxy.workflow.Workflow``.

Initializing a Workflow
~~~~~~~~~~~~~~~~~~~~~~~

To start, you need the ID of the Galaxy workflow you want to run.

.. code-block:: python

    from nova.galaxy.workflow import Workflow

    # Replace 'your_workflow_id' with the actual ID from Galaxy
    workflow_id = "your_workflow_id"
    my_workflow = Workflow(id=workflow_id)

Running a Workflow
~~~~~~~~~~~~~~~~~~

To run the workflow, call its ``run()`` method. It requires a ``Datastore`` (representing the Galaxy history) and accepts an optional ``Parameters`` object for inputs.

.. code-block:: python

    from nova.galaxy.data_store import Datastore
    from nova.galaxy.parameters import Parameters
    from nova.galaxy.dataset import Dataset # Assuming you have an input dataset

    # Assume 'galaxy_connection' is an established Connection object
    # Assume 'history_id' is the ID of the target Galaxy history
    data_store = Datastore(galaxy_connection, history_id=history_id)

    # Prepare parameters (if any)
    params = Parameters()
    # Example: Adding an input dataset. 'input_dataset_label' is the label
    # of the workflow input as defined in Galaxy.
    # 'input_ds_id' is the Galaxy ID of an existing dataset in the history.
    input_dataset = Dataset(name="My Input Data", id="input_ds_id")
    input_dataset.store = data_store # Associate dataset with the datastore
    params.add_input("input_dataset_label", input_dataset)

    # Example: Setting a tool parameter within the workflow.
    # 'workflow_step_label' is the label of the step in Galaxy.
    # 'parameter_name' is the name of the parameter for that tool.
    params.add_parameter("workflow_step_label", {"parameter_name": "parameter_value"})


    # Run the workflow and wait for completion (default behavior)
    try:
        outputs = my_workflow.run(data_store=data_store, params=params, wait=True)
        if outputs:
            print("Workflow completed successfully!")
    except Exception as e:
        print(f"Workflow execution failed: {e}")

Non-Blocking Execution
^^^^^^^^^^^^^^^^^^^^^^

If you don't want to wait for the workflow to complete, set ``wait=False``.

.. code-block:: python

    my_workflow.run(data_store=data_store, params=params, wait=False)
    print(f"Workflow started with invocation ID: {my_workflow.get_invocation_id()}")
    # You'll need to check the status periodically

Checking Workflow Status
~~~~~~~~~~~~~~~~~~~~~~~~

You can check the status of the last workflow invocation using ``get_status()`` or ``get_full_status()``.

.. code-block:: python

    from nova.common.job import WorkState

    status = my_workflow.get_status()
    print(f"Current workflow status: {status}")

    if status == WorkState.RUNNING:
        print("Workflow is still running.")
    elif status == WorkState.FINISHED:
        print("Workflow finished successfully.")
    elif status == WorkState.ERROR:
        full_status = my_workflow.get_full_status()
        print(f"Workflow failed. Details: {full_status.details if full_status else 'N/A'}")

The ``get_status()`` method returns a ``WorkState`` enum member (e.g., ``WorkState.QUEUED``, ``WorkState.RUNNING``, ``WorkState.FINISHED``, ``WorkState.ERROR``).

The ``get_full_status()`` method returns an ``InvocationStatus`` object which contains both the ``state`` and a ``details`` string (useful for error messages).
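
For a run started with ``wait=False``, the status check above has to be repeated until the invocation reaches a terminal state. The following is a minimal sketch of such a polling loop, not part of ``nova-galaxy``: ``wait_until_done``, ``FakeState`` and ``FakeWorkflow`` are hypothetical names, and the stubs stand in for ``WorkState`` and ``Workflow`` so the example runs without a Galaxy server.

```python
import time
from enum import Enum


class FakeState(Enum):
    """Stand-in for nova.common.job.WorkState (assumption, for offline use)."""

    QUEUED = "queued"
    RUNNING = "running"
    FINISHED = "finished"
    ERROR = "error"


class FakeWorkflow:
    """Hypothetical stub that exposes only get_status(), like Workflow does."""

    def __init__(self, states):
        self._states = list(states)

    def get_status(self):
        # Return the next scripted state; repeat the last one once exhausted.
        if len(self._states) > 1:
            return self._states.pop(0)
        return self._states[0]


def wait_until_done(workflow, terminal_states, poll_interval=5.0, timeout=3600.0):
    """Poll workflow.get_status() until a terminal state or the timeout."""
    deadline = time.monotonic() + timeout
    while True:
        status = workflow.get_status()
        if status in terminal_states:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still in state {status} after {timeout} s")
        time.sleep(poll_interval)


stub = FakeWorkflow([FakeState.QUEUED, FakeState.RUNNING, FakeState.FINISHED])
final = wait_until_done(
    stub, {FakeState.FINISHED, FakeState.ERROR}, poll_interval=0.01, timeout=5
)
print(final)
```

With the real library, the same helper would presumably be called with ``my_workflow`` and a set such as ``{WorkState.FINISHED, WorkState.ERROR}``; the polling interval and timeout are arbitrary choices here.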

Getting Workflow Results
~~~~~~~~~~~~~~~~~~~~~~~~

Once a workflow has completed successfully (``get_status() == WorkState.FINISHED``), you can retrieve its outputs using ``get_results()``.

.. code-block:: python

    if my_workflow.get_status() == WorkState.FINISHED:
        outputs = my_workflow.get_results()
        if outputs:
            for output_name, dataset_or_collection in outputs.items():
                print(f"Output '{output_name}': ID {dataset_or_collection.id}")
        else:
            print("No outputs found, or an issue retrieving them.")

The ``get_results()`` method returns an ``Outputs`` object, which is a dictionary-like structure mapping output names (as defined in the workflow) to ``Dataset`` or ``DatasetCollection`` objects.

Cancelling a Workflow
~~~~~~~~~~~~~~~~~~~~~

If a workflow is running, you can attempt to cancel it using ``cancel()`` or its alias ``stop()``.

.. code-block:: python

    if my_workflow.get_status() == WorkState.RUNNING:
        was_cancelled = my_workflow.cancel()
        if was_cancelled:
            print("Workflow cancellation requested.")
        else:
            print("Failed to request workflow cancellation.")
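
A common use of ``cancel()`` is to stop a run that exceeds a time budget. The sketch below shows one way to do that, under stated assumptions: ``cancel_if_too_slow`` and ``StubWorkflow`` are hypothetical and not part of ``nova-galaxy``; the stub only mimics the ``get_status()`` and ``cancel()`` method names described above.

```python
import time


class StubWorkflow:
    """Hypothetical stand-in with the same method names as Workflow."""

    def __init__(self):
        self.state = "running"

    def get_status(self):
        return self.state

    def cancel(self):
        # Mimic a cancellation request that Galaxy accepted.
        self.state = "canceled"
        return True


def cancel_if_too_slow(workflow, running_state, max_seconds, poll_interval=1.0):
    """Cancel the workflow if it is still running after max_seconds.

    Returns True if a cancellation was requested and accepted, else False.
    """
    deadline = time.monotonic() + max_seconds
    while workflow.get_status() == running_state:
        if time.monotonic() >= deadline:
            return bool(workflow.cancel())
        time.sleep(poll_interval)
    return False  # the workflow left the running state on its own


stub = StubWorkflow()
cancelled = cancel_if_too_slow(stub, "running", max_seconds=0.05, poll_interval=0.01)
print(cancelled, stub.get_status())
```

With the real class, ``running_state`` would be ``WorkState.RUNNING``. Note that this sketch returns ``False`` immediately for a workflow that is still queued, so a caller may want to treat ``WorkState.QUEUED`` the same way.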

Getting Invocation ID
~~~~~~~~~~~~~~~~~~~~~

Each workflow run (invocation) has a unique ID in Galaxy. You can retrieve this ID:

.. code-block:: python

    invocation_id = my_workflow.get_invocation_id()
    if invocation_id:
        print(f"Galaxy Invocation ID: {invocation_id}")

Accessing Step-Level Jobs
~~~~~~~~~~~~~~~~~~~~~~~~~

Workflows are composed of individual tool executions (jobs). You can access these as ``Job`` objects using ``get_step_jobs()``. This is useful for monitoring progress at a finer grain or retrieving logs from specific steps.

.. code-block:: python

    from typing import List

    from nova.galaxy.job import Job

    step_jobs: List[Job] = my_workflow.get_step_jobs()
    for job in step_jobs:
        print(f"Step Tool ID: {job.tool_id}, Status: {job.get_status()}")
        if job.get_status() == WorkState.ERROR:
            full_job_status = job.get_full_status()
            print(f"  Job Error Details: {full_job_status.details if full_job_status else 'N/A'}")
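
For workflows with many steps, a per-state tally can be easier to read than one line per job. This is a small sketch of that idea, assuming only that each job exposes ``get_status()`` as shown above; ``summarize_steps`` and ``StubJob`` are hypothetical helpers, and plain strings stand in for ``WorkState`` members.

```python
from collections import Counter


class StubJob:
    """Hypothetical stand-in exposing tool_id and get_status() like a step Job."""

    def __init__(self, tool_id, status):
        self.tool_id = tool_id
        self._status = status

    def get_status(self):
        return self._status


def summarize_steps(step_jobs):
    """Count how many step jobs are in each state."""
    return Counter(job.get_status() for job in step_jobs)


jobs = [
    StubJob("tool_a", "finished"),
    StubJob("tool_b", "running"),
    StubJob("tool_c", "finished"),
]
summary = summarize_steps(jobs)
print(dict(summary))
```

In real use, the list would come from ``my_workflow.get_step_jobs()`` and the keys would be ``WorkState`` members.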


Important Notes
---------------

*   **Workflow Definition**: The structure of your ``Parameters`` object (input labels, step labels for parameters) must match how the workflow is defined in Galaxy. Use the Galaxy UI or API to inspect your workflow's inputs and step details.
*   **Dataset IDs**: When providing ``Dataset`` or ``DatasetCollection`` objects as inputs, they must already exist in the Galaxy history and have their ``id`` attribute populated.
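*   **Error Handling**: Always wrap ``run()`` calls (especially with ``wait=True``) in try-except blocks to handle potential exceptions during workflow execution. For more information on errors, check the ``details`` attribute of the object returned by ``get_full_status()``, and guard against a ``None`` return, which is what you get before any run has started.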
*   **State Management**: The ``Workflow`` object primarily manages the state of its *last* invocation. If you need to manage multiple concurrent runs of the same workflow definition, instantiate a new ``Workflow`` object for each run.
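
The state-management note can be illustrated with a short sketch that starts several runs, each on its own object. It is written against a stub so it runs offline: ``start_runs`` and ``StubWorkflow`` are hypothetical, and the ``inv-N`` invocation IDs are made up for the example.

```python
import itertools


class StubWorkflow:
    """Hypothetical stand-in for Workflow; tracks only its last invocation."""

    _ids = itertools.count(1)

    def __init__(self, id):
        self.id = id
        self._invocation_id = None

    def run(self, data_store, params, wait=True):
        # Each call overwrites the previously stored invocation ID.
        self._invocation_id = f"inv-{next(self._ids)}"

    def get_invocation_id(self):
        return self._invocation_id


def start_runs(workflow_cls, workflow_id, data_store, param_sets):
    """Start one non-blocking run per parameter set, each on its own object."""
    runs = []
    for params in param_sets:
        wf = workflow_cls(id=workflow_id)
        wf.run(data_store=data_store, params=params, wait=False)
        runs.append(wf)
    return runs


runs = start_runs(
    StubWorkflow, "your_workflow_id", data_store=None, param_sets=[{"a": 1}, {"a": 2}]
)
print([wf.get_invocation_id() for wf in runs])
```

Keeping one object per run means each invocation ID, status and result stays reachable, instead of being replaced by the next call to ``run()``.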

This guide provides an overview of using the ``Workflow`` class. For more detailed information on specific classes like ``Datastore``, ``Parameters``, ``Dataset``, and ``Outputs``, please refer to their respective documentation pages.
+2 −0
@@ -8,6 +8,7 @@ from .outputs import Outputs
from .parameters import Parameters
from .tool import Tool
from .tool_runner import ToolRunner
from .workflow import Workflow

__all__ = [
    "BasicTool",
@@ -19,6 +20,7 @@ __all__ = [
    "Parameters",
    "Tool",
    "ToolRunner",
    "Workflow",
]

__version__ = importlib.metadata.version("nova-galaxy")
+484 −0

File added.

Preview size limit exceeded, changes collapsed.

tests/test_workflow.py

0 → 100644
+80 −0
"""Integration tests for Workflow functionality."""

import os
import time

import pytest

from nova.common.job import WorkState
from nova.galaxy.connection import Connection
from nova.galaxy.data_store import Datastore
from nova.galaxy.parameters import Parameters
from nova.galaxy.workflow import Workflow

GALAXY_URL = os.environ.get("NOVA_GALAXY_TEST_GALAXY_URL", "https://calvera-test.ornl.gov")
GALAXY_API_KEY = os.environ.get("NOVA_GALAXY_TEST_GALAXY_KEY", "")


WORKFLOW_NAME = "Simple_test_workflow"
TEST_HISTORY_NAME_WF = "nova_galaxy_workflow_test_history"

def test_workflow_lifecycle_with_placeholder_id(nova_instance: Connection):
    """
    Tests the Workflow class lifecycle methods for a workflow looked up by name (WORKFLOW_NAME)
    and run with empty parameters. The run is expected not to complete successfully, so no outputs are returned.
    """
    with nova_instance.connect() as connection:
        ds = connection.get_data_store(name=TEST_HISTORY_NAME_WF)
        workflows = connection.galaxy_instance.workflows.get_workflows(name=WORKFLOW_NAME, published=True)
        workflow_id = workflows[0]['id']
        params = Parameters()

        workflow = Workflow(id=workflow_id)

        outputs = workflow.run(data_store=ds, params=params, wait=True)
        assert outputs is None

        status = workflow.get_status()
        invocation_id = workflow.get_invocation_id()
        full_status = workflow.get_full_status()

        print(f"Status after run(wait=True): {status}")
        print(f"Invocation ID after run(wait=True): {invocation_id}")
        print(f"Full status details: {full_status.details if full_status else 'N/A'}")

        assert status in [WorkState.ERROR, WorkState.QUEUED], \
            f"Expected ERROR or QUEUED state after run(wait=True), got {status}"

        if status == WorkState.ERROR:
            assert full_status is not None
            assert full_status.state == WorkState.ERROR
            assert full_status.details is not None and full_status.details != ""
    
        results = workflow.get_results()
        assert results is None, f"Expected no results for a failed/incomplete workflow, got {results}"

        cancel_result = workflow.cancel()
        print(f"Cancel result: {cancel_result}")
        if not invocation_id:
            assert not cancel_result, "Cancel should return False if no invocation ID was set"

        step_jobs = workflow.get_step_jobs()
        assert isinstance(step_jobs, list)
        assert len(step_jobs) == 0, "Expected no step jobs for a placeholder/failed workflow"

        final_status = workflow.get_status()
        assert final_status in [WorkState.ERROR, WorkState.CANCELED, WorkState.QUEUED], \
             f"Unexpected final state: {final_status}"

def test_workflow_initial_state():
    """Tests the initial state of a Workflow object before any run."""
    workflow = Workflow(id="another_placeholder_id")
    assert workflow.id == "another_placeholder_id"
    assert workflow.get_status() == WorkState.NOT_STARTED
    assert workflow.get_invocation_id() is None
    assert workflow.get_results() is None
    assert workflow.get_full_status() is None
    assert not workflow.cancel()
    assert workflow.get_step_jobs() == []