Commit fc7ca763 authored by andrewfayres's avatar andrewfayres Committed by Ayres, Andrew
Browse files

Modified the way workflow parameters work to be more explicitly declared.

parent ed5b2281
Loading
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -5,7 +5,7 @@ from .data_store import Datastore
from .dataset import Dataset, DatasetCollection
from .interfaces import BasicTool
from .outputs import Outputs
from .parameters import Parameters
from .parameters import Parameters, WorkflowParameters
from .tool import Tool
from .tool_runner import ToolRunner
from .workflow import Workflow
@@ -21,6 +21,7 @@ __all__ = [
    "Tool",
    "ToolRunner",
    "Workflow",
    "WorkflowParameters",
]

__version__ = importlib.metadata.version("nova-galaxy")
+83 −1
Original line number Diff line number Diff line
"""Parameters are input values for Galaxy tools."""
"""Parameters are input values for Galaxy tools and workflows."""

from typing import Any, Dict

from .dataset import Dataset, DatasetCollection


class Parameters:
    """Specialized map wrapper used as an input to a Galaxy tool."""
@@ -18,3 +20,83 @@ class Parameters:

    def remove_input(self, name: str) -> None:
        self.inputs.pop(name)


class WorkflowParameters:
    """Handles workflow parameters using explicit bioblend-style approach."""

    def __init__(self) -> None:
        self.workflow_inputs: Dict[str, Any] = {}
        self.step_params: Dict[str, Dict[str, Any]] = {}

    def add_workflow_input(self, input_id: str, value: Any) -> None:
        """Add a workflow-level input.

        Parameters
        ----------
        input_id : str
            The workflow input ID (e.g., "0", "1")
        value : Any
            The input value (Dataset, DatasetCollection, or simple value)
        """
        if isinstance(value, Dataset):
            if not value.id:
                raise ValueError(f"Dataset for workflow input '{input_id}' must have an ID")
            self.workflow_inputs[input_id] = {"src": "hda", "id": value.id}
        elif isinstance(value, DatasetCollection):
            if not value.id:
                raise ValueError(f"DatasetCollection for workflow input '{input_id}' must have an ID")
            self.workflow_inputs[input_id] = {"src": "hdca", "id": value.id}
        else:
            # Simple values (strings, booleans, etc.)
            self.workflow_inputs[input_id] = value

    def add_step_param(self, step_id: str, param_path: str, value: Any) -> None:
        """Add a step-level parameter.

        Parameters
        ----------
        step_id : str
            The workflow step ID (e.g., "2", "4")
        param_path : str
            The parameter path within the step (e.g., "input", "series_0|input_mode|export_folder")
        value : Any
            The parameter value
        """
        if step_id not in self.step_params:
            self.step_params[step_id] = {}

        if isinstance(value, list):
            # Handle list of datasets
            param_list = []
            for item in value:
                if isinstance(item, Dataset):
                    if not item.id:
                        raise ValueError(f"Dataset for step {step_id} parameter {param_path} must have an ID")
                    param_list.append({"src": "hda", "id": item.id})
                elif isinstance(item, DatasetCollection):
                    if not item.id:
                        raise ValueError(f"DatasetCollection for step {step_id}'parameter {param_path} must have an ID")
                    param_list.append({"src": "hdca", "id": item.id})
                else:
                    param_list.append(item)
            self.step_params[step_id][param_path] = param_list
        elif isinstance(value, Dataset):
            if not value.id:
                raise ValueError(f"Dataset for step '{step_id}' parameter '{param_path}' must have an ID")
            self.step_params[step_id][param_path] = {"src": "hda", "id": value.id}
        elif isinstance(value, DatasetCollection):
            if not value.id:
                raise ValueError(f"DatasetCollection for step '{step_id}' parameter '{param_path}' must have an ID")
            self.step_params[step_id][param_path] = {"src": "hdca", "id": value.id}
        else:
            # Simple values
            self.step_params[step_id][param_path] = value

    def get_bioblend_inputs(self) -> Dict[str, Any]:
        """Get the workflow inputs in bioblend format."""
        return self.workflow_inputs.copy()

    def get_bioblend_params(self) -> Dict[str, Dict[str, Any]]:
        """Get the step parameters in bioblend format."""
        return self.step_params.copy()
+43 −86
Original line number Diff line number Diff line
"""Contains classes to run workflows in Galaxy via Connection."""

from threading import Lock, Thread
from typing import TYPE_CHECKING, Dict, List, Optional
from typing import TYPE_CHECKING, Dict, List, Optional, Union

if TYPE_CHECKING:
    from .data_store import Datastore
    from .job import Job

from nova.common.job import WorkState

from .dataset import Dataset, DatasetCollection
from .dataset import AbstractData, Dataset, DatasetCollection
from .outputs import Outputs
from .parameters import Parameters
from .tool import AbstractWork
from .parameters import Parameters, WorkflowParameters
from .tool import Tool


class AbstractWorkflow:
    """Abstraction for a runnable object in Galaxy such as a tool or workflow."""

    def __init__(self, id: str):
        self.id = id

    def get_outputs(self) -> List[AbstractData]:
        return []

    def get_inputs(self) -> List[Parameters]:
        return []

    def run(self, data_store: "Datastore", params: WorkflowParameters, wait: bool) -> Union[Outputs, None]:
        return None


class InvocationStatus:
@@ -89,7 +104,7 @@ class Invocation:
                    # TODO: Future enhancement: Extract more specific error messages from step['messages']
        return "\n".join(error_list)

    def _run_and_wait(self, params: Optional[Parameters]) -> None:
    def _run_and_wait(self, params: Optional[WorkflowParameters]) -> None:
        """Submits workflow invocation and waits for completion."""
        try:
            self.submit(params)
@@ -109,7 +124,7 @@ class Invocation:
            self.status.state = WorkState.ERROR
            self.status.details = f"Error during workflow execution or waiting: {str(e)}"

    def run(self, params: Optional[Parameters], wait: bool) -> Optional[Outputs]:
    def run(self, params: Optional[WorkflowParameters], wait: bool) -> Optional[Outputs]:
        """Runs the workflow invocation."""
        if self.status.state in [WorkState.NOT_STARTED, WorkState.FINISHED, WorkState.ERROR, WorkState.CANCELED]:
            self.status = InvocationStatus()
@@ -129,69 +144,15 @@ class Invocation:
                f"intermediate state ({self.status.state}). Cannot start a new run."
            )

    def submit(self, params: Optional[Parameters]) -> None:
        """Handles input preparation and submits the workflow invocation."""
        bioblend_inputs = {}
        bioblend_params = {}

    def submit(self, params: Optional[WorkflowParameters]) -> None:
        """Handles input preparation and submits the workflow invocation using explicit bioblend approach."""
        try:
            workflow_details = self.galaxy_instance.workflows.show_workflow(self.workflow_id)
            label_to_input_id = {v["label"]: k for k, v in workflow_details.get("inputs", {}).items() if v.get("label")}
            label_to_step_id = {
                step["label"]: str(step["id"])
                for step in workflow_details.get("steps", {}).values()
                if step.get("label")
            }

            if params:
                for label, value in params.inputs.items():
                    if isinstance(value, Dataset):
                        input_id = label_to_input_id.get(label)
                        if not input_id:
                            raise ValueError(
                                f"Input label '{label}' not found in workflow '{self.workflow_id}'."
                                f" Available input labels: {list(label_to_input_id.keys())}"
                            )
                        if not value.id:
                            raise ValueError(
                                f"Input dataset '{label}' must have an ID (must exist"
                                " in Galaxy history). Upload it first if necessary."
                            )
                        bioblend_inputs[input_id] = {"src": "hda", "id": value.id}
                    elif isinstance(value, DatasetCollection):
                        input_id = label_to_input_id.get(label)
                        if not input_id:
                            raise ValueError(
                                f"Input label '{label}' not found in workflow '{self.workflow_id}'."
                                f" Available input labels: {list(label_to_input_id.keys())}"
                            )
                        if not value.id:
                            raise ValueError(
                                f"Input dataset collection '{label}' must have an ID (must exist in Galaxy history)."
                            )
                        bioblend_inputs[input_id] = {"src": "hdca", "id": value.id}
                    elif isinstance(value, dict):
                        step_id = label_to_step_id.get(label)
                        if not step_id:
                            raise ValueError(
                                f"Step label '{label}' not found in workflow "
                                "'{self.workflow_id}' for setting parameters. Available "
                                f"step labels: {list(label_to_step_id.keys())}"
                            )
                        bioblend_params[step_id] = value
                    else:
                        input_id = label_to_input_id.get(label)
                        step_id = label_to_step_id.get(label)
                        if not input_id and not step_id:
                            print(
                                f"Warning: Parameter '{label}' is not a Dataset, DatasetCollection, "
                                "or dictionary associated with a known step label. It will be ignored."
                            )
                bioblend_inputs = params.get_bioblend_inputs()
                bioblend_params = params.get_bioblend_params()
            else:
                            if input_id:
                                bioblend_inputs[input_id] = value
                            elif step_id:
                                bioblend_params[step_id] = value
                bioblend_inputs = {}
                bioblend_params = {}

            self.status.state = WorkState.QUEUED
            invocation_info = self.galaxy_instance.workflows.invoke_workflow(
@@ -294,10 +255,8 @@ class Invocation:
        """Returns the Galaxy invocation ID."""
        return self.invocation_id

    def get_step_jobs(self) -> List["Job"]:
    def get_step_jobs(self) -> List[Tool]:
        """Returns nova-galaxy Job instances for each step in the workflow invocation."""
        from .job import Job

        if not self.invocation_id:
            return []

@@ -305,17 +264,13 @@ class Invocation:
            jobs_summary = self.galaxy_instance.invocations.get_invocation_step_jobs_summary(self.invocation_id)
            step_jobs = []

            for job_info in jobs_summary:
                if job_info.get("id") and job_info.get("tool_id"):
                    # Create a Job instance for this step
                    job = Job(job_info["tool_id"], self.store)
                    job.id = job_info["id"]

                    # Map Galaxy job state to WorkState
                    galaxy_state = job_info.get("state", "unknown")
                    job.status.state = self._map_galaxy_state_to_workstate(galaxy_state)
            tools = self.store.recover_tools(filter_running=True)

                    step_jobs.append(job)
            for job_info in jobs_summary:
                if job_info.get("id"):
                    for tool in tools:
                        if job_info.get("id") == tool.get_uid():
                            step_jobs.append(tool)

            return step_jobs
        except Exception as e:
@@ -323,7 +278,7 @@ class Invocation:
            return []


class Workflow(AbstractWork):
class Workflow(AbstractWorkflow):
    """Represents a Galaxy workflow that can be invoked (run).

    It's recommended to create a new Workflow object for each invocation
@@ -341,7 +296,9 @@ class Workflow(AbstractWork):
        super().__init__(id)
        self._invocation: Optional[Invocation] = None

    def run(self, data_store: "Datastore", params: Optional[Parameters] = None, wait: bool = True) -> Optional[Outputs]:
    def run(
        self, data_store: "Datastore", params: Optional[WorkflowParameters] = None, wait: bool = True
    ) -> Optional[Outputs]:
        """Invokes (runs) this workflow in the specified data store.

        By default, runs in a blocking manner (waits for completion). Set `wait=False`
@@ -455,7 +412,7 @@ class Workflow(AbstractWork):
            return self._invocation.get_invocation_id()
        return None

    def get_step_jobs(self) -> List["Job"]:
    def get_step_jobs(self) -> List[Tool]:
        """Gets nova-galaxy Job instances for each step in the workflow.

        Returns the individual jobs that make up the workflow steps,
@@ -482,7 +439,7 @@ class Workflow(AbstractWork):
            return self._invocation.get_step_jobs()
        return []

    def get_active_step(self) -> Optional["Job"]:
    def get_active_step(self) -> Optional[Tool]:
        """Gets the currently active (running) step in the workflow invocation.

        This method iterates through all jobs associated with the workflow steps
@@ -500,6 +457,6 @@ class Workflow(AbstractWork):

        step_jobs = self._invocation.get_step_jobs()
        for job in step_jobs:
            if job.status.state == WorkState.RUNNING:
            if job.get_status().state == WorkState.RUNNING:
                return job
        return None
+2 −2
Original line number Diff line number Diff line
@@ -4,7 +4,7 @@ import os

from nova.common.job import WorkState
from nova.galaxy.connection import Connection
from nova.galaxy.parameters import Parameters
from nova.galaxy.parameters import WorkflowParameters
from nova.galaxy.workflow import Workflow

GALAXY_URL = os.environ.get("NOVA_GALAXY_TEST_GALAXY_URL", "https://calvera-test.ornl.gov")
@@ -25,7 +25,7 @@ def test_workflow_lifecycle_with_placeholder_id(nova_instance: Connection) -> No
        ds = connection.get_data_store(name=TEST_HISTORY_NAME_WF)
        workflows = connection.galaxy_instance.workflows.get_workflows(name=WORKFLOW_NAME, published=True)
        workflow_id = workflows[0]["id"]
        params = Parameters()
        params = WorkflowParameters()

        workflow = Workflow(id=workflow_id)