Loading lib/galaxy/objectstore/onedata.py +53 −35 Original line number Diff line number Diff line Loading @@ -7,8 +7,9 @@ import os from datetime import datetime try: from onedatafilerestclient import ( OnedataFileRESTClient, from onedatafilerestclient import OnedataFileRESTClient from onedatafilerestclient.errors import ( OnedataError, OnedataRESTError, ) except ImportError: Loading Loading @@ -131,7 +132,7 @@ class OnedataObjectStore(CachingConcreteObjectStore): self._start_cache_monitor_if_needed() @classmethod def parse_xml(clazz, config_xml): def parse_xml(cls, config_xml): return _parse_config_xml(config_xml) def to_dict(self): Loading @@ -155,28 +156,26 @@ class OnedataObjectStore(CachingConcreteObjectStore): ) return as_dict def _construct_onedata_path(self, rel_path): def _build_remote_path(self, rel_path): return os.path.join(self.galaxy_root_dir, rel_path) def _get_remote_size(self, rel_path): try: onedata_path = self._construct_onedata_path(rel_path) return self._client.get_attributes(self.space_name, file_path=onedata_path)["size"] except OnedataRESTError: remote_path = self._build_remote_path(rel_path) return self._client.get_attributes(self.space_name, attributes=["size"], file_path=remote_path)["size"] except OnedataError: log.exception("Could not get '%s' size from Onedata", rel_path) return -1 def _exists_remotely(self, rel_path): try: onedata_path = self._construct_onedata_path(rel_path) self._client.get_attributes(self.space_name, file_path=onedata_path) remote_path = self._build_remote_path(rel_path) self._client.get_attributes(self.space_name, attributes=["type", "size"], file_path=remote_path) return True except OnedataRESTError as ex: if ex.http_code == 404: return False elif ex.http_code == 400 and ex.error_details.get("errno", None) == "enoent": except OnedataError as ex: if _is_not_found_onedata_rest_error(ex): return False else: log.exception("Trouble checking '%s' existence in Onedata", rel_path) return False Loading @@ -186,21 +185,23 @@ class OnedataObjectStore(CachingConcreteObjectStore): log.debug("Pulling file '%s' into cache to %s", rel_path, dst_path) onedata_path = self._construct_onedata_path(rel_path) file_size = self._client.get_attributes(self.space_name, file_path=onedata_path)["size"] remote_path = self._build_remote_path(rel_path) file_size = self._client.get_attributes(self.space_name, attributes=["size"], file_path=remote_path)["size"] # Test if cache is large enough to hold the new file if not self._caching_allowed(rel_path, file_size): return False with open(dst_path, "wb") as dst: for chunk in self._client.iter_file_content(self.space_name, STREAM_CHUNK_SIZE, file_path=onedata_path): for chunk in self._client.iter_file_content( self.space_name, chunk_size=STREAM_CHUNK_SIZE, file_path=remote_path ): dst.write(chunk) log.debug("Pulled '%s' into cache to %s", rel_path, dst_path) return True except OnedataRESTError: except OnedataError: log.exception("Problem downloading file '%s'", rel_path) return False Loading @@ -211,7 +212,7 @@ class OnedataObjectStore(CachingConcreteObjectStore): ``rel_path`` as the path. """ try: source_file = source_file if source_file else self._get_cache_path(rel_path) source_file = source_file or self._get_cache_path(rel_path) if os.path.exists(source_file): if os.path.getsize(source_file) == 0 and self._exists_remotely(rel_path): log.debug( Loading @@ -226,12 +227,14 @@ class OnedataObjectStore(CachingConcreteObjectStore): os.path.getsize(source_file), rel_path, ) onedata_path = self._construct_onedata_path(rel_path) remote_path = self._build_remote_path(rel_path) if not self._exists_remotely(rel_path): self._client.create_file(self.space_name, onedata_path, "REG", create_parents=True) file_id = self._client.get_file_id(self.space_name, onedata_path) file_id = self._client.create_file( self.space_name, file_path=remote_path, file_type="REG", create_parents=True ) else: file_id = self._client.get_file_id(self.space_name, file_path=remote_path) if source_file: with open(source_file, "rb") as src: Loading @@ -241,14 +244,18 @@ class OnedataObjectStore(CachingConcreteObjectStore): if not chunk: break self._client.put_file_content(self.space_name, file_id, offset, chunk) self._client.put_file_content( self.space_name, data=chunk, offset=offset, file_id=file_id ) offset += len(chunk) else: self._client.put_file_content(self.space_name, file_id, 0, from_string.encode("utf-8")) self._client.put_file_content( self.space_name, data=from_string.encode("utf-8"), file_id=file_id ) end_time = datetime.now() log.debug( "Pushed cache file '%s' under '%s' (%s bytes transfered in %s sec)", "Pushed cache file '%s' under '%s' (%s bytes transferred in %s sec)", source_file, rel_path, os.path.getsize(source_file), Loading @@ -257,25 +264,36 @@ class OnedataObjectStore(CachingConcreteObjectStore): return True else: log.error("Source file does not exist.", rel_path, source_file) except OnedataRESTError: except OnedataError: log.exception("Trouble pushing Onedata key '%s' from file '%s'", rel_path, source_file) raise return False def _delete_existing_remote(self, rel_path) -> bool: try: onedata_path = self._construct_onedata_path(rel_path) self._client.remove(self.space_name, onedata_path) onedata_path = self._build_remote_path(rel_path) self._client.remove(self.space_name, file_path=onedata_path) return True except OnedataRESTError: except OnedataError: log.exception("Could not delete '%s' from Onedata", rel_path) return False def _get_object_url(self, obj, **kwargs): def _get_object_url(self, _obj, **_kwargs): return None def _get_store_usage_percent(self, obj): def _get_store_usage_percent(self, _obj): return 0.0 def shutdown(self): self._shutdown_cache_monitor() def _is_not_found_onedata_rest_error(ex): if isinstance(ex, OnedataRESTError): if ex.http_code == 404: return True if ex.http_code == 400 and ex.category == "posix": return ex.details["errno"] == "enoent" return False Loading
lib/galaxy/objectstore/onedata.py +53 −35 Original line number Diff line number Diff line Loading @@ -7,8 +7,9 @@ import os from datetime import datetime try: from onedatafilerestclient import ( OnedataFileRESTClient, from onedatafilerestclient import OnedataFileRESTClient from onedatafilerestclient.errors import ( OnedataError, OnedataRESTError, ) except ImportError: Loading Loading @@ -131,7 +132,7 @@ class OnedataObjectStore(CachingConcreteObjectStore): self._start_cache_monitor_if_needed() @classmethod def parse_xml(clazz, config_xml): def parse_xml(cls, config_xml): return _parse_config_xml(config_xml) def to_dict(self): Loading @@ -155,28 +156,26 @@ class OnedataObjectStore(CachingConcreteObjectStore): ) return as_dict def _construct_onedata_path(self, rel_path): def _build_remote_path(self, rel_path): return os.path.join(self.galaxy_root_dir, rel_path) def _get_remote_size(self, rel_path): try: onedata_path = self._construct_onedata_path(rel_path) return self._client.get_attributes(self.space_name, file_path=onedata_path)["size"] except OnedataRESTError: remote_path = self._build_remote_path(rel_path) return self._client.get_attributes(self.space_name, attributes=["size"], file_path=remote_path)["size"] except OnedataError: log.exception("Could not get '%s' size from Onedata", rel_path) return -1 def _exists_remotely(self, rel_path): try: onedata_path = self._construct_onedata_path(rel_path) self._client.get_attributes(self.space_name, file_path=onedata_path) remote_path = self._build_remote_path(rel_path) self._client.get_attributes(self.space_name, attributes=["type", "size"], file_path=remote_path) return True except OnedataRESTError as ex: if ex.http_code == 404: return False elif ex.http_code == 400 and ex.error_details.get("errno", None) == "enoent": except OnedataError as ex: if _is_not_found_onedata_rest_error(ex): return False else: log.exception("Trouble checking '%s' existence in Onedata", rel_path) return False Loading @@ -186,21 +185,23 @@ class OnedataObjectStore(CachingConcreteObjectStore): log.debug("Pulling file '%s' into cache to %s", rel_path, dst_path) onedata_path = self._construct_onedata_path(rel_path) file_size = self._client.get_attributes(self.space_name, file_path=onedata_path)["size"] remote_path = self._build_remote_path(rel_path) file_size = self._client.get_attributes(self.space_name, attributes=["size"], file_path=remote_path)["size"] # Test if cache is large enough to hold the new file if not self._caching_allowed(rel_path, file_size): return False with open(dst_path, "wb") as dst: for chunk in self._client.iter_file_content(self.space_name, STREAM_CHUNK_SIZE, file_path=onedata_path): for chunk in self._client.iter_file_content( self.space_name, chunk_size=STREAM_CHUNK_SIZE, file_path=remote_path ): dst.write(chunk) log.debug("Pulled '%s' into cache to %s", rel_path, dst_path) return True except OnedataRESTError: except OnedataError: log.exception("Problem downloading file '%s'", rel_path) return False Loading @@ -211,7 +212,7 @@ class OnedataObjectStore(CachingConcreteObjectStore): ``rel_path`` as the path. """ try: source_file = source_file if source_file else self._get_cache_path(rel_path) source_file = source_file or self._get_cache_path(rel_path) if os.path.exists(source_file): if os.path.getsize(source_file) == 0 and self._exists_remotely(rel_path): log.debug( Loading @@ -226,12 +227,14 @@ class OnedataObjectStore(CachingConcreteObjectStore): os.path.getsize(source_file), rel_path, ) onedata_path = self._construct_onedata_path(rel_path) remote_path = self._build_remote_path(rel_path) if not self._exists_remotely(rel_path): self._client.create_file(self.space_name, onedata_path, "REG", create_parents=True) file_id = self._client.get_file_id(self.space_name, onedata_path) file_id = self._client.create_file( self.space_name, file_path=remote_path, file_type="REG", create_parents=True ) else: file_id = self._client.get_file_id(self.space_name, file_path=remote_path) if source_file: with open(source_file, "rb") as src: Loading @@ -241,14 +244,18 @@ class OnedataObjectStore(CachingConcreteObjectStore): if not chunk: break self._client.put_file_content(self.space_name, file_id, offset, chunk) self._client.put_file_content( self.space_name, data=chunk, offset=offset, file_id=file_id ) offset += len(chunk) else: self._client.put_file_content(self.space_name, file_id, 0, from_string.encode("utf-8")) self._client.put_file_content( self.space_name, data=from_string.encode("utf-8"), file_id=file_id ) end_time = datetime.now() log.debug( "Pushed cache file '%s' under '%s' (%s bytes transfered in %s sec)", "Pushed cache file '%s' under '%s' (%s bytes transferred in %s sec)", source_file, rel_path, os.path.getsize(source_file), Loading @@ -257,25 +264,36 @@ class OnedataObjectStore(CachingConcreteObjectStore): return True else: log.error("Source file does not exist.", rel_path, source_file) except OnedataRESTError: except OnedataError: log.exception("Trouble pushing Onedata key '%s' from file '%s'", rel_path, source_file) raise return False def _delete_existing_remote(self, rel_path) -> bool: try: onedata_path = self._construct_onedata_path(rel_path) self._client.remove(self.space_name, onedata_path) onedata_path = self._build_remote_path(rel_path) self._client.remove(self.space_name, file_path=onedata_path) return True except OnedataRESTError: except OnedataError: log.exception("Could not delete '%s' from Onedata", rel_path) return False def _get_object_url(self, obj, **kwargs): def _get_object_url(self, _obj, **_kwargs): return None def _get_store_usage_percent(self, obj): def _get_store_usage_percent(self, _obj): return 0.0 def shutdown(self): self._shutdown_cache_monitor() def _is_not_found_onedata_rest_error(ex): if isinstance(ex, OnedataRESTError): if ex.http_code == 404: return True if ex.http_code == 400 and ex.category == "posix": return ex.details["errno"] == "enoent" return False