Commit 555da92d authored by Duggan, John's avatar Duggan, John
Browse files

Implement ONCat backend for NeutronDataSelector

parent b0918eef
Loading
Loading
Loading
Loading
Loading
+162 −0
Original line number Diff line number Diff line
"""Analysis cluster filesystem backend for NeutronDataSelector."""

import os
from pathlib import Path
from typing import Any, Dict, List, Optional
from warnings import warn

from natsort import natsorted
from pydantic import Field, model_validator
from typing_extensions import Self

from .neutron_data_selector import NeutronDataSelectorModel, NeutronDataSelectorState

CUSTOM_DIRECTORIES_LABEL = "Custom Directory"

INSTRUMENTS = {
    "HFIR": {
        "CG-1A": "CG1A",
        "DEV BEAM": "CG1B",
        "MARS": "CG1D",
        "GP-SANS": "CG2",
        "BIO-SANS": "CG3",
        "CNPDB": "CG4B",
        "CTAX": "CG4C",
        "IMAGINE": "CG4D",
        "PTAX": "HB1",
        "VERITAS": "HB1A",
        "POWDER": "HB2A",
        "HIDRA": "HB2B",
        "WAND²": "HB2C",
        "TAX": "HB3",
        "DEMAND": "HB3A",
        "NOWG": "NOWG",
        "NOWV": "NOWV",
    },
    "SNS": {
        "ARCS": "ARCS",
        "BL-0": "BL0",
        "BASIS": "BSS",
        "CNCS": "CNCS",
        "CORELLI": "CORELLI",
        "EQ-SANS": "EQSANS",
        "HYSPEC": "HYS",
        "MANDI": "MANDI",
        "NOMAD": "NOM",
        "NOWB": "NOWB",
        "NOWD": "NOWD",
        "NSE": "NSE",
        "POWGEN": "PG3",
        "LIQREF": "REF_L",
        "MAGREF": "REF_M",
        "SEQUOIA": "SEQ",
        "SNAP": "SNAP",
        "TOPAZ": "TOPAZ",
        "USANS": "USANS",
        "VENUS": "VENUS",
        "VISION": "VIS",
        "VULCAN": "VULCAN",
    },
}


class AnalysisDataSelectorState(NeutronDataSelectorState):
    """Selection state for identifying datafiles."""

    allow_custom_directories: bool = Field(default=False)
    custom_directory: str = Field(default="", title="Custom Directory")

    @model_validator(mode="after")
    def validate_state(self) -> Self:
        valid_facilities = self.get_facilities()
        if self.facility and self.facility not in valid_facilities:
            warn(f"Facility '{self.facility}' could not be found. Valid options: {valid_facilities}", stacklevel=1)

        valid_instruments = self.get_instruments()
        if self.instrument and self.facility != CUSTOM_DIRECTORIES_LABEL and self.instrument not in valid_instruments:
            warn(
                (
                    f"Instrument '{self.instrument}' could not be found in '{self.facility}'. "
                    f"Valid options: {valid_instruments}"
                ),
                stacklevel=1,
            )
        # Validating the experiment is expensive and will fail in our CI due to the filesystem not being mounted there.

        return self

    def get_facilities(self) -> List[str]:
        facilities = list(INSTRUMENTS.keys())
        if self.allow_custom_directories:
            facilities.append(CUSTOM_DIRECTORIES_LABEL)
        return facilities

    def get_instruments(self) -> List[str]:
        return list(INSTRUMENTS.get(self.facility, {}).keys())


class AnalysisDataSelectorModel(NeutronDataSelectorModel):
    """Analysis cluster filesystem backend for NeutronDataSelector."""

    def __init__(self, state: AnalysisDataSelectorState) -> None:
        super().__init__(state)
        self.state: AnalysisDataSelectorState = state

    def set_binding_parameters(self, **kwargs: Any) -> None:
        super().set_binding_parameters(**kwargs)

        if "allow_custom_directories" in kwargs:
            self.state.allow_custom_directories = kwargs["allow_custom_directories"]

    def get_custom_directory_path(self) -> Optional[Path]:
        # Don't expose the full file system
        if not self.state.custom_directory:
            return None

        return Path(self.state.custom_directory)

    def get_experiment_directory_path(self) -> Optional[Path]:
        if not self.state.experiment:
            return None

        return Path("/") / self.state.facility / self.get_instrument_dir() / self.state.experiment

    def get_instrument_dir(self) -> str:
        return INSTRUMENTS.get(self.state.facility, {}).get(self.state.instrument, "")

    def get_experiments(self) -> List[str]:
        experiments = []

        instrument_path = Path("/") / self.state.facility / self.get_instrument_dir()
        try:
            for dirname in os.listdir(instrument_path):
                if dirname.startswith("IPTS-") and os.access(instrument_path / dirname, mode=os.R_OK):
                    experiments.append(dirname)
        except OSError:
            pass

        return natsorted(experiments)

    def get_directories(self, base_path: Optional[Path] = None) -> List[Dict[str, Any]]:
        using_custom_directory = self.state.facility == CUSTOM_DIRECTORIES_LABEL
        if base_path:
            pass
        elif using_custom_directory:
            base_path = self.get_custom_directory_path()
        else:
            base_path = self.get_experiment_directory_path()

        if not base_path:
            return []

        return self.get_directories_from_path(base_path)

    def get_datafiles(self, *args: Any, **kwargs: Any) -> List[str]:
        if self.state.experiment:
            base_path = Path("/") / self.state.facility / self.get_instrument_dir() / self.state.experiment
        elif self.state.custom_directory:
            base_path = Path(self.state.custom_directory)
        else:
            return []

        return self.get_datafiles_from_path(base_path)
+8 −128
Original line number Diff line number Diff line
"""Model implementation for DataSelector."""

import os
from pathlib import Path
from typing import Any, Dict, List, Optional
from warnings import warn

from natsort import natsorted
from pydantic import Field, field_validator, model_validator
from typing_extensions import Self
from pydantic import Field, field_validator

from ..data_selector import DataSelectorModel, DataSelectorState

CUSTOM_DIRECTORIES_LABEL = "Custom Directory"

INSTRUMENTS = {
    "HFIR": {
        "CG-1A": "CG1A",
        "CG-1B": "CG1B",
        "CG-1D": "CG1D",
        "CG-2": "CG2",
        "CG-3": "CG3",
        "CG-4B": "CG4B",
        "CG-4C": "CG4C",
        "CG-4D": "CG4D",
        "HB-1": "HB1",
        "HB-1A": "HB1A",
        "HB-2A": "HB2A",
        "HB-2B": "HB2B",
        "HB-2C": "HB2C",
        "HB-3": "HB3",
        "HB-3A": "HB3A",
        "NOW-G": "NOWG",
        "NOW-V": "NOWV",
    },
    "SNS": {
        "BL-18": "ARCS",
        "BL-0": "BL0",
        "BL-2": "BSS",
        "BL-5": "CNCS",
        "BL-9": "CORELLI",
        "BL-6": "EQSANS",
        "BL-14B": "HYS",
        "BL-11B": "MANDI",
        "BL-1B": "NOM",
        "NOW-G": "NOWG",
        "BL-15": "NSE",
        "BL-11A": "PG3",
        "BL-4B": "REF_L",
        "BL-4A": "REF_M",
        "BL-17": "SEQ",
        "BL-3": "SNAP",
        "BL-12": "TOPAZ",
        "BL-1A": "USANS",
        "BL-10": "VENUS",
        "BL-16B": "VIS",
        "BL-7": "VULCAN",
    },
}


class NeutronDataSelectorState(DataSelectorState):
    """Selection state for identifying datafiles."""

    allow_custom_directories: bool = Field(default=False)
    facility: str = Field(default="", title="Facility")
    instrument: str = Field(default="", title="Instrument")
    experiment: str = Field(default="", title="Experiment")
    custom_directory: str = Field(default="", title="Custom Directory")

    @field_validator("experiment", mode="after")
    @classmethod
@@ -75,33 +23,11 @@ class NeutronDataSelectorState(DataSelectorState):
            raise ValueError("experiment must begin with IPTS-")
        return experiment

    @model_validator(mode="after")
    def validate_state(self) -> Self:
        valid_facilities = self.get_facilities()
        if self.facility and self.facility not in valid_facilities:
            warn(f"Facility '{self.facility}' could not be found. Valid options: {valid_facilities}", stacklevel=1)

        valid_instruments = self.get_instruments()
        if self.instrument and self.facility != CUSTOM_DIRECTORIES_LABEL and self.instrument not in valid_instruments:
            warn(
                (
                    f"Instrument '{self.instrument}' could not be found in '{self.facility}'. "
                    f"Valid options: {valid_instruments}"
                ),
                stacklevel=1,
            )
        # Validating the experiment is expensive and will fail in our CI due to the filesystem not being mounted there.

        return self

    def get_facilities(self) -> List[str]:
        facilities = list(INSTRUMENTS.keys())
        if self.allow_custom_directories:
            facilities.append(CUSTOM_DIRECTORIES_LABEL)
        return facilities
        raise NotImplementedError()

    def get_instruments(self) -> List[str]:
        return list(INSTRUMENTS.get(self.facility, {}).keys())
        raise NotImplementedError()


class NeutronDataSelectorModel(DataSelectorModel):
@@ -120,64 +46,18 @@ class NeutronDataSelectorModel(DataSelectorModel):
            self.state.instrument = kwargs["instrument"]
        if "experiment" in kwargs:
            self.state.experiment = kwargs["experiment"]
        if "allow_custom_directories" in kwargs:
            self.state.allow_custom_directories = kwargs["allow_custom_directories"]

    def get_facilities(self) -> List[str]:
        return natsorted(self.state.get_facilities())

    def get_instrument_dir(self) -> str:
        return INSTRUMENTS.get(self.state.facility, {}).get(self.state.instrument, "")

    def get_instruments(self) -> List[str]:
        return natsorted(self.state.get_instruments())

    def get_experiments(self) -> List[str]:
        experiments = []

        instrument_path = Path("/") / self.state.facility / self.get_instrument_dir()
        try:
            for dirname in os.listdir(instrument_path):
                if dirname.startswith("IPTS-") and os.access(instrument_path / dirname, mode=os.R_OK):
                    experiments.append(dirname)
        except OSError:
            pass

        return natsorted(experiments)

    def get_experiment_directory_path(self) -> Optional[Path]:
        if not self.state.experiment:
            return None

        return Path("/") / self.state.facility / self.get_instrument_dir() / self.state.experiment

    def get_custom_directory_path(self) -> Optional[Path]:
        # Don't expose the full file system
        if not self.state.custom_directory:
            return None

        return Path(self.state.custom_directory)
        raise NotImplementedError()

    def get_directories(self, base_path: Optional[Path] = None) -> List[Dict[str, Any]]:
        using_custom_directory = self.state.facility == CUSTOM_DIRECTORIES_LABEL
        if base_path:
            pass
        elif using_custom_directory:
            base_path = self.get_custom_directory_path()
        else:
            base_path = self.get_experiment_directory_path()

        if not base_path:
            return []

        return self.get_directories_from_path(base_path)

    def get_datafiles(self) -> List[str]:
        if self.state.experiment:
            base_path = Path("/") / self.state.facility / self.get_instrument_dir() / self.state.experiment
        elif self.state.custom_directory:
            base_path = Path(self.state.custom_directory)
        else:
            return []

        return self.get_datafiles_from_path(base_path)
        raise NotImplementedError()

    def get_datafiles(self, *args: Any, **kwargs: Any) -> List[str]:
        raise NotImplementedError()
+131 −0
Original line number Diff line number Diff line
"""ONCat backend for NeutronDataSelector."""

import os
from pathlib import Path
from typing import Any, Dict, List, Optional

from natsort import natsorted
from pydantic import Field
from pyoncat import CLIENT_CREDENTIALS_FLOW, ONCat

from .neutron_data_selector import NeutronDataSelectorModel, NeutronDataSelectorState

TOKEN_VARNAME = "USER_OIDC_TOKEN"
ID_VARNAME = "ONCAT_CLIENT_ID"
SECRET_VARNAME = "ONCAT_CLIENT_SECRET"


class ONCatDataSelectorState(NeutronDataSelectorState):
    """Selection state for identifying datafiles."""

    instrument_mapping: Dict[str, str] = Field(default={})
    projection: List[str] = Field(default=[])


class ONCatDataSelectorModel(NeutronDataSelectorModel):
    """ONCat backend for NeutronDataSelector."""

    def __init__(self, state: ONCatDataSelectorState) -> None:
        super().__init__(state)
        self.state: ONCatDataSelectorState = state

        user_token = os.environ.get(TOKEN_VARNAME, "")
        client_id = os.environ.get(ID_VARNAME, "")
        client_secret = os.environ.get(SECRET_VARNAME, "")
        if user_token:
            self.oncat_client = ONCat(url="https://calvera-test.ornl.gov/oncat", api_token=user_token)
        elif client_id and client_secret:
            self.oncat_client = ONCat(
                url="https://oncat.ornl.gov",
                client_id=client_id,
                client_secret=client_secret,
                flow=CLIENT_CREDENTIALS_FLOW,
            )
        else:
            raise EnvironmentError(
                f"In order to use the ONCat backend for NeutronDataSelector, you must set either {TOKEN_VARNAME} or "
                f"both {ID_VARNAME} and {SECRET_VARNAME} in your environment."
            )

    def set_binding_parameters(self, **kwargs: Any) -> None:
        super().set_binding_parameters(**kwargs)

        if "projection" in kwargs:
            self.state.projection = kwargs["projection"]

    def get_facilities(self) -> List[str]:
        facilities = []
        for facility_data in self.oncat_client.Facility.list(projection=["name"]):
            facilities.append(facility_data.name)
        return natsorted(facilities)

    def get_instruments(self) -> List[str]:
        if not self.state.facility:
            return []

        self.state.instrument_mapping = {}
        instruments = []
        for instrument_data in self.oncat_client.Instrument.list(
            facility=self.state.facility, projection=["short_name"]
        ):
            self.state.instrument_mapping[instrument_data.short_name] = instrument_data.id
            instruments.append(instrument_data.short_name)
        return natsorted(instruments)

    def get_experiments(self) -> List[str]:
        if not self.state.facility or not self.state.instrument:
            return []

        experiments = []
        for experiment_data in self.oncat_client.Experiment.list(
            facility=self.state.facility,
            instrument=self.state.instrument_mapping[self.state.instrument],
            projection=["name"],
        ):
            experiments.append(experiment_data.name)
        return natsorted(experiments)

    def get_directories(self, _: Optional[Path] = None) -> List[Dict[str, Any]]:
        return []

    def create_datafile_obj(self, data: Dict[str, Any], projection: List[str]) -> Dict[str, str]:
        new_obj = {"path": data["location"]}

        for key in projection:
            value: Any = data

            if key == "location":
                continue

            for part in key.split("."):
                try:
                    value = value[part]
                except KeyError:
                    value = ""
                    break

            new_obj[key] = value

        return new_obj

    def get_datafiles(self, *args: Any, **kwargs: Any) -> List[Any]:
        if not self.state.facility or not self.state.instrument or not self.state.experiment:
            return []

        projection = ["location"] + self.state.projection

        datafiles = []
        for datafile_data in self.oncat_client.Datafile.list(
            facility=self.state.facility,
            instrument=self.state.instrument_mapping[self.state.instrument],
            experiment=self.state.experiment,
            projection=projection,
        ):
            path = datafile_data.location
            if self.state.extensions:
                for extension in self.state.extensions:
                    if path.lower().endswith(extension):
                        datafiles.append(self.create_datafile_obj(datafile_data, projection))
            else:
                datafiles.append(self.create_datafile_obj(datafile_data, projection))
        return natsorted(datafiles, key=lambda d: d["path"])
+10 −5
Original line number Diff line number Diff line
@@ -155,9 +155,9 @@ class DataSelector(datagrid.VGrid):
                        )
                        vuetify.VListItem("No directories found", classes="flex-0-1 text-center", v_else=True)

                super().__init__(
                    v_model=self._v_model,
                    can_focus=False,
                if "columns" in kwargs:
                    columns = kwargs.pop("columns")
                else:
                    columns = (
                        "[{"
                        "    cellTemplate: (createElement, props) =>"
@@ -167,7 +167,12 @@ class DataSelector(datagrid.VGrid):
                        "    name: 'Available Datafiles',"
                        "    prop: 'title',"
                        "}]",
                    ),
                    )

                super().__init__(
                    v_model=self._v_model,
                    can_focus=False,
                    columns=columns,
                    column_span=1 if isinstance(self._subdirectory, tuple) or not self._subdirectory else 2,
                    frame_size=10,
                    hide_attribution=True,
+72 −12

File changed.

Preview size limit exceeded, changes collapsed.

Loading