Commit 028185dc authored by andrewfayres's avatar andrewfayres Committed by Your Name
Browse files

Fixes for workflow

parent d069a1d3
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
[tool.poetry]
name = "nova-galaxy"
version = "0.10.2"
version = "0.11.1"
description = "Utilities for accessing the ORNL Galaxy instance"
authors = ["Greg Watson <watsongr@ornl.gov>", "Gregory Cage <cagege@ornl.gov>", "Sergey Yakubov <yakubovs@ornl.gov>"]
readme = "README.md"
+144 −27
Original line number Diff line number Diff line
"""Contains classes to run workflows in Galaxy via Connection."""

import time
from threading import Thread
from threading import Lock, Thread
from typing import TYPE_CHECKING, Dict, List, Optional, Union

from bioblend import galaxy
@@ -9,21 +9,42 @@ from bioblend import galaxy
if TYPE_CHECKING:
    from .data_store import Datastore
    from .dataset import AbstractData
    from .job import Job

from .dataset import AbstractData, Dataset, DatasetCollection, upload_datasets
from .dataset import AbstractData, Dataset, DatasetCollection
from .outputs import Outputs
from .parameters import Parameters
from .tool import AbstractWork
from .util import WorkState
from nova.common.job import WorkState


class InvocationStatus:
    """Internal structure to hold workflow invocation status info.

    ``state`` and ``details`` are properties; each getter and setter acquires
    ``self.lock`` for the duration of that single read or write. Sequences of
    several reads/writes are not protected as a unit by these properties.
    """

    def __init__(self) -> None:
        # The lock has to exist before anything goes through the property
        # setters (they acquire it), so the backing fields are initialised
        # directly instead of via ``self.state`` / ``self.details``.
        self.lock = Lock()
        self._state = WorkState.NOT_STARTED
        self._details = ""  # Status / error text (error_msg was consolidated here).
        # Kept as a plain attribute so code that still reads ``error_msg``
        # does not fail; new messages are reported through ``details``.
        self.error_msg = ""

    @property
    def state(self) -> WorkState:
        """Current WorkState, read while holding the lock."""
        with self.lock:
            return self._state

    @state.setter
    def state(self, value: WorkState) -> None:
        with self.lock:
            self._state = value

    @property
    def details(self) -> str:
        """Free-form status or error text, read while holding the lock."""
        with self.lock:
            return self._details

    @details.setter
    def details(self, value: str) -> None:
        with self.lock:
            self._details = value


class Invocation:
@@ -38,19 +59,41 @@ class Invocation:
        self.outputs_data: Optional[Dict] = None

    def _map_galaxy_state_to_workstate(self, galaxy_state: str) -> WorkState:
        """Maps Galaxy states (both invocation and job states) to internal WorkState enum.

        Parameters
        ----------
        galaxy_state : str
            State string reported by Galaxy for an invocation or a job.

        Returns
        -------
        WorkState
            The mapped state. Any value that is not a key of the table below
            (including ``None``) maps to ``WorkState.ERROR``.
        """
        state_map = {
            # Common states
            "new": WorkState.QUEUED,
            "queued": WorkState.QUEUED,
            "running": WorkState.RUNNING,
            "ok": WorkState.FINISHED,
            "error": WorkState.ERROR,
            "paused": WorkState.PAUSED,
            "canceled": WorkState.CANCELED,

            # Invocation-specific states
            "scheduled": WorkState.QUEUED,
            "failed": WorkState.ERROR,
            # Both spellings resolve to the single CANCELED member that the
            # rest of this module compares against.
            "cancelled": WorkState.CANCELED,

            # Job-specific states
            "upload": WorkState.UPLOADING_DATA,
            "waiting": WorkState.QUEUED,
            "deleted": WorkState.DELETED,
            "stopped": WorkState.CANCELED,
        }
        return state_map.get(galaxy_state, WorkState.ERROR)

    def _extract_error_details_from_invocation(self, invocation_details: Dict) -> str:
        """Parses invocation details to extract a detailed error message.

        Parameters
        ----------
        invocation_details : Dict
            Invocation description; the ``state`` and ``steps`` entries are
            read, and per step ``state``, ``workflow_step_label`` and ``id``.

        Returns
        -------
        str
            A headline with the overall state followed by one line per step
            whose state maps to ``WorkState.ERROR``, joined with newlines.
        """
        error_list = [f"Invocation failed. Overall State: {invocation_details.get('state', 'Unknown')}."]
        steps = invocation_details.get('steps')
        if isinstance(steps, list):
            for step in steps:
                step_galaxy_state = step.get('state')
                # A step without a state has nothing to report; without this
                # guard the ERROR default for unknown states would list it as
                # a failed step.
                if step_galaxy_state is None:
                    continue
                if self._map_galaxy_state_to_workstate(step_galaxy_state) != WorkState.ERROR:
                    continue
                # ``or`` (rather than a .get default) so that a label that is
                # present but None/empty still falls back to the step ID.
                step_label = step.get('workflow_step_label') or f"Step ID {step.get('id', 'Unknown')}"
                error_list.append(f"  - Step '{step_label}' failed with state: {step_galaxy_state}.")
                # TODO: Future enhancement: Extract more specific error messages from step['messages'] or job details if available.
        return "\n".join(error_list)

    def _run_and_wait(self, params: Optional[Parameters]) -> None:
        """Submits workflow invocation and waits for completion."""
        try:
@@ -60,22 +103,21 @@ class Invocation:
                invocation_details = self.galaxy_instance.invocations.show_invocation(self.invocation_id)
                self.status.state = self._map_galaxy_state_to_workstate(invocation_details['state'])
                if self.status.state == WorkState.ERROR:
                     # TODO: Potentially parse invocation_details['steps'] for more specific error messages
                     self.status.error_msg = f"Invocation failed. State: {invocation_details['state']}"
                     self.status.details = self._extract_error_details_from_invocation(invocation_details)
                elif self.status.state == WorkState.FINISHED:
                     self.outputs_data = invocation_details
            else:
                 self.status.state = WorkState.ERROR
                 self.status.error_msg = "Workflow submission failed prior to obtaining invocation ID."
                 self.status.details = "Workflow submission failed prior to obtaining invocation ID."

        except Exception as e:
            self.status.state = WorkState.ERROR
            self.status.error_msg = f"Error during workflow execution or waiting: {str(e)}"
            self.status.details = f"Error during workflow execution or waiting: {str(e)}"


    def run(self, params: Optional[Parameters], wait: bool) -> Optional[Outputs]:
        """Runs the workflow invocation."""
        if self.status.state in [WorkState.NOT_STARTED, WorkState.FINISHED, WorkState.ERROR, WorkState.CANCELLED]:
        if self.status.state in [WorkState.NOT_STARTED, WorkState.FINISHED, WorkState.ERROR, WorkState.CANCELED]:
            self.status = InvocationStatus()
            self.invocation_id = None
            self.outputs_data = None
@@ -84,7 +126,7 @@ class Invocation:
            if wait:
                thread.join()
                if self.status.state == WorkState.ERROR:
                     raise Exception(f"Workflow invocation failed: {self.status.error_msg}")
                     raise Exception(f"Workflow invocation failed: {self.status.details}")
                return self.get_results()
            return None
        else:
@@ -93,8 +135,6 @@ class Invocation:

    def submit(self, params: Optional[Parameters]) -> None:
        """Handles input preparation and submits the workflow invocation."""
        self.status.state = WorkState.PREPARING_INPUTS

        bioblend_inputs = {}
        bioblend_params = {}

@@ -125,10 +165,19 @@ class Invocation:
                             raise ValueError(f"Workflow step label '{label}' not found in workflow '{self.workflow_id}' for setting parameters. Available step labels: {list(label_to_step_id.keys())}")
                        bioblend_params[step_id] = value
                    else:
                        input_id = label_to_input_id.get(label)
                        step_id = label_to_step_id.get(label)
                        if not input_id and not step_id:
                            print(f"Warning: Parameter '{label}' is not a Dataset, DatasetCollection, or a dictionary associated with a known step label. It will be ignored.")

                        else:
                            if input_id:
                                bioblend_inputs[input_id] = value
                            elif step_id:
                                bioblend_params[step_id] = value

            self.status.state = WorkState.QUEUED
            print(bioblend_inputs)
            print(bioblend_params)
            invocation_info = self.galaxy_instance.workflows.invoke_workflow(
                workflow_id=self.workflow_id,
                inputs=bioblend_inputs,
@@ -141,7 +190,7 @@ class Invocation:

        except Exception as e:
            self.status.state = WorkState.ERROR
            self.status.error_msg = f"Failed to prepare or submit workflow invocation: {str(e)}"
            self.status.details = f"Failed to prepare or submit workflow invocation: {str(e)}"
            self.invocation_id = None


@@ -155,14 +204,14 @@ class Invocation:

    def get_state(self) -> InvocationStatus:
        """Returns the current state of the workflow invocation."""
        if not self.invocation_id or self.status.state in [WorkState.FINISHED, WorkState.ERROR, WorkState.CANCELLED]:
        if not self.invocation_id or self.status.state in [WorkState.FINISHED, WorkState.ERROR, WorkState.CANCELED]:
            return self.status

        try:
            invocation_details = self.galaxy_instance.invocations.show_invocation(self.invocation_id)
            self.status.state = self._map_galaxy_state_to_workstate(invocation_details['state'])
            if self.status.state == WorkState.ERROR and not self.status.error_msg:
                 self.status.error_msg = f"Invocation failed. State: {invocation_details['state']}"
            if self.status.state == WorkState.ERROR and not self.status.details: # Check details
                 self.status.details = self._extract_error_details_from_invocation(invocation_details)
            if self.status.state == WorkState.FINISHED:
                 self.outputs_data = invocation_details

@@ -212,14 +261,14 @@ class Invocation:

    def cancel(self) -> bool:
        """Cancels the workflow invocation."""
        if not self.invocation_id or self.status.state in [WorkState.FINISHED, WorkState.ERROR, WorkState.CANCELLED]:
        if not self.invocation_id or self.status.state in [WorkState.FINISHED, WorkState.ERROR, WorkState.CANCELED]:
            return False

        try:
            success = self.galaxy_instance.invocations.cancel_invocation(self.invocation_id)
            if success:
                self.status.state = WorkState.CANCELLED
                self.status.error_msg = "Invocation cancelled by user."
                self.status.state = WorkState.CANCELED
                self.status.details = "Invocation canceled by user."
            return success
        except Exception as e:
            print(f"Error cancelling invocation {self.invocation_id}: {e}")
@@ -229,6 +278,33 @@ class Invocation:
        """Returns the Galaxy invocation ID."""
        return self.invocation_id

    def get_step_jobs(self) -> List["Job"]:
        """Builds a nova-galaxy Job object for each job reported for this invocation.

        Returns
        -------
        List[Job]
            One Job per summary entry that has both a truthy ``id`` and a
            truthy ``tool_id``. Empty when no invocation ID is set, or when
            fetching or processing the summary raises (a warning is printed).
        """
        # Job is only imported under TYPE_CHECKING at module level, so the
        # runtime import happens here (presumably to avoid an import cycle).
        from .job import Job

        if not self.invocation_id:
            return []

        try:
            summary = self.galaxy_instance.invocations.get_invocation_step_jobs_summary(self.invocation_id)
            collected = []

            for entry in summary:
                job_id = entry.get('id')
                tool_id = entry.get('tool_id')
                # NOTE(review): entries lacking either key are skipped silently;
                # confirm the summary really carries 'tool_id' and 'state'.
                if not (job_id and tool_id):
                    continue

                step_job = Job(tool_id, self.store)
                step_job.id = job_id
                # Translate the reported state; a missing one is looked up as 'unknown'.
                step_job.status.state = self._map_galaxy_state_to_workstate(entry.get('state', 'unknown'))
                collected.append(step_job)

            return collected
        except Exception as e:
            print(f"Warning: Could not fetch invocation step jobs for {self.invocation_id}: {e}")
            return []

class Workflow(AbstractWork):
    """Represents a Galaxy workflow that can be invoked (run).
@@ -296,6 +372,19 @@ class Workflow(AbstractWork):
        else:
            return WorkState.NOT_STARTED

    def get_full_status(self) -> Optional[InvocationStatus]:
        """Fetches the status object of the most recent workflow invocation.

        Returns
        -------
        Optional[InvocationStatus]
            Whatever the stored invocation's ``get_state()`` returns, or
            None when there is no stored invocation.
        """
        invocation = self._invocation
        return invocation.get_state() if invocation else None

    def get_results(self) -> Optional[Outputs]:
        """Returns the results from the last completed workflow invocation.

@@ -348,3 +437,31 @@ class Workflow(AbstractWork):
        if self._invocation:
            return self._invocation.get_invocation_id()
        return None

    def get_step_jobs(self) -> List["Job"]:
        """Returns the Job instances of the current invocation's steps.

        Delegates to the stored invocation's ``get_step_jobs()``.

        Returns
        -------
        List[Job]
            The list produced by the invocation, or an empty list when there
            is no stored invocation.

        Examples
        --------
        >>> workflow = Workflow("workflow_id")
        >>> workflow.run(data_store, params, wait=False)
        >>> for job in workflow.get_step_jobs():
        ...     print(f"Step {job.tool}: {job.status.state}")
        """
        invocation = self._invocation
        if not invocation:
            return []
        return invocation.get_step_jobs()