Commit 21c4c63e authored by Duggan, John's avatar Duggan, John
Browse files

Fully functional local runs

parent f91cb167
Loading
Loading
Loading
Loading
Loading
+4 −4
Original line number Diff line number Diff line
@@ -3,7 +3,7 @@
import logging
import os
import zipfile
from typing import Callable, List, Tuple
from typing import List, Tuple

from nova.galaxy import Connection, Dataset, Parameters, Tool
from nova.galaxy.interfaces import BasicTool
@@ -19,12 +19,12 @@ logger.setLevel(logging.INFO)
class IPSFastranTool(BasicTool):
    """Class that prepares IPS Fastran tool."""

    def __init__(self, model: MainModel, store_factory: Callable, galaxy_url: str, galaxy_api_key: str) -> None:
    def __init__(self, model: MainModel) -> None:
        super().__init__()
        self.model = model

        self.galaxy_url = galaxy_url
        self.galaxy_api_key = galaxy_api_key
        # self.galaxy_url = galaxy_url
        # self.galaxy_api_key = galaxy_api_key

    def prepare_tool(self) -> Tuple[Tool, Parameters]:
        match self.model.resource_params.run_location:
+21 −68
Original line number Diff line number Diff line
@@ -4,20 +4,12 @@ import os
import subprocess
from datetime import datetime as dt
from shutil import copy
from time import time
from typing import Any, Dict, List, Optional

from nova.galaxy.job import JobStatus, WorkState
from nova.galaxy.outputs import DatasetCollection, Outputs

from ips_fastran_gui.app.models.main_model import MainModel

# Pull new access token 60 seconds before expiration
REFRESH_BUFFER = 60

# Check task status every 10 seconds
STATUS_INTERVAL = 10


class LocalTool:
    """Tool class for running via a local ips.py build."""
@@ -29,12 +21,6 @@ class LocalTool:
        self.process: Optional[subprocess.Popen] = None
        self.working_directory = ""

        self.stdout = ""
        self.stderr = ""
        self.last_status_check = 0
        self.last_stdout_check = 0
        self.last_stderr_check = 0

    def cancel(self) -> None:
        if not self.process:
            return
@@ -55,66 +41,31 @@ class LocalTool:
                os.makedirs(os.path.dirname(new_path), exist_ok=True)
                copy(file["path"], new_path)

    def get_full_status(self) -> JobStatus:
        current_time = int(time())
        if current_time < self.last_status_check + STATUS_INTERVAL:
            return self.state
        self.last_status_check = current_time
    def get_results(self) -> None:
        self.model.config.result_files = []

        path = os.path.join(self.working_directory, "output")
        for dirpath, _, filenames in os.walk(path):
            for file in filenames:
                full_path = os.path.join(dirpath, file)
                relative_path = os.path.relpath(full_path, path)
                self.model.config.result_files.append(relative_path)

    def refresh_output(self) -> None:
        if not self.process:
            self.state.state = WorkState.NOT_STARTED
            return self.state
            return

        self.model.execution.stdout, self.model.execution.stderr = self.process.communicate()
        exit_code = self.process.poll()
        if exit_code is None:
            self.state.state = WorkState.RUNNING
        elif exit_code == 0:
            self.state.state = WorkState.FINISHED
            self.process = None
        else:
            self.state.state = WorkState.ERROR

        return self.state

    def get_results(self) -> Outputs:
        outputs = Outputs()
        dataset_collection = DatasetCollection("outputs")

        outputs.add_output(dataset_collection)

        return outputs

    def get_stderr(self, *args: Any, **kwargs: Any) -> str:
        current_time = int(time())
        if current_time < self.last_stderr_check + STATUS_INTERVAL:
            return ""

        if not self.process or self.state.state != WorkState.RUNNING:
            return self.stderr
        self.last_stderr_check = current_time

        _, new_content = self.process.communicate()
        return_value = new_content.removeprefix(self.stderr)

        self.stderr = new_content

        return return_value

    def get_stdout(self, *args: Any, **kwargs: Any) -> str:
        current_time = int(time())
        if current_time < self.last_stdout_check + STATUS_INTERVAL:
            return ""

        if not self.process or self.state.state != WorkState.RUNNING:
            return self.stdout
        self.last_stdout_check = current_time

        new_content, _ = self.process.communicate()
        return_value = new_content.removeprefix(self.stdout)

        self.stdout = new_content
            return

        return return_value
        if exit_code == 0:
            self.model.execution.is_running = False
            self.model.execution.success = True
            self.get_results()
        else:
            self.model.execution.success = False

    def run(self, *args: Any, **kwargs: Any) -> None:
        self.working_directory = os.path.join(
@@ -131,3 +82,5 @@ class LocalTool:
            stderr=subprocess.PIPE,
            text=True,
        )
        self.model.execution.has_run = True
        self.model.execution.is_running = True
+12 −0
Original line number Diff line number Diff line
@@ -26,12 +26,23 @@ class Config(BaseModel):
    """

    input_files: List[Dict[str, Any]] = Field(default=[], title="Input Files")
    is_running: bool = Field(default=False)
    shot_number: str = Field(default="000001", title="Shot Number")
    time_id: str = Field(default="00001", title="Time ID")
    result_file: str = Field(default="", title="File to Plot")
    result_files: List[str] = Field(default=[])


class Execution(BaseModel):
    """Holds the job execution status."""

    has_run: bool = Field(default=False)
    is_running: bool = Field(default=False)
    stdout: str = Field(default="")
    stderr: str = Field(default="")
    success: bool = Field(default=False)


class RunLocationOption(str, Enum):
    """Defines available locations for running IPS Fastran."""

@@ -77,6 +88,7 @@ class MainModel:

    def __init__(self) -> None:
        self.config = Config()
        self.execution = Execution()
        self.plot_json = PlotJSON()
        self.resource_params = ResourceParameters()

+0 −2
Original line number Diff line number Diff line
@@ -3,7 +3,6 @@
from nova.mvvm.interface import BindingInterface

from .models.main_model import MainModel
from .view_models.execution import ExecutionViewModel
from .view_models.main_view_model import MainViewModel


@@ -11,6 +10,5 @@ def create_viewmodels(binding: BindingInterface) -> dict:
    model = MainModel()
    vm: dict = {}
    vm["main"] = MainViewModel(model, binding)
    vm["execution"] = ExecutionViewModel(model, binding)

    return vm
+0 −48
Original line number Diff line number Diff line
"""Module for the Execution ViewModel."""

import argparse
import os
from typing import Any, Dict, Tuple

from blinker import signal
from nova.common.job import WorkState
from nova.galaxy.tool_runner import ToolRunner
from nova.mvvm.interface import BindingInterface

from ips_fastran_gui.app.models.ips_fastran import IPSFastranTool
from ips_fastran_gui.app.models.main_model import MainModel


def parse_args() -> Tuple[str, str]:
    parser = argparse.ArgumentParser()
    parser.add_argument("--galaxy-url", help="URL of the Galaxy server")
    parser.add_argument("--galaxy-key", help="API key for accessing the Galaxy server")
    args, unknown = parser.parse_known_args()
    galaxy_url = args.galaxy_url or os.getenv("GALAXY_URL", "")
    galaxy_api_key = args.galaxy_key or os.getenv("GALAXY_API_KEY", "")
    return str(galaxy_url), str(galaxy_api_key)


class ExecutionViewModel:
    """A viewmodel responsible for executing and monitoring of the workflow in Galaxy."""

    def __init__(self, model: MainModel, _binding: BindingInterface):
        self.model = model
        galaxy_url, galaxy_api_key = parse_args()
        self.tool = IPSFastranTool(model, self.store_factory, galaxy_url, galaxy_api_key)
        self.tool_runner = ToolRunner("ips_fastran", self.tool, self.store_factory, galaxy_url, galaxy_api_key)
        self.tool_runner.progress_signal.connect(self.on_progress)
        self.completion_signal = signal("ips_fastran_complete")

    async def on_progress(self, _sender: Any, state: WorkState, details: Dict[str, Any]) -> None:
        if state == WorkState.FINISHED:
            paths = self.tool.get_output_paths()
            self.model.set_results(paths)

            self.completion_signal.send()

    def store_factory(self) -> str:
        return "ips_fastran"

    def download_files(self) -> bytes:
        return self.model.download_files()
Loading