Commit 9b7100d8 authored by Duggan, John's avatar Duggan, John
Browse files

Functional ssh filesystem connection to dtn

parent ea979fe9
Loading
Loading
Loading
Loading
Loading
+127 −132

File changed.

Preview size limit exceeded, changes collapsed.

+1 −0
Original line number Diff line number Diff line
@@ -34,6 +34,7 @@ sfapi-client = ">=0.4.1,<0.5"
setuptools = "<82.0.0"
fsspec = ">=2026.3.0"
paramiko = "*"
sshfs = "*"

[tool.pixi.feature.dev.pypi-dependencies]
mypy = ">=1.10.0"
+0 −21
Original line number Diff line number Diff line
@@ -5,7 +5,6 @@ import os
import sys
import zipfile
from enum import Enum
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, List

@@ -43,7 +42,6 @@ class Config(BaseModel):
    input_file_method: InputFileOption = Field(
        default=InputFileOption.advanced, title="How Will You Provide Input Files?"
    )
    input_files: List[Dict[str, Any]] = Field(default=[], title="Input Files")
    is_running: bool = Field(default=False)
    shot_number: str = Field(default="000001", title="Shot Number")
    time_id: str = Field(default="00001", title="Time ID")
@@ -55,10 +53,6 @@ class Config(BaseModel):

    # SSH Proxy filesystem parameters
    username: str = Field(default="", title="NERSC Username")
    sshproxy_key: str = Field(
        default="",
        title="NERSC sshproxy Key",
    )


class Execution(BaseModel):
@@ -119,14 +113,6 @@ class MainModel:
            else:
                zip_obj.writestr(file["relative_path"], file["content"])

    def download_files(self) -> bytes:
        archive = BytesIO()
        with zipfile.ZipFile(archive, "w") as zip_obj:
            self._download_files(zip_obj, self.config.input_files)
        archive.seek(0)

        return archive.read()

    def get_file_from_path(self, path: str) -> Dict[str, Any]:
        parts = path.split("/")
        file = self.file_tree
@@ -165,11 +151,8 @@ class MainModel:
                except Exception:
                    pass

        self.config.input_files = self.set_files_from_tree(self.file_tree)

    def set_files(self, file_tree: Dict[str, Any]) -> None:
        self.file_tree = file_tree
        self.config.input_files = self.set_files_from_tree(self.file_tree)

    def set_file_contents(self, path: str, relative_path: str, json_data: str) -> None:
        parts = relative_path.split("/") if relative_path != "." else []
@@ -182,8 +165,6 @@ class MainModel:
        with open(path, "w") as file_obj:
            file_obj.write(json_data)

        self.config.input_files = self.set_files_from_tree(self.file_tree)

    def set_file_path(self, old_path: str, new_path: str) -> None:
        current_level = self.file_tree
        parts = old_path.split("/")
@@ -203,8 +184,6 @@ class MainModel:
        file["path"] += file["name"]
        current_level[file["name"]] = file

        self.config.input_files = self.set_files_from_tree(self.file_tree)

    def set_files_from_tree(self, tree: Dict[str, Any]) -> List[Dict[str, Any]]:
        # Converts the file tree to a structure usable by our treeview.
        file_list: List[Dict[str, Any]] = []
+0 −2
Original line number Diff line number Diff line
@@ -75,8 +75,6 @@ class LocalTool:
        os.makedirs(self.working_directory, exist_ok=True)
        self.output_directory = os.path.join(self.working_directory, "output")

        self.copy_input_files(self.model.config.input_files)

        self.process = subprocess.Popen(
            [self.model.resource_params.python_path, self.model.resource_params.executable],
            cwd=self.working_directory,
+22 −48
Original line number Diff line number Diff line
"""Tool class for running via the Superfacility API."""

import os
from io import StringIO
from shutil import rmtree
from typing import Any, Dict, List
from typing import Any, Dict

from authlib.jose import JsonWebKey
from fsspec import filesystem
from paramiko import RSAKey
from sfapi_client import Client
from sfapi_client.compute import Machine
from sfapi_client.jobs import JobState
@@ -46,12 +44,12 @@ class SuperfacilityTool:
        self.model.execution.is_queued = False
        self.model.execution.is_running = False

    def copy_input_files(self, input_files: List[Dict[str, Any]]) -> None:
        for file in input_files:
            if "children" in file:
                self.copy_input_files(file["children"])
            else:
                self.upload_file(file)
    def copy_input_files(self, local_directory: str) -> None:
        for root, _, files in os.walk(local_directory):
            for file in files:
                path = os.path.join(root, file)
                relative_path = path.replace(local_directory, "")
                self.upload_file(path, relative_path)

    def get_job_status(self) -> None:
        if not self.job:
@@ -89,39 +87,12 @@ class SuperfacilityTool:

            self.model.config.result_files.append(name)

    def read_files(self) -> Dict[str, Any]:
        self.perlmutter_fs = filesystem(
            "ssh",
            host="dtn.nersc.gov",
            pkey=RSAKey.from_private_key(StringIO(self.model.config.sshproxy_key)),
            username=self.model.config.username,
        )

        self.file_tree = {}
        for root, dirs, files in self.perlmutter_fs.walk(self.working_directory):
            current_level = self.file_tree
            relative_path = root.replace(self.working_directory, "").strip("/")
            if relative_path:
                parts = relative_path.split("/")
            else:
                parts = []
            for part in parts:
                if part not in current_level:
                    current_level[part] = {}
                current_level = current_level[part]

            for file in files:
                path = os.path.join(root, file)
                with self.perlmutter_fs.open(path, "r") as file_obj:
                    current_level[file] = {
                        "content": file_obj.read(),
                        "name": file,
                        "path": path,
                        "relative_path": os.path.join(relative_path, file),
                    }
            dirs[:] = [dir for dir in dirs if dir in ["input"]]

        return self.file_tree
    def read_files(self, local_directory: str) -> None:
        self.perlmutter_fs = filesystem("ssh", host="dtn.nersc.gov")
        self.perlmutter_fs.makedirs("ips-fastran-gui", exist_ok=True)
        for remote_file in self.perlmutter_fs.ls("ips-fastran-gui") + self.perlmutter_fs.ls("ips-fastran-gui/input"):
            local_path = remote_file.replace("ips-fastran-gui", local_directory)
            self.perlmutter_fs.get(remote_file, local_path)

    def read_file(self, path: str) -> str:
        with self.perlmutter_fs.open(os.path.join(self.working_directory, path), "r") as file_obj:
@@ -141,7 +112,7 @@ class SuperfacilityTool:
        self.perlmutter = self.client.compute(Machine.perlmutter)
        hours, minutes = divmod(self.model.resource_params.time_limit, 60)

        self.copy_input_files(self.model.config.input_files)
        self.copy_input_files(args[0])
        self.job = self.perlmutter.submit_job(f"""#!/bin/bash -l
#SBATCH --chdir={self.working_directory}
#SBATCH -p {self.model.resource_params.partition}
@@ -167,10 +138,13 @@ conda deactivate
        self.model.execution.has_run = True
        self.model.execution.is_queued = True

    def upload_file(self, file: Dict[str, Any]) -> None:
        if file["name"].startswith(".") or file["name"].startswith(".log"):
    def upload_file(self, path: str, relative_path: str) -> None:
        name = os.path.basename(path)

        if name.startswith(".") or name.startswith(".log"):
            return

        path = os.path.join(self.working_directory, file["relative_path"])
        with self.perlmutter_fs.open(path, "w") as file_obj:
            file_obj.write(file["content"])
        with open(path, "r") as local_file:
            remote_path = os.path.join(self.working_directory, relative_path)
            with self.perlmutter_fs.open(remote_path, "w") as remote_file:
                remote_file.write(local_file.read())
Loading