Commit 1903dc6f authored by andrewfayres's avatar andrewfayres
Browse files

Making some doc and lint fixes

parent 1c8ed34c
Loading
Loading
Loading
Loading
Loading
+0 −18
Original line number Diff line number Diff line
@@ -5,21 +5,3 @@ API Reference

.. automodule:: nova.galaxy
   :members:

.. automodule:: nova.galaxy.data_store
   :members:

.. automodule:: nova.galaxy.dataset
   :members:

.. automodule:: nova.galaxy.parameters
   :members:

.. automodule:: nova.galaxy.outputs
   :members:

.. automodule:: nova.galaxy.tool
   :members:

.. automodule:: nova.galaxy.tool_runner
   :members:
+103 −82
Original line number Diff line number Diff line
"""Contains classes to run workflows in Galaxy via Connection."""

import time
from threading import Lock, Thread
from typing import TYPE_CHECKING, Dict, List, Optional, Union

from bioblend import galaxy
from typing import TYPE_CHECKING, Dict, List, Optional

if TYPE_CHECKING:
    from .data_store import Datastore
    from .dataset import AbstractData
    from .job import Job

from .dataset import AbstractData, Dataset, DatasetCollection
from nova.common.job import WorkState

from .dataset import Dataset, DatasetCollection
from .outputs import Outputs
from .parameters import Parameters
from .tool import AbstractWork
from nova.common.job import WorkState


class InvocationStatus:
@@ -24,7 +21,7 @@ class InvocationStatus:
    def __init__(self) -> None:
        self.lock = Lock()
        self._state = WorkState.NOT_STARTED
        self._details = "" # Consolidated error_msg here
        self._details = ""

    @property
    def state(self) -> WorkState:
@@ -69,11 +66,9 @@ class Invocation:
            "error": WorkState.ERROR,
            "paused": WorkState.QUEUED,
            "canceled": WorkState.CANCELED,
            
            # Invocation-specific states
            "scheduled": WorkState.QUEUED,
            "failed": WorkState.ERROR,
            
            # Job-specific states
            "upload": WorkState.UPLOADING_DATA,
            "waiting": WorkState.QUEUED,
@@ -85,13 +80,13 @@ class Invocation:
    def _extract_error_details_from_invocation(self, invocation_details: Dict) -> str:
        """Parses invocation details to extract a detailed error message."""
        error_list = [f"Invocation failed. Overall State: {invocation_details.get('state', 'Unknown')}."]
        if 'steps' in invocation_details and isinstance(invocation_details['steps'], list):
            for step in invocation_details['steps']:
                step_galaxy_state = step.get('state')
        if "steps" in invocation_details and isinstance(invocation_details["steps"], list):
            for step in invocation_details["steps"]:
                step_galaxy_state = step.get("state")
                if self._map_galaxy_state_to_workstate(step_galaxy_state) == WorkState.ERROR:
                    step_label = step.get('workflow_step_label', f"Step ID {step.get('id', 'Unknown')}")
                    step_label = step.get("workflow_step_label", f"Step ID {step.get('id', 'Unknown')}")
                    error_list.append(f"  - Step '{step_label}' failed with state: {step_galaxy_state}.")
                    # TODO: Future enhancement: Extract more specific error messages from step['messages'] or job details if available.
                    # TODO: Future enhancement: Extract more specific error messages from step['messages']
        return "\n".join(error_list)

    def _run_and_wait(self, params: Optional[Parameters]) -> None:
@@ -101,7 +96,7 @@ class Invocation:
            if self.invocation_id:
                self.wait_for_results()
                invocation_details = self.galaxy_instance.invocations.show_invocation(self.invocation_id)
                self.status.state = self._map_galaxy_state_to_workstate(invocation_details['state'])
                self.status.state = self._map_galaxy_state_to_workstate(invocation_details["state"])
                if self.status.state == WorkState.ERROR:
                    self.status.details = self._extract_error_details_from_invocation(invocation_details)
                elif self.status.state == WorkState.FINISHED:
@@ -114,7 +109,6 @@ class Invocation:
            self.status.state = WorkState.ERROR
            self.status.details = f"Error during workflow execution or waiting: {str(e)}"


    def run(self, params: Optional[Parameters], wait: bool) -> Optional[Outputs]:
        """Runs the workflow invocation."""
        if self.status.state in [WorkState.NOT_STARTED, WorkState.FINISHED, WorkState.ERROR, WorkState.CANCELED]:
@@ -130,8 +124,10 @@ class Invocation:
                return self.get_results()
            return None
        else:
            raise Exception(f"Workflow {self.workflow_id} (invocation: {self.invocation_id}) is already running or in an intermediate state ({self.status.state}). Cannot start a new run.")

            raise Exception(
                f"Workflow {self.workflow_id} (invocation: {self.invocation_id}) is already running or in an"
                f"intermediate state ({self.status.state}). Cannot start a new run."
            )

    def submit(self, params: Optional[Parameters]) -> None:
        """Handles input preparation and submits the workflow invocation."""
@@ -140,35 +136,57 @@ class Invocation:

        try:
            workflow_details = self.galaxy_instance.workflows.show_workflow(self.workflow_id)
            label_to_input_id = {v['label']: k for k, v in workflow_details.get('inputs', {}).items() if v.get('label')}
            label_to_step_id = {step['label']: str(step['id']) for step in workflow_details.get('steps', {}).values() if step.get('label')}
            label_to_input_id = {v["label"]: k for k, v in workflow_details.get("inputs", {}).items() if v.get("label")}
            label_to_step_id = {
                step["label"]: str(step["id"])
                for step in workflow_details.get("steps", {}).values()
                if step.get("label")
            }

            if params:
                for label, value in params.inputs.items():
                    if isinstance(value, Dataset):
                        input_id = label_to_input_id.get(label)
                        if not input_id:
                            raise ValueError(f"Workflow input label '{label}' not found in workflow '{self.workflow_id}'. Available input labels: {list(label_to_input_id.keys())}")
                            raise ValueError(
                                f"Input label '{label}' not found in workflow '{self.workflow_id}'."
                                f" Available input labels: {list(label_to_input_id.keys())}"
                            )
                        if not value.id:
                             raise ValueError(f"Input dataset '{label}' must have an ID (must exist in Galaxy history). Upload it first if necessary.")
                        bioblend_inputs[input_id] = {'src': 'hda', 'id': value.id}
                            raise ValueError(
                                f"Input dataset '{label}' must have an ID (must exist"
                                " in Galaxy history). Upload it first if necessary."
                            )
                        bioblend_inputs[input_id] = {"src": "hda", "id": value.id}
                    elif isinstance(value, DatasetCollection):
                        input_id = label_to_input_id.get(label)
                        if not input_id:
                            raise ValueError(f"Workflow input label '{label}' not found in workflow '{self.workflow_id}'. Available input labels: {list(label_to_input_id.keys())}")
                            raise ValueError(
                                f"Input label '{label}' not found in workflow '{self.workflow_id}'."
                                f" Available input labels: {list(label_to_input_id.keys())}"
                            )
                        if not value.id:
                             raise ValueError(f"Input dataset collection '{label}' must have an ID (must exist in Galaxy history).")
                        bioblend_inputs[input_id] = {'src': 'hdca', 'id': value.id}
                            raise ValueError(
                                f"Input dataset collection '{label}' must have an ID (must exist in Galaxy history)."
                            )
                        bioblend_inputs[input_id] = {"src": "hdca", "id": value.id}
                    elif isinstance(value, dict):
                        step_id = label_to_step_id.get(label)
                        if not step_id:
                             raise ValueError(f"Workflow step label '{label}' not found in workflow '{self.workflow_id}' for setting parameters. Available step labels: {list(label_to_step_id.keys())}")
                            raise ValueError(
                                f"Step label '{label}' not found in workflow "
                                "'{self.workflow_id}' for setting parameters. Available "
                                f"step labels: {list(label_to_step_id.keys())}"
                            )
                        bioblend_params[step_id] = value
                    else:
                        input_id = label_to_input_id.get(label)
                        step_id = label_to_step_id.get(label)
                        if not input_id and not step_id:
                            print(f"Warning: Parameter '{label}' is not a Dataset, DatasetCollection, or a dictionary associated with a known step label. It will be ignored.")
                            print(
                                f"Warning: Parameter '{label}' is not a Dataset, DatasetCollection, "
                                "or dictionary associated with a known step label. It will be ignored."
                            )
                        else:
                            if input_id:
                                bioblend_inputs[input_id] = value
@@ -181,23 +199,21 @@ class Invocation:
                inputs=bioblend_inputs,
                params=bioblend_params,
                history_id=self.store.history_id,
                parameters_normalized=False
                parameters_normalized=False,
            )
            self.invocation_id = invocation_info['id']
            self.status.state = self._map_galaxy_state_to_workstate(invocation_info['state'])
            self.invocation_id = invocation_info["id"]
            self.status.state = self._map_galaxy_state_to_workstate(invocation_info["state"])
        except Exception as e:
            self.status.state = WorkState.ERROR
            self.status.details = f"Failed to prepare or submit workflow invocation: {str(e)}"
            self.invocation_id = None


    def wait_for_results(self) -> None:
        """Waits for the workflow invocation to complete."""
        if not self.invocation_id:
            raise Exception("Cannot wait for results, invocation ID is not set.")
        self.galaxy_instance.invocations.wait_for_invocation(self.invocation_id)


    def get_state(self) -> InvocationStatus:
        """Returns the current state of the workflow invocation."""
        if not self.invocation_id or self.status.state in [WorkState.FINISHED, WorkState.ERROR, WorkState.CANCELED]:
@@ -205,7 +221,7 @@ class Invocation:

        try:
            invocation_details = self.galaxy_instance.invocations.show_invocation(self.invocation_id)
            self.status.state = self._map_galaxy_state_to_workstate(invocation_details['state'])
            self.status.state = self._map_galaxy_state_to_workstate(invocation_details["state"])
            if self.status.state == WorkState.ERROR and not self.status.details:  # Check details
                self.status.details = self._extract_error_details_from_invocation(invocation_details)
            if self.status.state == WorkState.FINISHED:
@@ -216,7 +232,6 @@ class Invocation:

        return self.status


    def get_results(self) -> Optional[Outputs]:
        """Returns the results (outputs) from a completed workflow invocation."""
        current_status = self.get_state()
@@ -227,33 +242,38 @@ class Invocation:

        if not self.outputs_data:
            try:
                assert isinstance(self.invocation_id, str)
                self.outputs_data = self.galaxy_instance.invocations.show_invocation(self.invocation_id)
            except Exception as e:
                 raise Exception(f"Failed to fetch invocation details for results processing: {e}")
                raise Exception(f"Failed to fetch invocation details for results processing: {e}") from e

        outputs = Outputs()
        try:
            if 'outputs' in self.outputs_data:
                 for output_name, dataset_info in self.outputs_data['outputs'].items():
                     if dataset_info and 'id' in dataset_info and 'src' in dataset_info and dataset_info['src'] == 'hda':
            if "outputs" in self.outputs_data:
                for output_name, dataset_info in self.outputs_data["outputs"].items():
                    if dataset_info and "id" in dataset_info and "src" in dataset_info and dataset_info["src"] == "hda":
                        d = Dataset(output_name)
                         d.id = dataset_info['id']
                        d.id = dataset_info["id"]
                        d.store = self.store
                        outputs.add_output(d)

            if 'output_collections' in self.outputs_data:
                 for output_name, collection_info in self.outputs_data['output_collections'].items():
                     if collection_info and 'id' in collection_info and 'src' in collection_info and collection_info['src'] == 'hdca':
            if "output_collections" in self.outputs_data:
                for output_name, collection_info in self.outputs_data["output_collections"].items():
                    if (
                        collection_info
                        and "id" in collection_info
                        and "src" in collection_info
                        and collection_info["src"] == "hdca"
                    ):
                        dc = DatasetCollection(output_name)
                         dc.id = collection_info['id']
                        dc.id = collection_info["id"]
                        dc.store = self.store
                        outputs.add_output(dc)

            return outputs

        except Exception as e:
            raise Exception(f"Error processing invocation results: {e}")

            raise Exception(f"Error processing invocation results: {e}") from e

    def cancel(self) -> bool:
        """Cancels the workflow invocation."""
@@ -265,7 +285,7 @@ class Invocation:
            if success:
                self.status.state = WorkState.CANCELED
                self.status.details = "Invocation canceled by user."
            return success
            return True
        except Exception as e:
            print(f"Error cancelling invocation {self.invocation_id}: {e}")
            return False
@@ -286,13 +306,13 @@ class Invocation:
            step_jobs = []

            for job_info in jobs_summary:
                if job_info.get('id') and job_info.get('tool_id'):
                if job_info.get("id") and job_info.get("tool_id"):
                    # Create a Job instance for this step
                    job = Job(job_info['tool_id'], self.store)
                    job.id = job_info['id']
                    job = Job(job_info["tool_id"], self.store)
                    job.id = job_info["id"]

                    # Map Galaxy job state to WorkState
                    galaxy_state = job_info.get('state', 'unknown')
                    galaxy_state = job_info.get("state", "unknown")
                    job.status.state = self._map_galaxy_state_to_workstate(galaxy_state)

                    step_jobs.append(job)
@@ -302,6 +322,7 @@ class Invocation:
            print(f"Warning: Could not fetch invocation step jobs for {self.invocation_id}: {e}")
            return []


class Workflow(AbstractWork):
    """Represents a Galaxy workflow that can be invoked (run).

@@ -417,11 +438,11 @@ class Workflow(AbstractWork):

    def stop(self) -> bool:
        """Stops (cancels) the currently running workflow invocation.

        Alias for cancel().
        """
        return self.cancel()


    def get_invocation_id(self) -> Optional[str]:
        """Gets the Galaxy invocation ID for the last run.

+14 −13
Original line number Diff line number Diff line
"""Integration tests for Workflow functionality."""

import os
import time

import pytest

from nova.common.job import WorkState
from nova.galaxy.connection import Connection
from nova.galaxy.data_store import Datastore
from nova.galaxy.parameters import Parameters
from nova.galaxy.workflow import Workflow

@@ -18,15 +14,17 @@ GALAXY_API_KEY = os.environ.get("NOVA_GALAXY_TEST_GALAXY_KEY", "")
WORKFLOW_NAME = "Simple_test_workflow"
TEST_HISTORY_NAME_WF = "nova_galaxy_workflow_test_history"

def test_workflow_lifecycle_with_placeholder_id(nova_instance: Connection):

def test_workflow_lifecycle_with_placeholder_id(nova_instance: Connection) -> None:
    """
    Tests the Workflow class lifecycle methods when using a placeholder workflow ID.

    This test expects failures when trying to run the workflow, as the ID is a placeholder.
    """
    with nova_instance.connect() as connection:
        ds = connection.get_data_store(name=TEST_HISTORY_NAME_WF)
        workflows = connection.galaxy_instance.workflows.get_workflows(name=WORKFLOW_NAME, published=True)
        workflow_id = workflows[0]['id']
        workflow_id = workflows[0]["id"]
        params = Parameters()

        workflow = Workflow(id=workflow_id)
@@ -42,8 +40,9 @@ def test_workflow_lifecycle_with_placeholder_id(nova_instance: Connection):
        print(f"Invocation ID after run(wait=True): {invocation_id}")
        print(f"Full status details: {full_status.details if full_status else 'N/A'}")

        assert status in [WorkState.ERROR, WorkState.QUEUED], \
        assert status in [WorkState.ERROR, WorkState.QUEUED], (
            f"Expected ERROR or QUEUED state after run(wait=False), got {status}"
        )

        if status == WorkState.ERROR:
            assert full_status is not None
@@ -65,10 +64,12 @@ def test_workflow_lifecycle_with_placeholder_id(nova_instance: Connection):
        assert len(step_jobs) == 0, "Expected no step jobs for a placeholder/failed workflow"

        final_status = workflow.get_status()
        assert final_status in [WorkState.ERROR, WorkState.CANCELED, WorkState.QUEUED], \
        assert final_status in [WorkState.ERROR, WorkState.CANCELED, WorkState.QUEUED], (
            f"Unexpected final state: {final_status}"
        )


def test_workflow_initial_state():
def test_workflow_initial_state() -> None:
    """Tests the initial state of a Workflow object before any run."""
    workflow = Workflow(id="another_placeholder_id")
    assert workflow.id == "another_placeholder_id"