Commit 34ca0479 authored by Yakubov, Sergey's avatar Yakubov, Sergey
Browse files

use dictionary for status details

parent 08022a4e
Loading
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
[tool.poetry]
name = "nova-galaxy"
version = "0.10.3"
version = "0.11.0"
description = "Utilties for accessing the ORNL Galaxy instance"
authors = ["Greg Watson <watsongr@ornl.gov>", "Gregory Cage <cagege@ornl.gov>", "Sergey Yakubov <yakubovs@ornl.gov>"]
readme = "README.md"
+40 −8
Original line number Diff line number Diff line
"""Internal job related classes and functions."""

import time
from datetime import datetime, timezone
from threading import Lock, Thread
from typing import TYPE_CHECKING, Dict, Optional
from typing import TYPE_CHECKING, Any, Dict, Optional

from bioblend import galaxy
from bioblend.galaxy.datasets import DatasetClient
@@ -18,12 +19,42 @@ from .parameters import Parameters
REGISTER_NEUTRON_DATA_TOOL = "neutrons_register"


def get_job_details(state: WorkState, job: Dict[str, Any]) -> Dict[str, Any]:
    if "job_messages" not in job or not job["job_messages"]:
        return {}
    for message in job["job_messages"]:
        if "status_details" in message:
            match message["status_details"]["status_details_source"]:
                case "slurm":
                    return get_slurm_status(state, message["status_details"])
    return {}


def to_local_time(input: str) -> str:
    utc_time = datetime.fromisoformat(input).replace(tzinfo=timezone.utc)
    local_time = utc_time.astimezone()
    return local_time.strftime("%Y-%m-%d %H:%M:%S")


def get_slurm_status(state: WorkState, status: Dict[str, Any]) -> Dict[str, Any]:
    if state == WorkState.RUNNING:
        short_details = (
            f"Slurm Job Details: Id:{status['JobId']}, Job State:{status['JobState']}, Run Time:{status['RunTime']}"
        )
    else:
        short_details = (
            f"Slurm Job Details: Id:{status['JobId']}, Job State:{status['JobState']},"
            f" Last Evaluation:{to_local_time(status['LastSchedEval'])}"
        )
    return {"message": short_details, "original_dict": status}


class JobStatus:
    """Internal structure to hold job status info."""

    def __init__(self) -> None:
        self.lock = Lock()
        self._details = ""
        self._details: Dict[str, Any] = {}
        self._state = WorkState.NOT_STARTED

    @property
@@ -37,12 +68,12 @@ class JobStatus:
            self._state = value

    @property
    def details(self) -> str:
    def details(self) -> Dict[str, Any]:
        with self.lock:
            return self._details

    @details.setter
    def details(self, value: str) -> None:
    def details(self, value: Dict[str, Any]) -> None:
        with self.lock:
            self._details = value

@@ -72,7 +103,7 @@ class Job:
                self.status.state = WorkState.CANCELED
                return
            self.status.state = WorkState.ERROR
            self.status.details = str(e)
            self.status.details = {"message": str(e)}
            return

        self.status.state = WorkState.FINISHED
@@ -225,7 +256,7 @@ class Job:
        if response:
            return True
        else:
            self.status.details = "could not stop job"
            self.status.details = {"message": "could not stop job"}
            return False

    def cancel(self) -> bool:
@@ -247,15 +278,16 @@ class Job:

    def get_state(self) -> JobStatus:
        """Returns current state of job."""
        if self.status.state == WorkState.QUEUED:
        if self.status.state == WorkState.QUEUED or self.status.state == WorkState.RUNNING:
            try:
                job = self.galaxy_instance.jobs.show_job(self.id)
                job = self.galaxy_instance.jobs.show_job(self.id, full_details=True)
                if job["state"] == "running":
                    self.status.state = WorkState.RUNNING
                elif job["state"] == "error":
                    self.status.state = WorkState.ERROR
                elif job["state"] == "deleted":
                    self.status.state = WorkState.DELETED
                self.status.details = get_job_details(self.status.state, job)
            except Exception:
                pass
        return self.status
+6 −5
Original line number Diff line number Diff line
@@ -120,14 +120,14 @@ class ToolRunner:

        if tool_state == WorkState.ERROR:
            if tool_status.details:
                self.current_outputs.stderr = f"{tool_status.details}\n{self.current_outputs.stderr}"
                self.current_outputs.stderr = f"{tool_status.details['message']}\n{self.current_outputs.stderr}"

    async def _monitor_run(self) -> None:
        while True:
            try:
                if self.nova_tool or self.error:
                    status = self._get_job_status()
                    if self.current_status.state != status.state:
                    if self.current_status.state != status.state or self.current_status.details != status.details:
                        self.current_status = status
                        await self._send_status_change_signal()
                        if job_stopped(self.current_status.state):
@@ -139,7 +139,8 @@ class ToolRunner:

    async def _send_status_change_signal(self) -> None:
        if self.current_status.state == WorkState.ERROR:
            await self.error_message_signal.send_async(self.sender_id, error_message=self.current_status.details)
            error_message = self.current_status.details.get("message", "")
            await self.error_message_signal.send_async(self.sender_id, error_message=error_message)
        await self.progress_signal.send_async(
            self.sender_id, state=self.current_status.state, details=self.current_status.details
        )
@@ -148,11 +149,11 @@ class ToolRunner:
        if self.nova_tool:
            status = copy(self.nova_tool.get_full_status())
            if status.state == WorkState.ERROR:
                status.details = "Error running NDIP tool. Please see tool outputs for more information."
                status.details = {"message": "Error running NDIP tool. Please see tool outputs for more information."}
        else:
            status = JobStatus()
            status.state = WorkState.ERROR
            status.details = self.error
            status.details = {"message": self.error}
        return status

    def _run_in_background(self) -> None: