Loading .gitlab-ci.yml +1 −0 Original line number Diff line number Diff line Loading @@ -81,6 +81,7 @@ deploy: --set config.db.username=$PGDB_USER --set config.db.password=$PGDB_PASS --set ingress.url=api-$CI_BUILD_REF_SLUG.k8s.arm.gov --set dataService.guc.loadBalancerIP="" stop_review: stage: deploy Loading app/ADL/api/models.py +31 −2 Original line number Diff line number Diff line from datetime import datetime import databases import sqlalchemy from ormar import Integer, Model, String from ormar import Integer, Model, String, DateTime, Boolean, ForeignKey from ormar.decorators import property_field from . import config database = databases.Database(config.db_uri) dr_meta = sqlalchemy.MetaData(schema='data_reception') class ARMPath(Model): Loading Loading @@ -41,7 +44,7 @@ class Datastreams(ARMPath): class Files(ARMPath): class Meta: tablename = 'file_info' metadata = sqlalchemy.MetaData(schema='data_reception') metadata = dr_meta database = database versioned_filename: str = String(primary_key=True, max_length=64) size: int = Integer(name='size_in_bytes') Loading @@ -53,3 +56,29 @@ class Files(ARMPath): def name(self) -> str: site = self.datastream[3:] return f'/{site}/{self.datastream}/{self.versioned_filename}' # patched_field = ForeignKey(Files, related_name='contents') # patched_field.primary_key=True # class FileContents(Model): # class Meta: # tablename = 'file_contents' # metadata = dr_meta # database=database # versioned_filename: Files = patched_field # start_time: datetime = DateTime() # end_time: datetime = DateTime() # deleted: bool = Boolean() # n_samples: int = Integer() class ARMFiles(Model): class Meta: tablename = 'arm_files' metadata = dr_meta database = database versioned_filename: str = String(primary_key=True, max_length=64) size: int = Integer(name='size_in_bytes') datastream: str = String(max_length=30) md5_checksum: str = String(max_length=32, min_length=32, regex=r'[a-fA-F\d]+') start_time: datetime = DateTime() end_time: datetime = 
DateTime() app/ADL/api/v1.py +16 −2 Original line number Diff line number Diff line from datetime import datetime from logging import getLogger from typing import List from typing import List, Union from collections.abc import Collection import fsspec from fastapi.responses import StreamingResponse Loading @@ -8,7 +10,7 @@ from fastapi_versioning import versioned_api_route from starlette.types import Send from . import config from .models import Datastreams, Files, Sites from .models import ARMFiles, Datastreams, Files, Sites router = APIRouter(route_class=versioned_api_route(1)) log = getLogger(__name__) Loading Loading @@ -85,3 +87,15 @@ async def stream(site: Sites.site_code, datastream: Datastreams.datastream, file yield from stream return StreamingResponse(file_generator()) @router.get('/fs/file_list/') async def file_list(datastreams: Datastreams.datastream, start_time: datetime, end_time: datetime, include_deleted: bool=False): if not isinstance(datastreams, Collection): datastreams = [datastreams] return await ARMFiles.objects.filter( datastream__in=[datastreams], start_time__gte=start_time, end_time__lte=end_time, ).all() No newline at end of file app/ADL/client/__init__.py +77 −0 Original line number Diff line number Diff line import json import re from collections.abc import Collection from pprint import pprint import pandas as pd import requests import xarray as xr from . 
import config SITE_REGEX = r'^D?[a-z]{3}$' DATASTREAM_REGEX = r'^D?([a-z]{3})\w*([A-Z][0-9]*)\.([a-z\d]\d)$' class Archive(object): def __init__(self, username, password): self._selected_sites = None self._selected_datastreams = None self._selected_time_start = None self._selected_time_stop = None self.attributes = {} self.token = None self.__authenticate() def __authenticate(self): from secrets import token_urlsafe self.token = token_urlsafe() def __getitem__(self, key): if isinstance(key, str): if re.match(SITE_REGEX, key): self._selected_sites = set({key}) if re.match(DATASTREAM_REGEX, key): self._selected_datastreams = set({key}) elif isinstance(key, slice): if key.start: self._selected_time_start = pd.to_datetime(key.start) if key.stop: self._selected_time_stop = pd.to_datetime(key.stop) return self @property def start_time(self): return self._selected_time_start or pd.to_datetime('1970-01-01') @property def end_time(self): return self._selected_time_stop or pd.Timestamp.now() @property def file_count(self): if not self._selected_datastreams: return 29796186 if self._selected_time_start: return (self.end_time - self._selected_time_start).days @property def file_list(self): payload = { 'datastreams': self._selected_datastreams, 'start_time': self.start_time, 'end_time': self.end_time } r = requests.get(f'{str(config.endpoint)}/file_list/', params=payload) return ( f'{str(config.endpoint)}/stream/{row["datastream"][:3]}/{row["datastream"]}/{row["versioned_filename"]}' for row in requests.get(f'{str(config.endpoint)}/file_list/', params=payload).json() ) def to_xarray(self): return xr.open_mfdataset(self.file_list) def __repr__(self): attr_list = ', '.join('{}: {}'.format(attr, getattr(self, attr)) for attr in ('start_time', 'end_time', '_selected_sites', '_selected_datastreams')) return f'Archive({attr_list})' app/setup.py +1 −0 Original line number Diff line number Diff line Loading @@ -13,6 +13,7 @@ setup( 'fsspec', 'requests', 'uvicorn', 'pandas', ], 
entry_points={ 'fsspec.specs': [ Loading Loading
.gitlab-ci.yml +1 −0 Original line number Diff line number Diff line Loading @@ -81,6 +81,7 @@ deploy: --set config.db.username=$PGDB_USER --set config.db.password=$PGDB_PASS --set ingress.url=api-$CI_BUILD_REF_SLUG.k8s.arm.gov --set dataService.guc.loadBalancerIP="" stop_review: stage: deploy Loading
# app/ADL/api/models.py  (diff: +31 −2) — reconstructed from the diff view.
from datetime import datetime

import databases
import sqlalchemy
from ormar import Boolean, DateTime, ForeignKey, Integer, Model, String
from ormar.decorators import property_field

from . import config

database = databases.Database(config.db_uri)
# One shared MetaData instance for every table in the data_reception schema
# (the diff replaces per-model MetaData construction with this shared one).
dr_meta = sqlalchemy.MetaData(schema='data_reception')


class ARMPath(Model):
    # … body collapsed in the diff view — not visible here …
    ...


class Datastreams(ARMPath):
    # … body collapsed in the diff view — not visible here …
    ...


class Files(ARMPath):
    class Meta:
        tablename = 'file_info'
        metadata = dr_meta  # was: sqlalchemy.MetaData(schema='data_reception')
        database = database

    versioned_filename: str = String(primary_key=True, max_length=64)
    size: int = Integer(name='size_in_bytes')
    # … additional columns collapsed in the diff view …

    @property_field  # NOTE(review): decorator assumed from the property_field import — confirm
    def name(self) -> str:
        """Archive path of this file: /<site>/<datastream>/<versioned_filename>."""
        # BUG FIX: the site code is the FIRST three characters of the
        # datastream (the client builds URLs with row['datastream'][:3]);
        # the original sliced [3:], which dropped the site and kept the rest.
        site = self.datastream[:3]
        return f'/{site}/{self.datastream}/{self.versioned_filename}'


# Disabled draft model kept from the diff (commented out by the author):
# patched_field = ForeignKey(Files, related_name='contents')
# patched_field.primary_key = True
# class FileContents(Model):
#     class Meta:
#         tablename = 'file_contents'
#         metadata = dr_meta
#         database = database
#
#     versioned_filename: Files = patched_field
#     start_time: datetime = DateTime()
#     end_time: datetime = DateTime()
#     deleted: bool = Boolean()
#     n_samples: int = Integer()


class ARMFiles(Model):
    """Flat file-listing table backing the /fs/file_list/ endpoint."""

    class Meta:
        tablename = 'arm_files'
        metadata = dr_meta
        database = database

    versioned_filename: str = String(primary_key=True, max_length=64)
    size: int = Integer(name='size_in_bytes')
    datastream: str = String(max_length=30)
    md5_checksum: str = String(max_length=32, min_length=32, regex=r'[a-fA-F\d]+')
    start_time: datetime = DateTime()
    end_time: datetime = DateTime()
# app/ADL/api/v1.py  (diff: +16 −2) — reconstructed from the diff view.
from datetime import datetime
from logging import getLogger
from typing import List, Union
from collections.abc import Collection

import fsspec
from fastapi.responses import StreamingResponse
# … imports collapsed in the diff view (APIRouter presumably among them) …
from fastapi_versioning import versioned_api_route
from starlette.types import Send

from . import config
from .models import ARMFiles, Datastreams, Files, Sites

router = APIRouter(route_class=versioned_api_route(1))
log = getLogger(__name__)

# … collapsed in the diff view: body of the stream() endpoint; its visible
# tail was:
#         yield from stream
#     return StreamingResponse(file_generator())


@router.get('/fs/file_list/')
async def file_list(datastreams: Datastreams.datastream,
                    start_time: datetime,
                    end_time: datetime,
                    include_deleted: bool = False):
    """Return ARMFiles rows for *datastreams* within [start_time, end_time].

    BUG FIX: a ``str`` is itself a ``Collection``, so the original
    ``isinstance(datastreams, Collection)`` check never wrapped a single
    datastream, and the literal ``[datastreams]`` in the filter then nested
    any genuine list inside another list, breaking multi-datastream queries.
    Normalising on ``str`` handles both cases.
    """
    if isinstance(datastreams, str):
        datastreams = [datastreams]
    # NOTE(review): include_deleted is accepted but never applied — ARMFiles
    # exposes no visible `deleted` column to filter on; confirm intent.
    return await ARMFiles.objects.filter(
        datastream__in=list(datastreams),
        start_time__gte=start_time,
        end_time__lte=end_time,
    ).all()
# app/ADL/client/__init__.py  (new file, +77) — reconstructed from the diff view.
import json
import re
from collections.abc import Collection
from pprint import pprint

import pandas as pd

SITE_REGEX = r'^D?[a-z]{3}$'
DATASTREAM_REGEX = r'^D?([a-z]{3})\w*([A-Z][0-9]*)\.([a-z\d]\d)$'


class Archive(object):
    """Chainable selector over the ARM data archive.

    Subscript with a site code (``archive['nsa']``), a datastream
    (``archive['nsametC1.b1']``) or a time slice
    (``archive['2020-01-01':'2020-02-01']``); each lookup narrows the
    selection and returns ``self`` so calls chain.
    """

    def __init__(self, username, password):
        # NOTE(review): username/password are currently unused —
        # __authenticate() just mints a random local token. Presumably a
        # placeholder until real auth lands; confirm before release.
        self._selected_sites = None
        self._selected_datastreams = None
        self._selected_time_start = None
        self._selected_time_stop = None
        self.attributes = {}
        self.token = None
        self.__authenticate()

    def __authenticate(self):
        # Placeholder auth: generate a random session token locally.
        from secrets import token_urlsafe
        self.token = token_urlsafe()

    def __getitem__(self, key):
        """Narrow the selection by site, datastream or time slice."""
        if isinstance(key, str):
            if re.match(SITE_REGEX, key):
                self._selected_sites = {key}
            if re.match(DATASTREAM_REGEX, key):
                self._selected_datastreams = {key}
        elif isinstance(key, slice):
            if key.start:
                self._selected_time_start = pd.to_datetime(key.start)
            if key.stop:
                self._selected_time_stop = pd.to_datetime(key.stop)
        return self

    @property
    def start_time(self):
        # Default to the Unix epoch when no start has been selected.
        return self._selected_time_start or pd.to_datetime('1970-01-01')

    @property
    def end_time(self):
        # Default to "now" when no stop has been selected.
        return self._selected_time_stop or pd.Timestamp.now()

    @property
    def file_count(self):
        # Rough estimate only: total archive size when nothing is selected,
        # otherwise one file per day of the selected window.
        if not self._selected_datastreams:
            return 29796186
        if self._selected_time_start:
            return (self.end_time - self._selected_time_start).days
        # NOTE(review): returns None when datastreams are selected but no
        # start time is — confirm whether a count from the epoch default
        # was intended instead.
        return None

    @property
    def file_list(self):
        """Yield a streaming URL for every file matching the selection.

        BUG FIX: the original issued the HTTP request twice — once into an
        unused variable ``r`` and again inside the generator. A single
        request is made and its decoded rows are reused. The datastream
        set is also serialised as a sorted list for a deterministic query
        string (set iteration order is arbitrary).
        """
        # Deferred imports: only needed when actually talking to the API,
        # so the client package imports without these dependencies.
        import requests
        from . import config

        payload = {
            'datastreams': sorted(self._selected_datastreams or ()),
            'start_time': self.start_time,
            'end_time': self.end_time,
        }
        endpoint = str(config.endpoint)
        rows = requests.get(f'{endpoint}/file_list/', params=payload).json()
        return (
            f'{endpoint}/stream/{row["datastream"][:3]}/{row["datastream"]}/{row["versioned_filename"]}'
            for row in rows
        )

    def to_xarray(self):
        """Open every selected file as one combined xarray dataset."""
        import xarray as xr  # deferred: heavy optional dependency
        return xr.open_mfdataset(self.file_list)

    def __repr__(self):
        attr_list = ', '.join(
            '{}: {}'.format(attr, getattr(self, attr))
            for attr in ('start_time', 'end_time',
                         '_selected_sites', '_selected_datastreams'))
        return f'Archive({attr_list})'
app/setup.py +1 −0 Original line number Diff line number Diff line Loading @@ -13,6 +13,7 @@ setup( 'fsspec', 'requests', 'uvicorn', 'pandas', ], entry_points={ 'fsspec.specs': [ Loading