Commit 13e43bdd authored by John Duggan
Browse files

Functional superfacility runs (outputs still need to be fetched from Perlmutter)

parent 21c4c63e
Loading
Loading
Loading
Loading
+42 −76
Original line number Diff line number Diff line
"""IPS Fastran classes."""

import logging
import os
import zipfile
from typing import List, Tuple
from typing import Any

from nova.galaxy import Connection, Dataset, Parameters, Tool
from nova.galaxy.interfaces import BasicTool

from ips_fastran_gui.app.models.local import LocalTool
from ips_fastran_gui.app.models.main_model import MainModel, RunLocationOption
from ips_fastran_gui.app.models.superfacility import SuperfacilityTool
from ips_fastran_gui.app.models.tool_implementations.local import LocalTool
from ips_fastran_gui.app.models.tool_implementations.superfacility import SuperfacilityTool

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


class IPSFastranTool(BasicTool):
class IPSFastranTool:
    """Class that prepares IPS Fastran tool."""

    def __init__(self, model: MainModel) -> None:
        super().__init__()
        self.model = model
        self.tool: Any = None

        # self.galaxy_url = galaxy_url
        # self.galaxy_api_key = galaxy_api_key

    def prepare_tool(self) -> Tuple[Tool, Parameters]:
    def prepare_tool(self) -> None:
        match self.model.resource_params.run_location:
            case RunLocationOption.local:
                return self.prepare_local()
                self.prepare_local()
            case RunLocationOption.sf_perlmutter:
                return self.prepare_superfacility()
                self.prepare_superfacility()
            case RunLocationOption.galaxy_perlmutter:
                return self.prepare_galaxy()

        return None, None

    def prepare_galaxy(self) -> Tuple[Tool, Parameters]:
        connection = Connection(self.galaxy_url, self.galaxy_api_key).connect()
        self.store = connection.get_data_store("ips_fastran")

        # Prepare file ingestion into Galaxy
        self.inputs_dataset = Dataset(name="inputs.zip")
        with zipfile.ZipFile("input.zip", "w") as zip_obj:
            for file in self.model.config.input_files:
                zip_obj.writestr(file["name"], file["content"].encode("latin1"))
        self.inputs_dataset.path = "input.zip"
        self.config_dataset = Dataset(name="config.txt")
        # self.config_dataset.set_content(self.model.config.file_contents)

        # Create the tool instance
        tool_params = Parameters()
        tool_params.add_input(name="inputs", value=self.inputs_dataset)
        tool_params.add_input(name="config", value=self.config_dataset)
        tool_params.add_input(name="shot_number", value=self.model.config.shot_number)
        tool_params.add_input(name="time_id", value=self.model.config.time_id)
        tool_params.add_input(name="__job_resource|__job_resource__select", value="yes")
        tool_params.add_input(name="__job_resource|nodes", value=self.model.resource_params.number_of_nodes)
        tool_params.add_input(name="__job_resource|ntasks_per_node", value=self.model.resource_params.tasks_per_node)
        tool_params.add_input(name="__job_resource|qos", value=self.model.resource_params.partition)
        tool_params.add_input(name="__job_resource|time", value=self.model.resource_params.time_limit)
        tool_params.add_input(name="__job_resource|remote_resource_cloud_nersc", value="nersc")
        self.tool = Tool(id="fusion-ips-fastran")

        return self.tool, tool_params

    def prepare_local(self) -> Tuple[Tool, Parameters]:
                self.prepare_galaxy()

    def prepare_galaxy(self) -> None:
        pass
        # connection = Connection(self.galaxy_url, self.galaxy_api_key).connect()
        # self.store = connection.get_data_store("ips_fastran")

        # # Prepare file ingestion into Galaxy
        # self.inputs_dataset = Dataset(name="inputs.zip")
        # with zipfile.ZipFile("input.zip", "w") as zip_obj:
        #     for file in self.model.config.input_files:
        #         zip_obj.writestr(file["name"], file["content"].encode("latin1"))
        # self.inputs_dataset.path = "input.zip"
        # self.config_dataset = Dataset(name="config.txt")
        # # self.config_dataset.set_content(self.model.config.file_contents)

        # # Create the tool instance
        # tool_params = Parameters()
        # tool_params.add_input(name="inputs", value=self.inputs_dataset)
        # tool_params.add_input(name="config", value=self.config_dataset)
        # tool_params.add_input(name="shot_number", value=self.model.config.shot_number)
        # tool_params.add_input(name="time_id", value=self.model.config.time_id)
        # tool_params.add_input(name="__job_resource|__job_resource__select", value="yes")
        # tool_params.add_input(name="__job_resource|nodes", value=self.model.resource_params.number_of_nodes)
        # tool_params.add_input(name="__job_resource|ntasks_per_node", value=self.model.resource_params.tasks_per_node)
        # tool_params.add_input(name="__job_resource|qos", value=self.model.resource_params.partition)
        # tool_params.add_input(name="__job_resource|time", value=self.model.resource_params.time_limit)
        # tool_params.add_input(name="__job_resource|remote_resource_cloud_nersc", value="nersc")
        # self.tool = Tool(id="fusion-ips-fastran")

        # return self.tool, tool_params

    def prepare_local(self) -> None:
        self.tool = LocalTool(self.model)

        return self.tool, Parameters()

    def prepare_superfacility(self) -> Tuple[Tool, Parameters]:
    def prepare_superfacility(self) -> None:
        self.tool = SuperfacilityTool(self.model)

        return self.tool, Parameters()

    def get_output_paths(self) -> List[str]:
        outputs = self.tool.get_results()
        collection = outputs.get_collection("outputs")
        collection.download("data.zip")

        with zipfile.ZipFile("data.zip", "r") as zip_obj:
            zip_obj.extractall()

        paths = []
        for path in os.listdir("IPS Fastran output"):
            if path.startswith("f"):
                paths.append(path)

        return paths

    def get_results(self, tool: Tool) -> bytes:
        if not os.path.exists("data.zip"):
            return bytes()

        with open("data.zip", "rb") as zip_obj:
            return zip_obj.read()
+1 −0
Original line number Diff line number Diff line
@@ -37,6 +37,7 @@ class Execution(BaseModel):
    """Holds the job execution status."""

    has_run: bool = Field(default=False)
    is_queued: bool = Field(default=False)
    is_running: bool = Field(default=False)
    stdout: str = Field(default="")
    stderr: str = Field(default="")
+112 −58
Original line number Diff line number Diff line
"""Tool class for running via the Superfacility API."""

import json
import os
from time import sleep, time
from typing import Any, Dict
from typing import Any, Dict, List, Optional

from authlib.integrations.requests_client import OAuth2Session
from authlib.oauth2.rfc7523 import PrivateKeyJWT
from nova.galaxy.job import JobStatus, WorkState

from ips_fastran_gui.app.models.main_model import MainModel

@@ -31,15 +31,35 @@ class SuperfacilityTool:
            grant_type="client_credentials",
            token_endpoint=self.model.resource_params.token_url,
        )
        self.state = JobStatus()
        self.job_id = ""
        self.task_id = ""

        self.stdout = ""
        self.stderr = ""
        self.last_status_check = 0
        self.last_stdout_check = 0
        self.last_stderr_check = 0
        self.working_directory: Optional[str] = None

    def _get_user_directory(self) -> None:
        """Resolve the user's home directory on Perlmutter by running ``pwd``.

        Sets ``self.working_directory`` to the absolute path on success, or to
        ``None`` when the command or its task fails.
        """
        self._refresh_token()
        response = self.session.post(
            "https://api.nersc.gov/api/v1.2/utilities/command/perlmutter", data={"executable": "pwd"}
        )
        task_id = response.json()["task_id"]

        # Poll until the asynchronous task reaches a terminal state. A task
        # that errors out reports status "failed"; without checking for it
        # this loop would spin forever.
        while True:
            self._refresh_token()
            response = self.session.get(f"https://api.nersc.gov/api/v1.2/tasks/{task_id}")
            data = response.json()

            if data["status"] == "completed":
                break
            if data["status"] == "failed":
                self.working_directory = None
                return
            sleep(0.1)

        result = json.loads(data["result"])
        if result["status"] == "ok":
            self.working_directory = result["output"].strip()
        else:
            self.working_directory = None

    def _refresh_token(self) -> None:
        expiration = self.access_token_obj.get("expires_at", 0) - REFRESH_BUFFER
@@ -51,27 +71,17 @@ class SuperfacilityTool:
        if not self.job_id:
            return

        self.state.state = WorkState.CANCELING

        self._refresh_token()
        self.session.delete(f"https://api.nersc.gov/api/v1.2/compute/jobs/perlmutter/{self.job_id}")

        self.state.state = WorkState.CANCELED

    def get_full_status(self) -> JobStatus:
        current_time = int(time())
        if not self.task_id or current_time < self.last_status_check + STATUS_INTERVAL:
            return self.state
        self.last_status_check = current_time
        self.model.execution.is_running = False

        if self.state.state == WorkState.CANCELING:
            pass
        elif not self.job_id:
            self.get_submission_status()
    def copy_input_files(self, input_files: List[Dict[str, Any]]) -> None:
        for file in input_files:
            if "children" in file:
                self.copy_input_files(file["children"])
            else:
            self.get_job_status()

        return self.state
                self.upload_file(file)

    def get_job_status(self) -> None:
        self._refresh_token()
@@ -80,39 +90,19 @@ class SuperfacilityTool:
        output = data.get("output", [])

        if not output:
            self.state.state = WorkState.FINISHED
            self.model.execution.is_queued = False
            self.model.execution.is_running = False
            self.model.execution.success = True

            # TODO: self.get_results()

            return

        if output[0]["state"] == "PENDING":
            return
        if output[0]["state"] == "RUNNING":
            self.state.state = WorkState.RUNNING

    def get_stderr(self, *args: Any, **kwargs: Any) -> str:
        """Return new stderr text since the previous call (rate-limited)."""
        now = int(time())
        throttled = now < self.last_stderr_check + STATUS_INTERVAL
        if self.state.state != WorkState.RUNNING or throttled:
            # Not running, or checked too recently: return the cached content.
            return self.stderr
        self.last_stderr_check = now

        content = self.read_file("ips.err")
        delta = content.removeprefix(self.stderr)
        self.stderr = content
        return delta

    def get_stdout(self, *args: Any, **kwargs: Any) -> str:
        """Return new stdout text since the previous call (rate-limited)."""
        now = int(time())
        throttled = now < self.last_stdout_check + STATUS_INTERVAL
        if self.state.state != WorkState.RUNNING or throttled:
            # Not running, or checked too recently: return the cached content.
            return self.stdout
        self.last_stdout_check = now

        content = self.read_file("ips.out")
        delta = content.removeprefix(self.stdout)
        self.stdout = content
        return delta
        self.model.execution.is_queued = False
        self.model.execution.is_running = True

    def get_submission_status(self) -> None:
        self._refresh_token()
@@ -121,12 +111,13 @@ class SuperfacilityTool:
        data = response.json()
        match data["status"]:
            case "new":
                self.state.state = WorkState.QUEUED
                pass
            case "completed":
                result = json.loads(data["result"])
                if result["status"] == "error":
                    self.state.state = WorkState.ERROR
                    self.state.details = {"message": result["error"]}
                    self.model.execution.is_queued = False
                    self.model.execution.success = False
                    self.model.execution.stderr = result["error"]
                elif result["status"] == "ok":
                    self.job_id = result["jobid"]
                else:
@@ -158,9 +149,32 @@ class SuperfacilityTool:
            return result["output"]
        return result["error"]

    def refresh_output(self) -> None:
        """Poll the remote job and mirror its state and output into the model."""
        if not self.task_id:
            # Nothing has been submitted yet.
            return

        if not self.job_id:
            # Still waiting for the submission task to hand back a job id.
            self.get_submission_status()
            return

        self.get_job_status()
        if self.model.execution.is_running:
            # Mirror the batch job's output files into the execution model.
            self.model.execution.stdout = self.read_file("ips.out")
            self.model.execution.stderr = self.read_file("ips.err")

    def run(self, *args: Any, **kwargs: Any) -> None:
        hours, minutes = divmod(self.model.resource_params.time_limit, 60)

        self._get_user_directory()
        if not self.working_directory:
            self.model.execution.stderr = "Could not find user's home directory."
            self.model.execution.has_run = True
            self.model.execution.success = False

            return

        self.copy_input_files(self.model.config.input_files)

        self._refresh_token()
        response = self.session.post(
            "https://api.nersc.gov/api/v1.2/compute/jobs/perlmutter",
@@ -182,10 +196,10 @@ source activate /global/common/software/atom/perlmutter/cesol/conda/dev
export SHOT_NUMBER={self.model.config.shot_number}
export TIME_ID={self.model.config.time_id}

ips.py --simulation=fastran_scenario.config --platform=perlmutter_cpu_node.conf --log=ips.log 1>ips.out 2>ips.err
ips.py --simulation=fastran_scenario.config --platform=/global/common/software/atom/example1/perlmutter_cpu_node.conf --log=ips.log 1>ips.out 2>ips.err

conda deactivate
""",
""",  # noqa
            },
        )

@@ -194,3 +208,43 @@ conda deactivate
            raise Exception(result["error"])

        self.task_id = result["task_id"]
        self.model.execution.has_run = True
        self.model.execution.is_queued = True

    def upload_file(self, file: Dict[str, Any]) -> None:
        """Upload a single input file into the remote working directory.

        Creates the remote parent directory first (waiting for that task to
        finish), then PUTs the file via the Superfacility upload endpoint.
        Output/error text from the upload is appended to the model's
        stdout/stderr buffers. No-op when the working directory is unknown.
        """
        if not self.working_directory:
            return

        self._refresh_token()
        path = os.path.join(self.working_directory, file["relative_path"])
        # NOTE(review): `path` is interpolated into a remote shell command;
        # names containing spaces or shell metacharacters would break this.
        response = self.session.post(
            "https://api.nersc.gov/api/v1.2/utilities/command/perlmutter",
            data={"executable": f"mkdir -p {os.path.dirname(path)}"},
        )
        task_id = response.json()["task_id"]

        # Wait for the mkdir task to reach a terminal state; bail out on
        # "failed" so we do not poll forever on a task that never completes.
        while True:
            self._refresh_token()
            response = self.session.get(f"https://api.nersc.gov/api/v1.2/tasks/{task_id}")
            data = response.json()

            if data["status"] == "completed":
                break
            if data["status"] == "failed":
                self.model.execution.stderr += f"Failed to create remote directory for {path}\n"
                return
            sleep(0.1)

        # Binary mode keeps the upload byte-identical for any file content.
        with open(file["path"], "rb") as file_obj:
            response = self.session.put(
                f"https://api.nersc.gov/api/v1.2/utilities/upload/perlmutter/{path}",
                files={"file": file_obj},
            )
        result = response.json()

        # NOTE(review): this endpoint reports "OK" (uppercase) while the
        # command endpoints use lowercase "ok" — confirm against the API docs.
        if result["status"] == "OK":
            if result["output"]:
                self.model.execution.stdout += result["output"] + "\n"
        else:
            if result["error"]:
                self.model.execution.stderr += result["error"] + "\n"
+32 −8
Original line number Diff line number Diff line
@@ -25,6 +25,7 @@ class ViewState(BaseModel):
    input_file_path: str = Field(default=os.getenv("HOME", os.getcwd()), title="Path to Input Files")
    editor_content: str = Field(default="")
    editor_path: str = Field(default="")
    updating: bool = Field(default=False)


class MainViewModel:
@@ -32,6 +33,7 @@ class MainViewModel:

    def __init__(self, model: MainModel, binding: BindingInterface):
        self.model = model
        self.binding = binding
        self.ips_fastran = IPSFastranTool(model)
        self.view_state = ViewState()

@@ -43,14 +45,14 @@ class MainViewModel:
        # self.model will be updated automatically on changes of connected fields in View,
        # but one also can provide a callback function if they want to react to those events
        # and/or process errors.
        self.config_bind = binding.new_bind(self.model.config, callback_after_update=self.on_change_config)
        self.execution_bind = binding.new_bind(self.model.execution)
        self.figure_bind = binding.new_bind()
        self.resource_params_bind = binding.new_bind(
        self.config_bind = self.binding.new_bind(self.model.config, callback_after_update=self.on_change_config)
        self.execution_bind = self.binding.new_bind(self.model.execution)
        self.figure_bind = self.binding.new_bind()
        self.resource_params_bind = self.binding.new_bind(
            self.model.resource_params, callback_after_update=self.on_change_resource_params
        )
        self.plot_json_bind = binding.new_bind()
        self.view_state_bind = binding.new_bind(self.view_state, callback_after_update=self.on_change_view_state)
        self.plot_json_bind = self.binding.new_bind()
        self.view_state_bind = self.binding.new_bind(self.view_state, callback_after_update=self.on_change_view_state)

        # Signals to process events from other view models
        self.execution_signal = signal(get_signal_id("ips_fastran", Signal.TOOL_COMMAND))
@@ -141,20 +143,42 @@ class MainViewModel:

        self.view_state_bind.update_in_view(self.view_state)

    def _refresh_output(self, *args: Any, **kwargs: Any) -> None:
        """Worker task: poll the running tool for fresh status/output (blocking)."""
        # The stale duplicate `def refresh_output` signature left over from a
        # conflicting edit has been removed; only the worker entry point remains.
        self.ips_fastran.tool.refresh_output()

    def _on_refresh_complete(self, *args: Any, **kwargs: Any) -> None:
        """Worker-finished callback: push refreshed model state back to the view."""
        self.config_bind.update_in_view(self.model.config)
        self.execution_bind.update_in_view(self.model.execution)

        # Clear the busy indicator last, after the data binds are up to date.
        self.view_state.updating = False
        self.view_state_bind.update_in_view(self.view_state)

    def refresh_output(self) -> None:
        """Mark the view busy and refresh tool output on a background worker.

        The stale duplicate `def run` signature left over from a conflicting
        edit has been removed. `_on_refresh_complete` restores the view state
        when the worker finishes.
        """
        self.view_state.updating = True
        self.view_state_bind.update_in_view(self.view_state)

        worker = self.binding.new_worker(self._refresh_output)
        worker.connect_finished(self._on_refresh_complete)
        worker.start()

    def _run(self, *args: Any, **kwargs: Any) -> None:
        """Worker task: prepare the backend for the selected location and launch it."""
        self.ips_fastran.prepare_tool()
        self.ips_fastran.tool.run()

    def _on_run_submitted(self, *args: Any, **kwargs: Any) -> None:
        """Worker-finished callback: show submission state and clear the busy flag."""
        self.execution_bind.update_in_view(self.model.execution)

        self.view_state.updating = False
        self.view_state_bind.update_in_view(self.view_state)

    def run(self) -> None:
        """Flag the view as busy and submit the run from a background worker."""
        self.view_state.updating = True
        self.view_state_bind.update_in_view(self.view_state)

        run_worker = self.binding.new_worker(self._run)
        run_worker.connect_finished(self._on_run_submitted)
        run_worker.start()

    def update_figure(self) -> None:
        """Re-plot the configured result file from the tool's working directory."""
        plotter = PlotFastran(self.figure, self.model.plot_json.input_params)
        # NOTE(review): assumes self.ips_fastran.tool is set and exposes
        # working_directory (true for the superfacility tool) — confirm for
        # the local backend as well.
        plotter.load_fastran(Path(self.ips_fastran.tool.working_directory) / self.model.config.result_file)
Loading