Unverified Commit 29c3934f authored by Andrew Ayres's avatar Andrew Ayres Committed by GitHub
Browse files

Merge pull request #53 from nova-model/workflow_update

Modified the way workflow parameters work to be more explicitly decla…
parents ed5b2281 0ba7eee8
Loading
Loading
Loading
Loading
Loading
+52 −0
Original line number Diff line number Diff line
@@ -11,3 +11,55 @@ The `Parameters` class is used to define the input parameters for a Galaxy tool.
    :dedent:

You can remove an existing input value with `remove_input()` or change the value with `change_input_value()`.

Workflow Parameters
-------------------

The `WorkflowParameters` class is specifically designed for passing inputs and parameters to Galaxy workflows. It provides a more explicit, bioblend-style approach to define workflow-level inputs and parameters for individual steps within a workflow.

**Workflow-level Inputs (`add_workflow_input`)**

Use `add_workflow_input` to provide values for the overall workflow inputs, which are typically identified by numerical IDs (e.g., "0", "1") as defined in the workflow. These can be datasets, dataset collections, or simple values.

.. code-block:: python

    from nova.galaxy.parameters import WorkflowParameters
    from nova.galaxy.dataset import Dataset, DatasetCollection

    workflow_params = WorkflowParameters()

    # Adding a dataset as a workflow input (input ID "0")
    my_dataset = Dataset(id="your_dataset_id")
    workflow_params.add_workflow_input("0", my_dataset)

    # Adding a dataset collection as a workflow input (input ID "1")
    my_collection = DatasetCollection(id="your_collection_id")
    workflow_params.add_workflow_input("1", my_collection)

    # Adding a simple text value as a workflow input (input ID "2")
    workflow_params.add_workflow_input("2", "my_text_input")

**Step-level Parameters (`add_step_param`)**

Use `add_step_param` to set parameters for specific steps within the workflow. These are identified by the step's ID (e.g., "2", "4") and a parameter path (e.g., "input", "series_0|input_mode|export_folder").

.. code-block:: python

    from nova.galaxy.parameters import WorkflowParameters
    from nova.galaxy.dataset import Dataset

    workflow_params = WorkflowParameters()

    # Setting a parameter for step "2" with parameter path "input"
    # This could be a dataset, dataset collection, or simple value
    input_for_step_2 = Dataset(id="another_dataset_id")
    workflow_params.add_step_param("2", "input", input_for_step_2)

    # Setting a text parameter for step "3" with parameter path "some_option"
    workflow_params.add_step_param("3", "some_option", "value_for_option")

    # Setting a list of datasets for a parameter in step "4"
    list_of_datasets = [Dataset(id="ds1"), Dataset(id="ds2")]
    workflow_params.add_step_param("4", "multiple_inputs", list_of_datasets)

When running a workflow, you pass an instance of `WorkflowParameters` to the `Workflow.run()` method.
 No newline at end of file
+35 −27
Original line number Diff line number Diff line
@@ -35,36 +35,44 @@ To start, you need the ID of the Galaxy workflow you want to run.
Running a Workflow
~~~~~~~~~~~~~~~~~~

To run the workflow, you use the ``run()`` method. This method requires a ``Datastore`` (representing the Galaxy history) and optionally a ``Parameters`` object for inputs.
To run the workflow, you use the ``run()`` method. This method requires a ``Datastore`` (representing the Galaxy history) and optionally a ``WorkflowParameters`` object for inputs and step-specific parameters.

.. code-block:: python

    from nova.galaxy.data_store import Datastore
    from nova.galaxy.parameters import Parameters
    from nova.galaxy.dataset import Dataset # Assuming you have an input dataset
    from nova.galaxy.parameters import WorkflowParameters
    from nova.galaxy.dataset import Dataset, DatasetCollection

    # Assume 'galaxy_connection' is an established Connection object
    # Assume 'history_id' is the ID of the target Galaxy history
    data_store = Datastore(galaxy_connection, history_id=history_id)

    # Prepare parameters (if any)
    params = Parameters()
    # Example: Adding an input dataset. 'input_dataset_label' is the label
    # of the workflow input as defined in Galaxy.
    # 'input_ds_id' is the Galaxy ID of an existing dataset in the history.
    input_dataset = Dataset(name="My Input Data", id="input_ds_id")
    input_dataset.store = data_store # Associate dataset with the datastore
    params.add_input("input_dataset_label", input_dataset)
    # Prepare workflow parameters
    workflow_params = WorkflowParameters()

    # Example: Setting a tool parameter within the workflow.
    # 'workflow_step_label' is the label of the step in Galaxy.
    # 'parameter_name' is the name of the parameter for that tool.
    params.add_parameter("workflow_step_label", {"parameter_name": "parameter_value"})
    # Example 1: Providing a dataset as a workflow-level input
    # '0' is the input ID of the workflow (as defined in Galaxy)
    # 'your_input_dataset_id' is the Galaxy ID of an existing dataset in the history.
    input_dataset = Dataset(id="your_input_dataset_id")
    workflow_params.add_workflow_input("0", input_dataset)

    # Example 2: Providing a dataset collection as a workflow-level input
    # '1' is another input ID of the workflow
    input_collection = DatasetCollection(id="your_input_collection_id")
    workflow_params.add_workflow_input("1", input_collection)

    # Example 3: Setting a parameter for a specific step within the workflow
    # '2' is the ID of the workflow step (as defined in Galaxy)
    # 'some_tool_param' is the parameter path within that step
    workflow_params.add_step_param("2", "some_tool_param", "some_value")

    # Example 4: Setting a list of datasets for a parameter in a step
    list_of_datasets = [Dataset(id="ds_id_1"), Dataset(id="ds_id_2")]
    workflow_params.add_step_param("3", "multiple_inputs", list_of_datasets)

    # Run the workflow and wait for completion (default behavior)
    try:
        outputs = my_workflow.run(data_store=data_store, params=params, wait=True)
        outputs = my_workflow.run(data_store=data_store, params=workflow_params, wait=True)
        if outputs:
            print("Workflow completed successfully!")
    except Exception as e:
@@ -147,27 +155,27 @@ Each workflow run (invocation) has a unique ID in Galaxy. You can retrieve this
    if invocation_id:
        print(f"Galaxy Invocation ID: {invocation_id}")

Accessing Step-Level Jobs
~~~~~~~~~~~~~~~~~~~~~~~~~
Accessing Step-Level Tools
~~~~~~~~~~~~~~~~~~~~~~~~~~

Workflows are composed of individual tool executions (jobs). You can access these as ``Job`` objects using ``get_step_jobs()``. This is useful for monitoring progress at a finer grain or retrieving logs from specific steps.
Workflows are composed of individual tool executions. You can access these as ``Tool`` objects using ``get_step_jobs()``. This is useful for monitoring progress at a finer grain or retrieving logs from specific steps.

.. code-block:: python

    from nova.galaxy.job import Job
    from nova.galaxy.tool import Tool

    step_jobs: List[Job] = my_workflow.get_step_jobs()
    for job in step_jobs:
        print(f"Step Tool ID: {job.tool_id}, Status: {job.get_status()}")
        if job.get_status() == WorkState.ERROR:
            full_job_status = job.get_full_status()
            print(f"  Job Error Details: {full_job_status.details if full_job_status else 'N/A'}")
    step_tools: List[Tool] = my_workflow.get_step_jobs()
    for tool in step_tools:
        print(f"Step Tool ID: {tool.id}, Status: {tool.get_status()}")
        if tool.get_status() == WorkState.ERROR:
            full_tool_status = tool.get_full_status()
            print(f"  Tool Error Details: {full_tool_status.details if full_tool_status else 'N/A'}")


Important Notes
---------------

*   **Workflow Definition**: The structure of your ``Parameters`` object (input labels, step labels for parameters) must match how the workflow is defined in Galaxy. Use the Galaxy UI or API to inspect your workflow's inputs and step details.
*   **Workflow Definition**: The structure of your ``WorkflowParameters`` object (workflow input IDs, step IDs, and parameter paths) must match how the workflow is defined in Galaxy. Use the Galaxy UI or API to inspect your workflow's inputs and step details.
*   **Dataset IDs**: When providing ``Dataset`` or ``DatasetCollection`` objects as inputs, they must already exist in the Galaxy history and have their ``id`` attribute populated.
*   **Error Handling**: Always wrap ``run()`` calls (especially with ``wait=True``) in try-except blocks to handle potential exceptions during workflow execution. Check ``get_full_status().details`` for more information on errors.
*   **State Management**: The ``Workflow`` object primarily manages the state of its *last* invocation. If you need to manage multiple concurrent runs of the same workflow definition, instantiate a new ``Workflow`` object for each run.
+1 −1
Original line number Diff line number Diff line
[tool.poetry]
name = "nova-galaxy"
version = "0.11.1"
version = "0.11.2"
description = "Utilties for accessing the ORNL Galaxy instance"
authors = ["Greg Watson <watsongr@ornl.gov>", "Gregory Cage <cagege@ornl.gov>", "Sergey Yakubov <yakubovs@ornl.gov>"]
readme = "README.md"
+2 −1
Original line number Diff line number Diff line
@@ -5,7 +5,7 @@ from .data_store import Datastore
from .dataset import Dataset, DatasetCollection
from .interfaces import BasicTool
from .outputs import Outputs
from .parameters import Parameters
from .parameters import Parameters, WorkflowParameters
from .tool import Tool
from .tool_runner import ToolRunner
from .workflow import Workflow
@@ -21,6 +21,7 @@ __all__ = [
    "Tool",
    "ToolRunner",
    "Workflow",
    "WorkflowParameters",
]

__version__ = importlib.metadata.version("nova-galaxy")
+83 −1
Original line number Diff line number Diff line
"""Parameters are input values for Galaxy tools."""
"""Parameters are input values for Galaxy tools and workflows."""

from typing import Any, Dict

from .dataset import Dataset, DatasetCollection


class Parameters:
    """Specialized map wrapper used as an input to a Galaxy tool."""
@@ -18,3 +20,83 @@ class Parameters:

    def remove_input(self, name: str) -> None:
        self.inputs.pop(name)


class WorkflowParameters:
    """Handles workflow parameters using explicit bioblend-style approach."""

    def __init__(self) -> None:
        self.workflow_inputs: Dict[str, Any] = {}
        self.step_params: Dict[str, Dict[str, Any]] = {}

    def add_workflow_input(self, input_id: str, value: Any) -> None:
        """Add a workflow-level input.

        Parameters
        ----------
        input_id : str
            The workflow input ID (e.g., "0", "1")
        value : Any
            The input value (Dataset, DatasetCollection, or simple value)
        """
        if isinstance(value, Dataset):
            if not value.id:
                raise ValueError(f"Dataset for workflow input '{input_id}' must have an ID")
            self.workflow_inputs[input_id] = {"src": "hda", "id": value.id}
        elif isinstance(value, DatasetCollection):
            if not value.id:
                raise ValueError(f"DatasetCollection for workflow input '{input_id}' must have an ID")
            self.workflow_inputs[input_id] = {"src": "hdca", "id": value.id}
        else:
            # Simple values (strings, booleans, etc.)
            self.workflow_inputs[input_id] = value

    def add_step_param(self, step_id: str, param_path: str, value: Any) -> None:
        """Add a step-level parameter.

        Parameters
        ----------
        step_id : str
            The workflow step ID (e.g., "2", "4")
        param_path : str
            The parameter path within the step (e.g., "input", "series_0|input_mode|export_folder")
        value : Any
            The parameter value
        """
        if step_id not in self.step_params:
            self.step_params[step_id] = {}

        if isinstance(value, list):
            # Handle list of datasets
            param_list = []
            for item in value:
                if isinstance(item, Dataset):
                    if not item.id:
                        raise ValueError(f"Dataset for step {step_id} parameter {param_path} must have an ID")
                    param_list.append({"src": "hda", "id": item.id})
                elif isinstance(item, DatasetCollection):
                    if not item.id:
                        raise ValueError(f"DatasetCollection for step {step_id}'parameter {param_path} must have an ID")
                    param_list.append({"src": "hdca", "id": item.id})
                else:
                    param_list.append(item)
            self.step_params[step_id][param_path] = param_list
        elif isinstance(value, Dataset):
            if not value.id:
                raise ValueError(f"Dataset for step '{step_id}' parameter '{param_path}' must have an ID")
            self.step_params[step_id][param_path] = {"src": "hda", "id": value.id}
        elif isinstance(value, DatasetCollection):
            if not value.id:
                raise ValueError(f"DatasetCollection for step '{step_id}' parameter '{param_path}' must have an ID")
            self.step_params[step_id][param_path] = {"src": "hdca", "id": value.id}
        else:
            # Simple values
            self.step_params[step_id][param_path] = value

    def get_bioblend_inputs(self) -> Dict[str, Any]:
        """Get the workflow inputs in bioblend format."""
        return self.workflow_inputs.copy()

    def get_bioblend_params(self) -> Dict[str, Dict[str, Any]]:
        """Get the step parameters in bioblend format."""
        return self.step_params.copy()
Loading