Commit 9e99fc63 authored by Duggan, John's avatar Duggan, John
Browse files

Refactor to sfapi_client and try to use sshproxy to access dtn

parent 26d7fd4e
Loading
Loading
Loading
Loading
Loading
+419 −159

File changed.

Preview size limit exceeded, changes collapsed.

+3 −0
Original line number Diff line number Diff line
@@ -29,6 +29,9 @@ trame-code = ">=1.0.2"
nova-galaxy = ">=0.11.1"
netcdf4 = ">=1.7.2"
Authlib = "*"
sfapi-client = ">=0.4.1,<0.5"
fs-sshfs = ">=1.0.2,<2"
setuptools = "<82.0.0"

[tool.pixi.feature.dev.pypi-dependencies]
mypy = ">=1.10.0"
+3 −3
Original line number Diff line number Diff line
@@ -20,13 +20,13 @@ class IPSFastranTool:
        self.tool: Any = None

    def prepare_tool(self) -> None:
        match self.model.resource_params.run_location:
        match self.model.config.run_location:
            case RunLocationOption.local:
                self.prepare_local()
            case RunLocationOption.sf_perlmutter:
                self.prepare_superfacility()
            case RunLocationOption.galaxy_perlmutter:
                self.prepare_galaxy()
            # case RunLocationOption.galaxy_perlmutter:
            #     self.prepare_galaxy()

    def prepare_galaxy(self) -> None:
        pass
+29 −11
Original line number Diff line number Diff line
@@ -15,6 +15,21 @@ with open(Path(__file__).parent / "plot.json") as file_pointer:
    PLOT_JSON = json.load(file_pointer)


class InputFileOption(str, Enum):
    """Defines approaches for managing input files."""

    simple = "Generate Files"
    advanced = "Load Files From Path"


class RunLocationOption(str, Enum):
    """Defines available locations for running IPS Fastran."""

    local = "Local Machine"
    # galaxy_perlmutter = "Perlmutter (via Galaxy)"  # Perlmutter via the Galaxy API
    sf_perlmutter = "Perlmutter (via Superfacility API)"  # Perlmutter via the Superfacility API


class Config(BaseModel):
    """
    Contains configuration parameters.
@@ -25,12 +40,22 @@ class Config(BaseModel):
    other interfaces for improved clarity and usability.
    """

    input_file_method: InputFileOption = Field(
        default=InputFileOption.advanced, title="How Will You Provide Input Files?"
    )
    input_files: List[Dict[str, Any]] = Field(default=[], title="Input Files")
    is_running: bool = Field(default=False)
    shot_number: str = Field(default="000001", title="Shot Number")
    time_id: str = Field(default="00001", title="Time ID")
    result_file: str = Field(default="", title="File to Plot")
    result_files: List[str] = Field(default=[])
    run_location: RunLocationOption = Field(
        default=RunLocationOption.sf_perlmutter, title="Where Will You Run IPS Fastran?"
    )
    sshproxy_key: str = Field(
        default="",
        title="NERSC sshproxy Key",
    )


class Execution(BaseModel):
@@ -44,20 +69,9 @@ class Execution(BaseModel):
    success: bool = Field(default=False)


class RunLocationOption(str, Enum):
    """Defines available locations for running IPS Fastran."""

    local = "Local Machine"
    galaxy_perlmutter = "Perlmutter (via Galaxy)"  # Perlmutter via the Galaxy API
    sf_perlmutter = "Perlmutter (via Superfacility API)"  # Perlmutter via the Superfacility API


class ResourceParameters(BaseModel):
    """Contains resource parameters for running the job."""

    # General Options
    run_location: RunLocationOption = Field(default=RunLocationOption.local, title="Where Will You Run IPS Fastran?")

    # Local Run Options
    executable: str = Field(default="", title="IPS Fastran Executable (use full path)")
    python_path: str = Field(
@@ -150,6 +164,10 @@ class MainModel:

        self.config.input_files = self.set_files_from_tree(self.file_tree)

    def set_files(self, file_tree: Dict[str, Any]) -> None:
        self.file_tree = file_tree
        self.config.input_files = self.set_files_from_tree(self.file_tree)

    def set_file_contents(self, path: str, relative_path: str, json_data: str) -> None:
        parts = relative_path.split("/") if relative_path != "." else []
        current_level = self.file_tree
+92 −192
Original line number Diff line number Diff line
"""Tool class for running via the Superfacility API."""

import json
import os
from io import StringIO
from shutil import rmtree
from time import sleep, time
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List

from authlib.integrations.requests_client import OAuth2Session
from authlib.oauth2.rfc7523 import PrivateKeyJWT
from authlib.jose import JsonWebKey
from fs.sshfs import SSHFS
from paramiko import RSAKey
from sfapi_client import Client
from sfapi_client.compute import Machine
from sfapi_client.jobs import JobState

from ips_fastran_gui.app.models.main_model import MainModel

@@ -24,58 +27,23 @@ class SuperfacilityTool:
    def __init__(self, model: MainModel) -> None:
        self.model = model

        self.access_token_obj: Dict[str, Any] = {}
        self.session: OAuth2Session = OAuth2Session(
            self.model.resource_params.client_id,
            self.model.resource_params.private_key,
            PrivateKeyJWT(self.model.resource_params.token_url),
            grant_type="client_credentials",
            token_endpoint=self.model.resource_params.token_url,
        self.client = Client(
            self.model.resource_params.client_id, JsonWebKey.import_key(self.model.resource_params.private_key)
        )
        self.job_id = ""
        self.task_id = ""
        self.file_tree: Dict[str, Any] = {}
        self.perlmutter = self.client.compute(Machine.perlmutter)
        self.user = self.client.user()
        self.job: Any = None

        self.working_directory: Optional[str] = None
        self.output_directory = ".results"

    def _get_user_directory(self) -> None:
        self._refresh_token()
        response = self.session.post(
            "https://api.nersc.gov/api/v1.2/utilities/command/perlmutter", data={"executable": "pwd"}
        )
        task = response.json()
        task_id = task["task_id"]

        complete = False
        data = {}
        while not complete:
            self._refresh_token()
            response = self.session.get(f"https://api.nersc.gov/api/v1.2/tasks/{task_id}")
            data = response.json()

            complete = data["status"] == "completed"
            if not complete:
                sleep(0.1)

        result = json.loads(data["result"])
        if result["status"] == "ok":
            self.working_directory = result["output"].strip()
        else:
            self.working_directory = None

    def _refresh_token(self) -> None:
        expiration = self.access_token_obj.get("expires_at", 0) - REFRESH_BUFFER
        current_time = int(time())
        if not self.access_token_obj or current_time > expiration:
            self.access_token_obj = self.session.fetch_token()
        self.working_directory = os.path.join("/global/u2", self.user.name[0], self.user.name)

    def cancel(self) -> None:
        if not self.job_id:
        if not self.job:
            return

        self._refresh_token()
        self.session.delete(f"https://api.nersc.gov/api/v1.2/compute/jobs/perlmutter/{self.job_id}")

        self.job.cancel()
        self.model.execution.is_queued = False
        self.model.execution.is_running = False

    def copy_input_files(self, input_files: List[Dict[str, Any]]) -> None:
@@ -86,105 +54,59 @@ class SuperfacilityTool:
                self.upload_file(file)

    def get_job_status(self) -> None:
        self._refresh_token()
        response = self.session.get(f"https://api.nersc.gov/api/v1.2/compute/jobs/perlmutter/{self.job_id}?cached=0")
        data = response.json()
        output = data.get("output", [])

        if not output:
            self.model.execution.is_queued = False
            self.model.execution.is_running = False
            self.model.execution.success = True

            self.get_results()

        if not self.job:
            return

        if output[0]["state"] == "PENDING":
            return
        self.job.update()

        match self.job.state:
            case JobState.RUNNING:
                self.model.execution.is_queued = False
                self.model.execution.is_running = True
            case JobState.COMPLETED:
                self.get_results()

    def get_results(self) -> None:
        if not self.working_directory:
            return
                self.model.execution.is_queued = False
                self.model.execution.is_running = False
                self.model.execution.success = True

    def get_results(self) -> None:
        self.model.config.result_files = []

        self._refresh_token()
        path = os.path.join(self.working_directory, "SUMMARY")
        response = self.session.get(f"https://api.nersc.gov/api/v1.2/utilities/ls/perlmutter/{path}")
        result = response.json()

        for entry in result.get("entries", []):
            name = entry.get("name", ".")
            if name in [".", ".."]:
                continue

            file_path = os.path.join(path, name)
            response = self.session.get(f"https://api.nersc.gov/api/v1.2/utilities/download/perlmutter/{file_path}")
            result = response.json()
        remote_paths = self.perlmutter.ls(os.path.join(self.working_directory, "SUMMARY"), directory=True)

        rmtree(self.output_directory, ignore_errors=True)
        os.makedirs(self.output_directory, exist_ok=True)
            with open(f"{self.output_directory}/{name}", "wb") as file_obj:
                file_obj.write(result["file"])
        for path in remote_paths:
            name = path.name
            contents = path.download(binary=True)

            self.model.config.result_files.append(name)

    def get_submission_status(self) -> None:
        self._refresh_token()
        response = self.session.get(f"https://api.nersc.gov/api/v1.2/tasks/{self.task_id}")
            with open(os.path.join(self.output_directory, name), "wb") as file_obj:
                file_obj.write(contents.read())

        data = response.json()
        match data["status"]:
            case "new":
                pass
            case "completed":
                result = json.loads(data["result"])
                if result["status"] == "error":
                    self.model.execution.is_queued = False
                    self.model.execution.success = False
                    self.model.execution.stderr = result["error"]
                elif result["status"] == "ok":
                    self.job_id = result["jobid"]
                else:
                    raise Exception("Completed task has unexpected status:", result["status"])
            case _:
                raise Exception("Unexpected submission task status:", data["status"])
            self.model.config.result_files.append(name)

    def read_file(self, path: str) -> str:
        self._refresh_token()
        response = self.session.post(
            "https://api.nersc.gov/api/v1.2/utilities/command/perlmutter", data={"executable": f"cat {path}"}
    def read_files(self) -> Dict[str, Any]:
        filesystem = SSHFS(
            "dtn.nersc.gov",
            compress=True,
            pkey=RSAKey.from_private_key(StringIO(self.model.config.sshproxy_key)),
            user=self.user.name,
        )
        task = response.json()
        task_id = task["task_id"]
        print(filesystem)

        complete = False
        data = {}
        while not complete:
            self._refresh_token()
            response = self.session.get(f"https://api.nersc.gov/api/v1.2/tasks/{task_id}")
            data = response.json()
        return self.file_tree

            complete = data["status"] == "completed"
            if not complete:
                sleep(0.1)
    def read_file(self, path: str) -> str:
        [remote_path] = self.perlmutter.ls(os.path.join(self.working_directory, path))
        file = remote_path.download()

        result = json.loads(data["result"])
        if result["status"] == "ok":
            return result["output"]
        return result["error"]
        return file.read().decode("utf-8")

    def refresh_output(self) -> None:
        if not self.task_id:
        if not self.job:
            return

        if not self.job_id:
            self.get_submission_status()
        else:
        self.get_job_status()

        if not self.model.execution.is_queued:
@@ -194,22 +116,9 @@ class SuperfacilityTool:
    def run(self, *args: Any, **kwargs: Any) -> None:
        hours, minutes = divmod(self.model.resource_params.time_limit, 60)

        self._get_user_directory()
        if not self.working_directory:
            self.model.execution.stderr = "Could not find user's home directory."
            self.model.execution.has_run = True
            self.model.execution.success = False

            return

        self.copy_input_files(self.model.config.input_files)

        self._refresh_token()
        response = self.session.post(
            "https://api.nersc.gov/api/v1.2/compute/jobs/perlmutter",
            data={
                "isPath": False,
                "job": f"""#!/bin/bash -l
        self.job = self.perlmutter.submit_job(f"""#!/bin/bash -l
#SBATCH --chdir={self.working_directory}
#SBATCH -p {self.model.resource_params.partition}
#SBATCH -N {self.model.resource_params.number_of_nodes}
#SBATCH -t {hours:02d}:{minutes:02d}:00
@@ -228,52 +137,43 @@ export TIME_ID={self.model.config.time_id}
ips.py --simulation=fastran_scenario.config --platform=/global/common/software/atom/example1/perlmutter_cpu_node.conf --log=ips.log 1>ips.out 2>ips.err

conda deactivate
""",  # noqa
            },
        )
""")  # noqa

        result = response.json()
        if result["status"] != "OK":
            raise Exception(result["error"])

        self.task_id = result["task_id"]
        self.model.execution.has_run = True
        self.model.execution.is_queued = True

    def upload_file(self, file: Dict[str, Any]) -> None:
        if not self.working_directory:
            return

        self._refresh_token()
        path = os.path.join(self.working_directory, file["relative_path"])
        response = self.session.post(
            "https://api.nersc.gov/api/v1.2/utilities/command/perlmutter",
            data={"executable": f"mkdir -p {os.path.dirname(path)}"},
        )
        task = response.json()
        task_id = task["task_id"]

        complete = False
        data = {}
        while not complete:
            self._refresh_token()
            response = self.session.get(f"https://api.nersc.gov/api/v1.2/tasks/{task_id}")
            data = response.json()

            complete = data["status"] == "completed"
            if not complete:
                sleep(0.1)

        with open(file["path"], "r") as file_obj:
            response = self.session.put(
                f"https://api.nersc.gov/api/v1.2/utilities/upload/perlmutter/{path}",
                files={"file": file_obj},
            )
        result = response.json()

        if result["status"] == "OK":
            if result["output"]:
                self.model.execution.stdout += result["output"] + "\n"
        else:
            if result["error"]:
                self.model.execution.stderr += result["error"] + "\n"
        pass
        # path = os.path.join(self.working_directory, file["relative_path"])
        # remote_path = self.perlmutter.ls()
        # response = self.session.post(
        #     "https://api.nersc.gov/api/v1.2/utilities/command/perlmutter",
        #     data={"executable": f"mkdir -p {os.path.dirname(path)}"},
        # )
        # task = response.json()
        # task_id = task["task_id"]

        # complete = False
        # data = {}
        # while not complete:
        #     self._refresh_token()
        #     response = self.session.get(f"https://api.nersc.gov/api/v1.2/tasks/{task_id}")
        #     data = response.json()

        #     complete = data["status"] == "completed"
        #     if not complete:
        #         sleep(0.1)

        # with open(file["path"], "r") as file_obj:
        #     response = self.session.put(
        #         f"https://api.nersc.gov/api/v1.2/utilities/upload/perlmutter/{path}",
        #         files={"file": file_obj},
        #     )
        # result = response.json()

        # if result["status"] == "OK":
        #     if result["output"]:
        #         self.model.execution.stdout += result["output"] + "\n"
        # else:
        #     if result["error"]:
        #         self.model.execution.stderr += result["error"] + "\n"
Loading