Loading lib/galaxy/authnz/xsd/oidc_backends_config.xsd +7 −0 Original line number Diff line number Diff line Loading @@ -142,6 +142,13 @@ </xs:documentation> </xs:annotation> </xs:element> <xs:element name="alias" minOccurs="0" type="xs:string"> <xs:annotation> <xs:documentation> Specifies alias to be used in web client. </xs:documentation> </xs:annotation> </xs:element> </xs:all> <xs:attribute name="name" type="xs:string" use="required"> <xs:annotation> Loading lib/galaxy/objectstore/rucio.py +3 −217 Original line number Diff line number Diff line import hashlib <<<<<<< HEAD from typing import Optional from .caching import ( CacheTarget, enable_cache_monitor, InProcessCacheMonitor, parse_caching_config_dict_from_xml, ) try: from ..authnz.util import provider_name_to_backend except ImportError: provider_name_to_backend = None # type: ignore[misc,assignment] ======= >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 import logging import os import shutil Loading @@ -33,14 +17,11 @@ try: except ImportError: Client = None <<<<<<< HEAD ======= try: from galaxy.authnz.util import provider_name_to_backend except ImportError: provider_name_to_backend = None # type: ignore[assignment, unused-ignore] >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 from galaxy.exceptions import ( ObjectInvalid, ObjectNotFound, Loading @@ -51,16 +32,12 @@ from galaxy.util import ( umask_fix_perms, unlink, ) <<<<<<< HEAD from galaxy.util.path import safe_relpath from ..objectstore import ConcreteObjectStore ======= from ._caching_base import CachingConcreteObjectStore from .caching import ( enable_cache_monitor, parse_caching_config_dict_from_xml, ) >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 log = logging.getLogger(__name__) Loading Loading @@ -104,15 +81,12 @@ def parse_config_xml(config_xml): if e_xml: rucio_download_schemes = [{k: e.get(k) for k in attrs_schemes} for e in e_xml] <<<<<<< HEAD oidc_providers = [] e_xml = config_xml.findall("oidc_provider") if e_xml: oidc_providers = [e.text for e in e_xml] ======= oidc_provider = config_xml.findtext("oidc_provider", None) >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 enable_cache_mon = string_as_bool(config_xml.findtext("enable_cache_monitor", "False")) e_xml = config_xml.findall("rucio_upload_scheme") Loading @@ -126,10 +100,7 @@ def parse_config_xml(config_xml): rucio_upload_scheme = None rucio_scope = None rucio_register_only = False <<<<<<< HEAD ======= oidc_provider = None >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 e_xml = config_xml.findall("rucio_auth") if not e_xml: Loading Loading @@ -163,11 +134,7 @@ def parse_config_xml(config_xml): "cache": cache_dict, "rucio": rucio_dict, "extra_dirs": extra_dirs, <<<<<<< HEAD "oidc_providers": oidc_providers, ======= "oidc_provider": oidc_provider, >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 "enable_cache_monitor": enable_cache_mon, } except Exception: Loading Loading @@ -195,11 +162,7 @@ class RucioBroker: auth_host=self.config["auth_host"], account=self.config["account"], auth_type=self.config["auth_type"], <<<<<<< HEAD creds={"username": self.config["username"], "password": self.config["password"]}, ======= creds={"username": self.config["username"], "password": self.config["username"]}, >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 ) return client Loading Loading @@ -278,12 +241,8 @@ class RucioBroker: } items = [item] download_client = self.get_rucio_download_client(auth_token=auth_token) <<<<<<< HEAD res = download_client.download_dids(items) os.replace(res[0]["dest_file_paths"][0], dest_path) ======= download_client.download_dids(items) >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 except Exception as e: log.exception(f"Cannot download file: {str(e)}") return False Loading Loading @@ -318,11 +277,7 @@ class RucioBroker: return True <<<<<<< HEAD class RucioObjectStore(ConcreteObjectStore): ======= class RucioObjectStore(CachingConcreteObjectStore): >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 """ Object store implementation that uses ORNL remote data broker. Loading @@ -330,22 +285,13 @@ class RucioObjectStore(CachingConcreteObjectStore): Galaxy at some future point or significantly modified. """ <<<<<<< HEAD cache_monitor: Optional[InProcessCacheMonitor] = None ======= >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 store_type = "rucio" def to_dict(self): rval = super().to_dict() rval["rucio"] = self.rucio_config rval["cache"] = self.cache_config <<<<<<< HEAD rval["oidc_providers"] = self.oidc_providers ======= rval["oidc_provider"] = self.oidc_provider >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 rval["enable_cache_monitor"] = self.enable_cache_monitor return rval Loading @@ -353,11 +299,7 @@ class RucioObjectStore(CachingConcreteObjectStore): super().__init__(config, config_dict) self.rucio_config = config_dict.get("rucio") or {} <<<<<<< HEAD self.oidc_providers = config_dict.get("oidc_providers", None) ======= self.oidc_provider = config_dict.get("oidc_provider", None) >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 self.rucio_broker = RucioBroker(self.rucio_config) cache_dict = config_dict.get("cache") or {} self.enable_cache_monitor, self.cache_monitor_interval = enable_cache_monitor(config, config_dict) Loading @@ -369,64 +311,8 @@ class RucioObjectStore(CachingConcreteObjectStore): self._initialize() def _initialize(self): <<<<<<< HEAD if self.enable_cache_monitor: self.cache_monitor = InProcessCacheMonitor(self.cache_target, self.cache_monitor_interval) def _in_cache(self, rel_path): """Check if the given dataset is in the local cache and return True if so.""" cache_path = self._get_cache_path(rel_path) return os.path.exists(cache_path) def _construct_path( self, obj, base_dir=None, dir_only=None, extra_dir=None, extra_dir_at_root=False, alt_name=None, obj_dir=False, **kwargs, ): # extra_dir should never be constructed from provided data but just # make sure there are no shenanigans afoot if extra_dir and extra_dir != os.path.normpath(extra_dir): log.warning("extra_dir is not normalized: %s", extra_dir) raise ObjectInvalid("The requested object is invalid") # ensure that any parent directory references in alt_name would not # result in a path not contained in the directory path constructed here if alt_name: if not safe_relpath(alt_name): log.warning("alt_name would locate path outside dir: %s", alt_name) raise ObjectInvalid("The requested object is invalid") # alt_name can contain parent directory references, but S3 will not # follow them, so if they are valid we normalize them out alt_name = os.path.normpath(alt_name) rel_path = os.path.join(*directory_hash_id(self._get_object_id(obj))) if extra_dir is not None: if extra_dir_at_root: rel_path = os.path.join(extra_dir, rel_path) else: rel_path = os.path.join(rel_path, extra_dir) # for JOB_WORK directory if obj_dir: rel_path = os.path.join(rel_path, str(self._get_object_id(obj))) if base_dir: base = self.extra_dirs.get(base_dir) return os.path.join(str(base), rel_path) if not dir_only: rel_path = os.path.join(rel_path, alt_name if alt_name else f"dataset_{self._get_object_id(obj)}.dat") return rel_path def _get_cache_path(self, rel_path): return os.path.abspath(os.path.join(self.staging_path, rel_path)) ======= self._ensure_staging_path_writable() self._start_cache_monitor_if_needed() >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 def _pull_into_cache(self, rel_path, auth_token): log.debug("rucio _pull_into_cache: %s", rel_path) Loading Loading @@ -479,28 +365,6 @@ class RucioObjectStore(CachingConcreteObjectStore): def parse_xml(cls, config_xml): return parse_config_xml(config_xml) <<<<<<< HEAD def file_ready(self, obj, **kwargs): log.debug("rucio file_ready") """ A helper method that checks if a file corresponding to a dataset is ready and available to be used. Return ``True`` if so, ``False`` otherwise. """ rel_path = self._construct_path(obj, **kwargs) # Make sure the size in cache is available in its entirety if self._in_cache(rel_path): if os.path.getsize(self._get_cache_path(rel_path)) == self.rucio_broker.get_size(rel_path): return True log.debug( "Waiting for dataset %s to transfer from OS: %s/%s", rel_path, os.path.getsize(self._get_cache_path(rel_path)), self.rucio_broker.get_size(rel_path), ) return False ======= >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 def _create(self, obj, **kwargs): if not self._exists(obj, **kwargs): # Pull out locally used fields Loading Loading @@ -531,16 +395,6 @@ class RucioObjectStore(CachingConcreteObjectStore): log.debug("rucio _create: %s", rel_path) return self <<<<<<< HEAD def _empty(self, obj, **kwargs): log.debug("rucio _empty") if self._exists(obj, **kwargs): return bool(self._size(obj, **kwargs) > 0) else: raise ObjectNotFound(f"objectstore.empty, object does not exist: {obj}, kwargs: {kwargs}") ======= >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 def _size(self, obj, **kwargs): rel_path = self._construct_path(obj, **kwargs) log.debug("rucio _size: %s", rel_path) Loading @@ -553,12 +407,6 @@ class RucioObjectStore(CachingConcreteObjectStore): if size != 0: return size if self._exists(obj, **kwargs): <<<<<<< HEAD return self.rucio_broker.get_size(rel_path) log.warning("Did not find dataset '%s', returning 0 for size", rel_path) return 0 ======= return self._get_remote_size(rel_path) log.warning("Did not find dataset '%s', returning 0 for size", rel_path) return 0 Loading @@ -566,18 +414,14 @@ class RucioObjectStore(CachingConcreteObjectStore): def _get_remote_size(self, rel_path): return self.rucio_broker.get_size(rel_path) >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 def _delete(self, obj, entire_dir=False, **kwargs): rel_path = self._construct_path(obj, **kwargs) extra_dir = kwargs.get("extra_dir", None) base_dir = kwargs.get("base_dir", None) dir_only = kwargs.get("dir_only", False) obj_dir = kwargs.get("obj_dir", False) <<<<<<< HEAD ======= log.debug("rucio _delete: %s", rel_path) auth_token = self._get_token(**kwargs) >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 try: # Remove temporary data in JOB_WORK directory Loading @@ -585,12 +429,6 @@ class RucioObjectStore(CachingConcreteObjectStore): shutil.rmtree(os.path.abspath(rel_path)) return True <<<<<<< HEAD log.debug("rucio _delete: %s", rel_path) auth_token = self._get_token(**kwargs) ======= >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 # Delete from cache first if entire_dir and extra_dir: shutil.rmtree(self._get_cache_path(rel_path), ignore_errors=True) Loading @@ -605,23 +443,6 @@ class RucioObjectStore(CachingConcreteObjectStore): log.exception("%s delete error", self._get_filename(obj, **kwargs)) return False <<<<<<< HEAD def _get_data(self, obj, start=0, count=-1, **kwargs): rel_path = self._construct_path(obj, **kwargs) log.debug("rucio _get_data: %s", rel_path) auth_token = self._get_token(**kwargs) # Check cache first and get file if not there if not self._in_cache(rel_path) or os.path.getsize(self._get_cache_path(rel_path)) == 0: self._pull_into_cache(rel_path, auth_token) # Read the file content from cache data_file = open(self._get_cache_path(rel_path)) data_file.seek(start) content = data_file.read(count) data_file.close() return content ======= >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 def _get_token(self, **kwargs): auth_token = kwargs.get("auth_token", None) if auth_token: Loading @@ -634,17 +455,11 @@ class RucioObjectStore(CachingConcreteObjectStore): user = trans.user else: user = arg_user <<<<<<< HEAD for oidc_provider in self.oidc_providers: backend = provider_name_to_backend(oidc_provider) tokens = user.get_oidc_tokens(backend) if tokens["id"]: return tokens["id"] ======= backend = provider_name_to_backend(self.oidc_provider) tokens = user.get_oidc_tokens(backend) return tokens["id"] >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 except Exception as e: log.debug("Failed to get auth token: %s", e) return None Loading @@ -652,28 +467,16 @@ class RucioObjectStore(CachingConcreteObjectStore): def _get_filename(self, obj, **kwargs): base_dir = kwargs.get("base_dir", None) dir_only = kwargs.get("dir_only", False) <<<<<<< HEAD rel_path = self._construct_path(obj, **kwargs) sync_cache = kwargs.get("sync_cache", True) ======= auth_token = self._get_token(**kwargs) rel_path = self._construct_path(obj, **kwargs) sync_cache = kwargs.get("sync_cache", True) log.debug("rucio _get_filename: %s", rel_path) >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 # for JOB_WORK directory if base_dir and dir_only: return os.path.abspath(rel_path) <<<<<<< HEAD auth_token = self._get_token(**kwargs) log.debug("rucio _get_filename: %s", rel_path) ======= >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 cache_path = self._get_cache_path(rel_path) if not sync_cache: return cache_path Loading Loading @@ -707,11 +510,7 @@ class RucioObjectStore(CachingConcreteObjectStore): file_name = self._get_cache_path(rel_path) if not os.path.islink(file_name): raise ObjectInvalid( <<<<<<< HEAD "rucio objectstore._register_file, rucio_register_only " "is set, but file in cache is not a link " ======= "rucio objectstore._register_file, rucio_register_only is set, but file in cache is not a link " >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 ) if os.path.islink(file_name): file_name = os.readlink(file_name) Loading Loading @@ -766,18 +565,5 @@ class RucioObjectStore(CachingConcreteObjectStore): kwargs["object_id"] = obj.id return kwargs <<<<<<< HEAD @property def cache_target(self) -> CacheTarget: return CacheTarget( self.staging_path, self.cache_size, 0.9, ) def shutdown(self): self.cache_monitor and self.cache_monitor.shutdown() ======= def shutdown(self): self._shutdown_cache_monitor() >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 lib/galaxy/objectstore/rucio_extra_clients.py +0 −37 Original line number Diff line number Diff line import copy import logging import time <<<<<<< HEAD from abc import ABC try: from rucio.client.uploadclient import UploadClient from rucio.common.exception import ( # type: ignore ======= try: from rucio.client.uploadclient import UploadClient from rucio.common.exception import ( >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 InputValidationError, NoFilesUploaded, NotAllFilesUploaded, Loading @@ -21,11 +13,7 @@ try: from rucio.common.utils import generate_uuid from rucio.rse import rsemanager as rsemgr except ImportError: <<<<<<< HEAD UploadClient = ABC ======= UploadClient = object >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 class DeleteClient(UploadClient): Loading @@ -47,11 +35,7 @@ class DeleteClient(UploadClient): if not self.rses.get(rse): rse_settings = self.rses.setdefault(rse, rsemgr.get_rse_info(rse, vo=self.client.vo)) if not ignore_availability and rse_settings["availability_delete"] != 1: <<<<<<< HEAD logger(logging.DEBUG, "%s is not available for deletion. No actions have been taken" % rse) ======= logger(logging.DEBUG, "%s is not available for deletion. No actions have been taken", rse) >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 continue # protocol handling and deletion Loading @@ -68,13 +52,8 @@ class DeleteClient(UploadClient): success = True except Exception as error: logger(logging.WARNING, "Delete attempt failed") <<<<<<< HEAD logger(logging.INFO, "Exception: %s" % str(error), exc_info=True) logger(logging.DEBUG, "Successfully deleted dataset %s" % pfn) ======= logger(logging.INFO, "Exception: %s", error, exc_info=True) logger(logging.DEBUG, "Successfully deleted dataset %s", pfn) >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 class InPlaceIngestClient(UploadClient): Loading Loading @@ -128,11 +107,7 @@ class InPlaceIngestClient(UploadClient): if not self.rses.get(rse): rse_settings = self.rses.setdefault(rse, rsemgr.get_rse_info(rse, vo=self.client.vo)) if not ignore_availability and rse_settings["availability_write"] != 1: <<<<<<< HEAD raise RSEWriteBlocked("%s is not available for writing. No actions have been taken" % rse) ======= raise RSEWriteBlocked(f"{rse} is not available for writing. No actions have been taken") >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 dataset_scope = file.get("dataset_scope") dataset_name = file.get("dataset_name") Loading @@ -145,11 +120,7 @@ class InPlaceIngestClient(UploadClient): registered_file_dids.add(f"{file['did_scope']}:{file['did_name']}") wrong_dids = registered_file_dids.intersection(registered_dataset_dids) if len(wrong_dids): <<<<<<< HEAD raise InputValidationError("DIDs used to address both files and datasets: %s" % str(wrong_dids)) ======= raise InputValidationError(f"DIDs used to address both files and datasets: {wrong_dids}") >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 logger(logging.DEBUG, "Input validation done.") # clear this set again to ensure that we only try to register datasets once Loading @@ -158,11 +129,7 @@ class InPlaceIngestClient(UploadClient): summary = [] for file in files: basename = file["basename"] <<<<<<< HEAD logger(logging.INFO, "Preparing upload for file %s" % basename) ======= logger(logging.INFO, "Preparing upload for file %s", basename) >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 pfn = file.get("pfn") Loading Loading @@ -202,11 +169,7 @@ class InPlaceIngestClient(UploadClient): trace["transferEnd"] = time.time() trace["clientState"] = "DONE" file["state"] = "A" <<<<<<< HEAD logger(logging.INFO, "Successfully uploaded file %s" % basename) ======= logger(logging.INFO, "Successfully uploaded file %s", basename) >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 self._send_trace(trace) if summary_file_path: Loading lib/galaxy/webapps/galaxy/api/job_tokens.py +1 −1 Original line number Diff line number Diff line Loading @@ -36,7 +36,7 @@ class FastAPIJobTokens: ) def get_token( self, job_id: EncodedDatabaseIdField, job_id: str, job_key: str = Query( description=( "A key used to authenticate this request as acting on behalf or a job runner for the specified job" Loading Loading
lib/galaxy/authnz/xsd/oidc_backends_config.xsd +7 −0 Original line number Diff line number Diff line Loading @@ -142,6 +142,13 @@ </xs:documentation> </xs:annotation> </xs:element> <xs:element name="alias" minOccurs="0" type="xs:string"> <xs:annotation> <xs:documentation> Specifies alias to be used in web client. </xs:documentation> </xs:annotation> </xs:element> </xs:all> <xs:attribute name="name" type="xs:string" use="required"> <xs:annotation> Loading
lib/galaxy/objectstore/rucio.py +3 −217 Original line number Diff line number Diff line import hashlib <<<<<<< HEAD from typing import Optional from .caching import ( CacheTarget, enable_cache_monitor, InProcessCacheMonitor, parse_caching_config_dict_from_xml, ) try: from ..authnz.util import provider_name_to_backend except ImportError: provider_name_to_backend = None # type: ignore[misc,assignment] ======= >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 import logging import os import shutil Loading @@ -33,14 +17,11 @@ try: except ImportError: Client = None <<<<<<< HEAD ======= try: from galaxy.authnz.util import provider_name_to_backend except ImportError: provider_name_to_backend = None # type: ignore[assignment, unused-ignore] >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 from galaxy.exceptions import ( ObjectInvalid, ObjectNotFound, Loading @@ -51,16 +32,12 @@ from galaxy.util import ( umask_fix_perms, unlink, ) <<<<<<< HEAD from galaxy.util.path import safe_relpath from ..objectstore import ConcreteObjectStore ======= from ._caching_base import CachingConcreteObjectStore from .caching import ( enable_cache_monitor, parse_caching_config_dict_from_xml, ) >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 log = logging.getLogger(__name__) Loading Loading @@ -104,15 +81,12 @@ def parse_config_xml(config_xml): if e_xml: rucio_download_schemes = [{k: e.get(k) for k in attrs_schemes} for e in e_xml] <<<<<<< HEAD oidc_providers = [] e_xml = config_xml.findall("oidc_provider") if e_xml: oidc_providers = [e.text for e in e_xml] ======= oidc_provider = config_xml.findtext("oidc_provider", None) >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 enable_cache_mon = string_as_bool(config_xml.findtext("enable_cache_monitor", "False")) e_xml = config_xml.findall("rucio_upload_scheme") Loading @@ -126,10 +100,7 @@ def parse_config_xml(config_xml): rucio_upload_scheme = None rucio_scope = None rucio_register_only = False <<<<<<< HEAD ======= oidc_provider = None >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 e_xml = config_xml.findall("rucio_auth") if not e_xml: Loading Loading @@ -163,11 +134,7 @@ def parse_config_xml(config_xml): "cache": cache_dict, "rucio": rucio_dict, "extra_dirs": extra_dirs, <<<<<<< HEAD "oidc_providers": oidc_providers, ======= "oidc_provider": oidc_provider, >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 "enable_cache_monitor": enable_cache_mon, } except Exception: Loading Loading @@ -195,11 +162,7 @@ class RucioBroker: auth_host=self.config["auth_host"], account=self.config["account"], auth_type=self.config["auth_type"], <<<<<<< HEAD creds={"username": self.config["username"], "password": self.config["password"]}, ======= creds={"username": self.config["username"], "password": self.config["username"]}, >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 ) return client Loading Loading @@ -278,12 +241,8 @@ class RucioBroker: } items = [item] download_client = self.get_rucio_download_client(auth_token=auth_token) <<<<<<< HEAD res = download_client.download_dids(items) os.replace(res[0]["dest_file_paths"][0], dest_path) ======= download_client.download_dids(items) >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 except Exception as e: log.exception(f"Cannot download file: {str(e)}") return False Loading Loading @@ -318,11 +277,7 @@ class RucioBroker: return True <<<<<<< HEAD class RucioObjectStore(ConcreteObjectStore): ======= class RucioObjectStore(CachingConcreteObjectStore): >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 """ Object store implementation that uses ORNL remote data broker. Loading @@ -330,22 +285,13 @@ class RucioObjectStore(CachingConcreteObjectStore): Galaxy at some future point or significantly modified. """ <<<<<<< HEAD cache_monitor: Optional[InProcessCacheMonitor] = None ======= >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 store_type = "rucio" def to_dict(self): rval = super().to_dict() rval["rucio"] = self.rucio_config rval["cache"] = self.cache_config <<<<<<< HEAD rval["oidc_providers"] = self.oidc_providers ======= rval["oidc_provider"] = self.oidc_provider >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 rval["enable_cache_monitor"] = self.enable_cache_monitor return rval Loading @@ -353,11 +299,7 @@ class RucioObjectStore(CachingConcreteObjectStore): super().__init__(config, config_dict) self.rucio_config = config_dict.get("rucio") or {} <<<<<<< HEAD self.oidc_providers = config_dict.get("oidc_providers", None) ======= self.oidc_provider = config_dict.get("oidc_provider", None) >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 self.rucio_broker = RucioBroker(self.rucio_config) cache_dict = config_dict.get("cache") or {} self.enable_cache_monitor, self.cache_monitor_interval = enable_cache_monitor(config, config_dict) Loading @@ -369,64 +311,8 @@ class RucioObjectStore(CachingConcreteObjectStore): self._initialize() def _initialize(self): <<<<<<< HEAD if self.enable_cache_monitor: self.cache_monitor = InProcessCacheMonitor(self.cache_target, self.cache_monitor_interval) def _in_cache(self, rel_path): """Check if the given dataset is in the local cache and return True if so.""" cache_path = self._get_cache_path(rel_path) return os.path.exists(cache_path) def _construct_path( self, obj, base_dir=None, dir_only=None, extra_dir=None, extra_dir_at_root=False, alt_name=None, obj_dir=False, **kwargs, ): # extra_dir should never be constructed from provided data but just # make sure there are no shenanigans afoot if extra_dir and extra_dir != os.path.normpath(extra_dir): log.warning("extra_dir is not normalized: %s", extra_dir) raise ObjectInvalid("The requested object is invalid") # ensure that any parent directory references in alt_name would not # result in a path not contained in the directory path constructed here if alt_name: if not safe_relpath(alt_name): log.warning("alt_name would locate path outside dir: %s", alt_name) raise ObjectInvalid("The requested object is invalid") # alt_name can contain parent directory references, but S3 will not # follow them, so if they are valid we normalize them out alt_name = os.path.normpath(alt_name) rel_path = os.path.join(*directory_hash_id(self._get_object_id(obj))) if extra_dir is not None: if extra_dir_at_root: rel_path = os.path.join(extra_dir, rel_path) else: rel_path = os.path.join(rel_path, extra_dir) # for JOB_WORK directory if obj_dir: rel_path = os.path.join(rel_path, str(self._get_object_id(obj))) if base_dir: base = self.extra_dirs.get(base_dir) return os.path.join(str(base), rel_path) if not dir_only: rel_path = os.path.join(rel_path, alt_name if alt_name else f"dataset_{self._get_object_id(obj)}.dat") return rel_path def _get_cache_path(self, rel_path): return os.path.abspath(os.path.join(self.staging_path, rel_path)) ======= self._ensure_staging_path_writable() self._start_cache_monitor_if_needed() >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 def _pull_into_cache(self, rel_path, auth_token): log.debug("rucio _pull_into_cache: %s", rel_path) Loading Loading @@ -479,28 +365,6 @@ class RucioObjectStore(CachingConcreteObjectStore): def parse_xml(cls, config_xml): return parse_config_xml(config_xml) <<<<<<< HEAD def file_ready(self, obj, **kwargs): log.debug("rucio file_ready") """ A helper method that checks if a file corresponding to a dataset is ready and available to be used. Return ``True`` if so, ``False`` otherwise. """ rel_path = self._construct_path(obj, **kwargs) # Make sure the size in cache is available in its entirety if self._in_cache(rel_path): if os.path.getsize(self._get_cache_path(rel_path)) == self.rucio_broker.get_size(rel_path): return True log.debug( "Waiting for dataset %s to transfer from OS: %s/%s", rel_path, os.path.getsize(self._get_cache_path(rel_path)), self.rucio_broker.get_size(rel_path), ) return False ======= >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 def _create(self, obj, **kwargs): if not self._exists(obj, **kwargs): # Pull out locally used fields Loading Loading @@ -531,16 +395,6 @@ class RucioObjectStore(CachingConcreteObjectStore): log.debug("rucio _create: %s", rel_path) return self <<<<<<< HEAD def _empty(self, obj, **kwargs): log.debug("rucio _empty") if self._exists(obj, **kwargs): return bool(self._size(obj, **kwargs) > 0) else: raise ObjectNotFound(f"objectstore.empty, object does not exist: {obj}, kwargs: {kwargs}") ======= >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 def _size(self, obj, **kwargs): rel_path = self._construct_path(obj, **kwargs) log.debug("rucio _size: %s", rel_path) Loading @@ -553,12 +407,6 @@ class RucioObjectStore(CachingConcreteObjectStore): if size != 0: return size if self._exists(obj, **kwargs): <<<<<<< HEAD return self.rucio_broker.get_size(rel_path) log.warning("Did not find dataset '%s', returning 0 for size", rel_path) return 0 ======= return self._get_remote_size(rel_path) log.warning("Did not find dataset '%s', returning 0 for size", rel_path) return 0 Loading @@ -566,18 +414,14 @@ class RucioObjectStore(CachingConcreteObjectStore): def _get_remote_size(self, rel_path): return self.rucio_broker.get_size(rel_path) >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 def _delete(self, obj, entire_dir=False, **kwargs): rel_path = self._construct_path(obj, **kwargs) extra_dir = kwargs.get("extra_dir", None) base_dir = kwargs.get("base_dir", None) dir_only = kwargs.get("dir_only", False) obj_dir = kwargs.get("obj_dir", False) <<<<<<< HEAD ======= log.debug("rucio _delete: %s", rel_path) auth_token = self._get_token(**kwargs) >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 try: # Remove temporary data in JOB_WORK directory Loading @@ -585,12 +429,6 @@ class RucioObjectStore(CachingConcreteObjectStore): shutil.rmtree(os.path.abspath(rel_path)) return True <<<<<<< HEAD log.debug("rucio _delete: %s", rel_path) auth_token = self._get_token(**kwargs) ======= >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 # Delete from cache first if entire_dir and extra_dir: shutil.rmtree(self._get_cache_path(rel_path), ignore_errors=True) Loading @@ -605,23 +443,6 @@ class RucioObjectStore(CachingConcreteObjectStore): log.exception("%s delete error", self._get_filename(obj, **kwargs)) return False <<<<<<< HEAD def _get_data(self, obj, start=0, count=-1, **kwargs): rel_path = self._construct_path(obj, **kwargs) log.debug("rucio _get_data: %s", rel_path) auth_token = self._get_token(**kwargs) # Check cache first and get file if not there if not self._in_cache(rel_path) or os.path.getsize(self._get_cache_path(rel_path)) == 0: self._pull_into_cache(rel_path, auth_token) # Read the file content from cache data_file = open(self._get_cache_path(rel_path)) data_file.seek(start) content = data_file.read(count) data_file.close() return content ======= >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 def _get_token(self, **kwargs): auth_token = kwargs.get("auth_token", None) if auth_token: Loading @@ -634,17 +455,11 @@ class RucioObjectStore(CachingConcreteObjectStore): user = trans.user else: user = arg_user <<<<<<< HEAD for oidc_provider in self.oidc_providers: backend = provider_name_to_backend(oidc_provider) tokens = user.get_oidc_tokens(backend) if tokens["id"]: return tokens["id"] ======= backend = provider_name_to_backend(self.oidc_provider) tokens = user.get_oidc_tokens(backend) return tokens["id"] >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 except Exception as e: log.debug("Failed to get auth token: %s", e) return None Loading @@ -652,28 +467,16 @@ class RucioObjectStore(CachingConcreteObjectStore): def _get_filename(self, obj, **kwargs): base_dir = kwargs.get("base_dir", None) dir_only = kwargs.get("dir_only", False) <<<<<<< HEAD rel_path = self._construct_path(obj, **kwargs) sync_cache = kwargs.get("sync_cache", True) ======= auth_token = self._get_token(**kwargs) rel_path = self._construct_path(obj, **kwargs) sync_cache = kwargs.get("sync_cache", True) log.debug("rucio _get_filename: %s", rel_path) >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 # for JOB_WORK directory if base_dir and dir_only: return os.path.abspath(rel_path) <<<<<<< HEAD auth_token = self._get_token(**kwargs) log.debug("rucio _get_filename: %s", rel_path) ======= >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 cache_path = self._get_cache_path(rel_path) if not sync_cache: return cache_path Loading Loading @@ -707,11 +510,7 @@ class RucioObjectStore(CachingConcreteObjectStore): file_name = self._get_cache_path(rel_path) if not os.path.islink(file_name): raise ObjectInvalid( <<<<<<< HEAD "rucio objectstore._register_file, rucio_register_only " "is set, but file in cache is not a link " ======= "rucio objectstore._register_file, rucio_register_only is set, but file in cache is not a link " >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 ) if os.path.islink(file_name): file_name = os.readlink(file_name) Loading Loading @@ -766,18 +565,5 @@ class RucioObjectStore(CachingConcreteObjectStore): kwargs["object_id"] = obj.id return kwargs <<<<<<< HEAD @property def cache_target(self) -> CacheTarget: return CacheTarget( self.staging_path, self.cache_size, 0.9, ) def shutdown(self): self.cache_monitor and self.cache_monitor.shutdown() ======= def shutdown(self): self._shutdown_cache_monitor() >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62
lib/galaxy/objectstore/rucio_extra_clients.py +0 −37 Original line number Diff line number Diff line import copy import logging import time <<<<<<< HEAD from abc import ABC try: from rucio.client.uploadclient import UploadClient from rucio.common.exception import ( # type: ignore ======= try: from rucio.client.uploadclient import UploadClient from rucio.common.exception import ( >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 InputValidationError, NoFilesUploaded, NotAllFilesUploaded, Loading @@ -21,11 +13,7 @@ try: from rucio.common.utils import generate_uuid from rucio.rse import rsemanager as rsemgr except ImportError: <<<<<<< HEAD UploadClient = ABC ======= UploadClient = object >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 class DeleteClient(UploadClient): Loading @@ -47,11 +35,7 @@ class DeleteClient(UploadClient): if not self.rses.get(rse): rse_settings = self.rses.setdefault(rse, rsemgr.get_rse_info(rse, vo=self.client.vo)) if not ignore_availability and rse_settings["availability_delete"] != 1: <<<<<<< HEAD logger(logging.DEBUG, "%s is not available for deletion. No actions have been taken" % rse) ======= logger(logging.DEBUG, "%s is not available for deletion. No actions have been taken", rse) >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 continue # protocol handling and deletion Loading @@ -68,13 +52,8 @@ class DeleteClient(UploadClient): success = True except Exception as error: logger(logging.WARNING, "Delete attempt failed") <<<<<<< HEAD logger(logging.INFO, "Exception: %s" % str(error), exc_info=True) logger(logging.DEBUG, "Successfully deleted dataset %s" % pfn) ======= logger(logging.INFO, "Exception: %s", error, exc_info=True) logger(logging.DEBUG, "Successfully deleted dataset %s", pfn) >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 class InPlaceIngestClient(UploadClient): Loading Loading @@ -128,11 +107,7 @@ class InPlaceIngestClient(UploadClient): if not self.rses.get(rse): rse_settings = self.rses.setdefault(rse, rsemgr.get_rse_info(rse, vo=self.client.vo)) if not ignore_availability and rse_settings["availability_write"] != 1: <<<<<<< HEAD raise RSEWriteBlocked("%s is not available for writing. No actions have been taken" % rse) ======= raise RSEWriteBlocked(f"{rse} is not available for writing. No actions have been taken") >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 dataset_scope = file.get("dataset_scope") dataset_name = file.get("dataset_name") Loading @@ -145,11 +120,7 @@ class InPlaceIngestClient(UploadClient): registered_file_dids.add(f"{file['did_scope']}:{file['did_name']}") wrong_dids = registered_file_dids.intersection(registered_dataset_dids) if len(wrong_dids): <<<<<<< HEAD raise InputValidationError("DIDs used to address both files and datasets: %s" % str(wrong_dids)) ======= raise InputValidationError(f"DIDs used to address both files and datasets: {wrong_dids}") >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 logger(logging.DEBUG, "Input validation done.") # clear this set again to ensure that we only try to register datasets once Loading @@ -158,11 +129,7 @@ class InPlaceIngestClient(UploadClient): summary = [] for file in files: basename = file["basename"] <<<<<<< HEAD logger(logging.INFO, "Preparing upload for file %s" % basename) ======= logger(logging.INFO, "Preparing upload for file %s", basename) >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 pfn = file.get("pfn") Loading Loading @@ -202,11 +169,7 @@ class InPlaceIngestClient(UploadClient): trace["transferEnd"] = time.time() trace["clientState"] = "DONE" file["state"] = "A" <<<<<<< HEAD logger(logging.INFO, "Successfully uploaded file %s" % basename) ======= logger(logging.INFO, "Successfully uploaded file %s", basename) >>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62 self._send_trace(trace) if summary_file_path: Loading
lib/galaxy/webapps/galaxy/api/job_tokens.py +1 −1 Original line number Diff line number Diff line Loading @@ -36,7 +36,7 @@ class FastAPIJobTokens: ) def get_token( self, job_id: EncodedDatabaseIdField, job_id: str, job_key: str = Query( description=( "A key used to authenticate this request as acting on behalf or a job runner for the specified job" Loading