Commit 0ba7eee8 authored by Ayres, Andrew's avatar Ayres, Andrew
Browse files

Minor improvements to workflow class and tests

parent fc7ca763
Loading
Loading
Loading
Loading
Loading
+52 −0
Original line number Diff line number Diff line
@@ -11,3 +11,55 @@ The `Parameters` class is used to define the input parameters for a Galaxy tool.
    :dedent:

You can remove an existing input value with `remove_input()` or change the value with `change_input_value()`.

Workflow Parameters
-------------------

The `WorkflowParameters` class is designed specifically for passing inputs and parameters to Galaxy workflows. It provides an explicit, bioblend-style way of defining both the workflow-level inputs and the parameters of individual steps within a workflow.

**Workflow-level Inputs (`add_workflow_input`)**

Use `add_workflow_input` to provide values for the overall workflow inputs, which are typically identified by numerical IDs (e.g., "0", "1") as defined in the workflow. These can be datasets, dataset collections, or simple values.

.. code-block:: python

    from nova.galaxy.parameters import WorkflowParameters
    from nova.galaxy.dataset import Dataset, DatasetCollection

    workflow_params = WorkflowParameters()

    # Adding a dataset as a workflow input (input ID "0")
    my_dataset = Dataset(id="your_dataset_id")
    workflow_params.add_workflow_input("0", my_dataset)

    # Adding a dataset collection as a workflow input (input ID "1")
    my_collection = DatasetCollection(id="your_collection_id")
    workflow_params.add_workflow_input("1", my_collection)

    # Adding a simple text value as a workflow input (input ID "2")
    workflow_params.add_workflow_input("2", "my_text_input")

**Step-level Parameters (`add_step_param`)**

Use `add_step_param` to set parameters for specific steps within the workflow. These are identified by the step's ID (e.g., "2", "4") and a parameter path (e.g., "input", "series_0|input_mode|export_folder").

.. code-block:: python

    from nova.galaxy.parameters import WorkflowParameters
    from nova.galaxy.dataset import Dataset

    workflow_params = WorkflowParameters()

    # Setting a parameter for step "2" with parameter path "input"
    # This could be a dataset, dataset collection, or simple value
    input_for_step_2 = Dataset(id="another_dataset_id")
    workflow_params.add_step_param("2", "input", input_for_step_2)

    # Setting a text parameter for step "3" with parameter path "some_option"
    workflow_params.add_step_param("3", "some_option", "value_for_option")

    # Setting a list of datasets for a parameter in step "4"
    list_of_datasets = [Dataset(id="ds1"), Dataset(id="ds2")]
    workflow_params.add_step_param("4", "multiple_inputs", list_of_datasets)

When running a workflow, you pass an instance of `WorkflowParameters` to the `Workflow.run()` method.
 No newline at end of file
+35 −27
Original line number Diff line number Diff line
@@ -35,36 +35,44 @@ To start, you need the ID of the Galaxy workflow you want to run.
Running a Workflow
~~~~~~~~~~~~~~~~~~

To run the workflow, you use the ``run()`` method. This method requires a ``Datastore`` (representing the Galaxy history) and optionally a ``Parameters`` object for inputs.
To run the workflow, you use the ``run()`` method. This method requires a ``Datastore`` (representing the Galaxy history) and optionally a ``WorkflowParameters`` object for inputs and step-specific parameters.

.. code-block:: python

    from nova.galaxy.data_store import Datastore
    from nova.galaxy.parameters import Parameters
    from nova.galaxy.dataset import Dataset # Assuming you have an input dataset
    from nova.galaxy.parameters import WorkflowParameters
    from nova.galaxy.dataset import Dataset, DatasetCollection

    # Assume 'galaxy_connection' is an established Connection object
    # Assume 'history_id' is the ID of the target Galaxy history
    data_store = Datastore(galaxy_connection, history_id=history_id)

    # Prepare parameters (if any)
    params = Parameters()
    # Example: Adding an input dataset. 'input_dataset_label' is the label
    # of the workflow input as defined in Galaxy.
    # 'input_ds_id' is the Galaxy ID of an existing dataset in the history.
    input_dataset = Dataset(name="My Input Data", id="input_ds_id")
    input_dataset.store = data_store # Associate dataset with the datastore
    params.add_input("input_dataset_label", input_dataset)
    # Prepare workflow parameters
    workflow_params = WorkflowParameters()

    # Example: Setting a tool parameter within the workflow.
    # 'workflow_step_label' is the label of the step in Galaxy.
    # 'parameter_name' is the name of the parameter for that tool.
    params.add_parameter("workflow_step_label", {"parameter_name": "parameter_value"})
    # Example 1: Providing a dataset as a workflow-level input
    # '0' is the input ID of the workflow (as defined in Galaxy)
    # 'your_input_dataset_id' is the Galaxy ID of an existing dataset in the history.
    input_dataset = Dataset(id="your_input_dataset_id")
    workflow_params.add_workflow_input("0", input_dataset)

    # Example 2: Providing a dataset collection as a workflow-level input
    # '1' is another input ID of the workflow
    input_collection = DatasetCollection(id="your_input_collection_id")
    workflow_params.add_workflow_input("1", input_collection)

    # Example 3: Setting a parameter for a specific step within the workflow
    # '2' is the ID of the workflow step (as defined in Galaxy)
    # 'some_tool_param' is the parameter path within that step
    workflow_params.add_step_param("2", "some_tool_param", "some_value")

    # Example 4: Setting a list of datasets for a parameter in a step
    list_of_datasets = [Dataset(id="ds_id_1"), Dataset(id="ds_id_2")]
    workflow_params.add_step_param("3", "multiple_inputs", list_of_datasets)

    # Run the workflow and wait for completion (default behavior)
    try:
        outputs = my_workflow.run(data_store=data_store, params=params, wait=True)
        outputs = my_workflow.run(data_store=data_store, params=workflow_params, wait=True)
        if outputs:
            print("Workflow completed successfully!")
    except Exception as e:
@@ -147,27 +155,27 @@ Each workflow run (invocation) has a unique ID in Galaxy. You can retrieve this
    if invocation_id:
        print(f"Galaxy Invocation ID: {invocation_id}")

Accessing Step-Level Jobs
~~~~~~~~~~~~~~~~~~~~~~~~~
Accessing Step-Level Tools
~~~~~~~~~~~~~~~~~~~~~~~~~~

Workflows are composed of individual tool executions (jobs). You can access these as ``Job`` objects using ``get_step_jobs()``. This is useful for monitoring progress at a finer grain or retrieving logs from specific steps.
Workflows are composed of individual tool executions. You can access these as ``Tool`` objects using ``get_step_jobs()``. This is useful for monitoring progress at a finer grain or retrieving logs from specific steps.

.. code-block:: python

    from nova.galaxy.job import Job
    from nova.galaxy.tool import Tool

    step_jobs: List[Job] = my_workflow.get_step_jobs()
    for job in step_jobs:
        print(f"Step Tool ID: {job.tool_id}, Status: {job.get_status()}")
        if job.get_status() == WorkState.ERROR:
            full_job_status = job.get_full_status()
            print(f"  Job Error Details: {full_job_status.details if full_job_status else 'N/A'}")
    step_tools: List[Tool] = my_workflow.get_step_jobs()
    for tool in step_tools:
        print(f"Step Tool ID: {tool.id}, Status: {tool.get_status()}")
        if tool.get_status() == WorkState.ERROR:
            full_tool_status = tool.get_full_status()
            print(f"  Tool Error Details: {full_tool_status.details if full_tool_status else 'N/A'}")


Important Notes
---------------

*   **Workflow Definition**: The structure of your ``Parameters`` object (input labels, step labels for parameters) must match how the workflow is defined in Galaxy. Use the Galaxy UI or API to inspect your workflow's inputs and step details.
*   **Workflow Definition**: The structure of your ``WorkflowParameters`` object (workflow input IDs, step IDs, and parameter paths) must match how the workflow is defined in Galaxy. Use the Galaxy UI or API to inspect your workflow's inputs and step details.
*   **Dataset IDs**: When providing ``Dataset`` or ``DatasetCollection`` objects as inputs, they must already exist in the Galaxy history and have their ``id`` attribute populated.
*   **Error Handling**: Always wrap ``run()`` calls (especially with ``wait=True``) in try-except blocks to handle potential exceptions during workflow execution. Check ``get_full_status().details`` for more information on errors.
*   **State Management**: The ``Workflow`` object primarily manages the state of its *last* invocation. If you need to manage multiple concurrent runs of the same workflow definition, instantiate a new ``Workflow`` object for each run.
+1 −1
Original line number Diff line number Diff line
[tool.poetry]
name = "nova-galaxy"
version = "0.11.1"
version = "0.11.2"
description = "Utilties for accessing the ORNL Galaxy instance"
authors = ["Greg Watson <watsongr@ornl.gov>", "Gregory Cage <cagege@ornl.gov>", "Sergey Yakubov <yakubovs@ornl.gov>"]
readme = "README.md"
+57 −0
Original line number Diff line number Diff line
@@ -173,7 +173,14 @@ class Invocation:
        """Waits for the workflow invocation to complete."""
        if not self.invocation_id:
            raise Exception("Cannot wait for results, invocation ID is not set.")

        # galaxy returns once all steps are scheduled instead of complete. Need to wait for each job to complete
        self.galaxy_instance.invocations.wait_for_invocation(self.invocation_id)
        for step in self.get_step_jobs():
            if step._job is not None:
                step._job.wait_for_results()
                if step.get_status() is not WorkState.FINISHED:
                    return

    def get_state(self) -> InvocationStatus:
        """Returns the current state of the workflow invocation."""
@@ -183,6 +190,15 @@ class Invocation:
        try:
            invocation_details = self.galaxy_instance.invocations.show_invocation(self.invocation_id)
            self.status.state = self._map_galaxy_state_to_workstate(invocation_details["state"])
            # Galaxy doesn't update workflow state to finished but leaves them at scheduled. Checking each job.
            if self.status.state is WorkState.QUEUED:
                jobs_finished = True
                for step in self.get_step_jobs():
                    if step.get_status() is not WorkState.FINISHED:
                        jobs_finished = False
                if jobs_finished:
                    self.status.state = WorkState.FINISHED

            if self.status.state == WorkState.ERROR and not self.status.details:  # Check details
                self.status.details = self._extract_error_details_from_invocation(invocation_details)
            if self.status.state == WorkState.FINISHED:
@@ -277,6 +293,24 @@ class Invocation:
            print(f"Warning: Could not fetch invocation step jobs for {self.invocation_id}: {e}")
            return []

    def get_step_name(self, step_number: int) -> str:
        """Return the label of one step of this invocation.

        Parameters
        ----------
        step_number : int
            Zero-based position of the step in the ``steps`` list reported by
            Galaxy for this invocation.

        Returns
        -------
        str
            The step's ``workflow_step_label``. An empty string is returned when
            no invocation ID is set, the invocation reports no steps, the
            position is out of range (negative positions included), the step
            has no label, or the invocation could not be fetched.
        """
        if not self.invocation_id:
            return ""

        try:
            steps = self.galaxy_instance.invocations.show_invocation(self.invocation_id).get("steps")
            if not steps:
                return ""

            # Reject negative positions explicitly; plain list indexing would
            # otherwise silently count from the end of the list.
            if not 0 <= step_number < len(steps):
                return ""

            # The label can be absent or null (e.g. an unlabeled step);
            # normalise to "" so the declared ``str`` return type always holds.
            return steps[step_number].get("workflow_step_label") or ""

        except Exception as e:
            # Best effort: report the failure and fall back to "no name".
            print(f"Warning: Could not fetch invocation step name for {self.invocation_id}: {e}")
            return ""


class Workflow(AbstractWorkflow):
    """Represents a Galaxy workflow that can be invoked (run).
@@ -439,6 +473,20 @@ class Workflow(AbstractWorkflow):
            return self._invocation.get_step_jobs()
        return []

    def get_step_name(self, step_number: int) -> str:
        """Look up the name of a workflow step by its number.

        The lookup is delegated to the workflow's current invocation.

        Parameters
        ----------
        step_number : int
            Number of the step whose name is requested.

        Returns
        -------
        str
            The name reported by the invocation for that step, or an empty
            string when the workflow has no invocation.
        """
        invocation = self._invocation
        if not invocation:
            return ""
        return invocation.get_step_name(step_number)

    def get_active_step(self) -> Optional[Tool]:
        """Gets the currently active (running) step in the workflow invocation.

@@ -460,3 +508,12 @@ class Workflow(AbstractWorkflow):
            if job.get_status().state == WorkState.RUNNING:
                return job
        return None

    def wait_for_results(self) -> None:
        """Wait for the workflow's invocation to complete.

        Does nothing when the workflow has no invocation; otherwise the wait
        is delegated to that invocation.
        """
        invocation = self._invocation
        if invocation:
            return invocation.wait_for_results()
        return None
+32 −0
Original line number Diff line number Diff line
@@ -69,6 +69,38 @@ def test_workflow_lifecycle_with_placeholder_id(nova_instance: Connection) -> No
        )


def test_simple_test_workflow_with_dataset(nova_instance: Connection) -> None:
    """Run the published 'simple_test_workflow_with_dataset' workflow and expect it to finish."""
    with nova_instance.connect() as connection:
        history = connection.get_data_store(name=TEST_HISTORY_NAME_WF)

        # Look the workflow up by name among the published workflows.
        matches = connection.galaxy_instance.workflows.get_workflows(
            name="simple_test_workflow_with_dataset", published=True
        )
        assert len(matches) > 0, (
            "'simple_test_workflow_with_dataset' not found. Please ensure it's published in Galaxy."
        )

        wf_params = WorkflowParameters()

        # Workflow-level input "0" is a plain boolean value.
        wf_params.add_workflow_input("0", True)

        # Step "1" is told to ingest from a file and given the file's path.
        step_one_settings = (
            ("params|ingest_mode", "file"),
            ("params|filepath", "/SNS/TOPAZ/IPTS-17211/0/22594/NeXus/TOPAZ_22594_event.nxs"),
        )
        for param_path, param_value in step_one_settings:
            wf_params.add_step_param("1", param_path, param_value)

        workflow = Workflow(id=matches[0]["id"])

        # wait=True: only check the status once run() has returned.
        workflow.run(data_store=history, params=wf_params, wait=True)

        # The run must have ended in the FINISHED state.
        assert workflow.get_status() == WorkState.FINISHED, (
            f"Workflow did not finish successfully. Current status: {workflow.get_status()}"
        )


def test_workflow_initial_state() -> None:
    """Tests the initial state of a Workflow object before any run."""
    workflow = Workflow(id="another_placeholder_id")