Loading rucio_protocols/posix.py→rucio_protocols/rdb.py +124 −0 Original line number Diff line number Diff line # -*- coding: utf-8 -*- # Copyright European Organization for Nuclear Research (CERN) since 2012 # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging import os import os.path Loading @@ -25,7 +10,7 @@ from rucio.rse.protocols import protocol class Default(protocol.RSEProtocol): """ Implementing access to RSEs using the local filesystem.""" """ Implementing access to RSEs using the remote data broker.""" def exists(self, pfn): """ Loading @@ -37,12 +22,7 @@ class Default(protocol.RSEProtocol): :raises SourceNotFound: if the source file was not found on the referred storage. """ status = '' try: status = os.path.exists(self.pfn2path(pfn)) except Exception as e: raise exception.ServiceUnavailable(e) return status return True def connect(self): """ Loading Loading @@ -99,67 +79,7 @@ class Default(protocol.RSEProtocol): :raises ServiceUnavailable: if some generic error occured in the library. :raises SourceNotFound: if the source file was not found on the referred storage. """ target = self.pfn2path(target) if source_dir: sf = source_dir + '/' + source else: sf = source try: dirs = os.path.dirname(target) if not os.path.exists(dirs): os.makedirs(dirs) shutil.copy(sf, target) except IOError as e: if e.errno == 2: raise exception.SourceNotFound(e) elif not self.exists(self.rse['prefix']): path = '' for p in self.rse['prefix'].split('/'): path += p + '/' os.mkdir(path) shutil.copy(sf, self.pfn2path(target)) else: raise exception.DestinationNotAccessible(e) def delete(self, pfn): """ Deletes a file from the connected RSE. :param pfn: pfn to the to be deleted file :raises ServiceUnavailable: if some generic error occured in the library. :raises SourceNotFound: if the source file was not found on the referred storage. """ try: os.remove(self.pfn2path(pfn)) except OSError as e: if e.errno == 2: raise exception.SourceNotFound(e) def rename(self, pfn, new_pfn): """ Allows to rename a file stored inside the connected RSE. :param path: path to the current file on the storage :param new_path: path to the new file on the storage :raises DestinationNotAccessible: if the destination storage was not accessible. :raises ServiceUnavailable: if some generic error occured in the library. :raises SourceNotFound: if the source file was not found on the referred storage. """ path = self.pfn2path(pfn) new_path = self.pfn2path(new_pfn) try: if not os.path.exists(os.path.dirname(new_path)): os.makedirs(os.path.dirname(new_path)) os.rename(path, new_path) except IOError as e: if e.errno == 2: if self.exists(self.pfn2path(path)): raise exception.SourceNotFound(e) else: raise exception.DestinationNotAccessible(e) else: raise exception.ServiceUnavailable(e) pass def lfns2pfns(self, lfns): """ Returns fully qualified PFNs for the file referred by each lfn in Loading Loading @@ -202,50 +122,3 @@ class Default(protocol.RSEProtocol): tmp = list(self.parse_pfns(pfn).values())[0] return '/'.join([tmp['prefix'], tmp['path'], tmp['name']]) def stat(self, pfn): """ Determines the file size in bytes and checksum (adler32) of the provided file. :param pfn: The PFN the file. :returns: a dict containing the keys filesize and adler32. """ path = self.pfn2path(pfn) return {'filesize': os.stat(path)[os.path.stat.ST_SIZE], 'adler32': adler32(path)} class Symlink(Default): """ Implementing access to RSEs using the local filesystem, creating a symlink on a get """ def get(self, pfn, dest, transfer_timeout=None): """ Provides access to files stored inside connected the RSE. A download/get will create a symlink on the local file system pointing to the underlying file. Other operations act directly on the remote file. :param pfn: Physical file name of requested file :param dest: Name and path of the files when stored at the client :param transfer_timeout Transfer timeout (in seconds) - dummy :raises DestinationNotAccessible: if the destination storage was not accessible. :raises ServiceUnavailable: if some generic error occured in the library. :raises SourceNotFound: if the source file was not found on the referred storage. """ path = self.pfn2path(pfn) os.symlink(path, dest) self.logger(logging.DEBUG, 'Symlink {} created for {} from {}' .format(dest, path, pfn)) if not os.lstat(dest): # problem in creating the symlink self.logger(logging.ERROR, 'Symlink {} could not be created'.format(dest)) raise exception.DestinationNotAccessible() if not os.path.exists(dest): # could not find the file following the symlink self.logger(logging.ERROR, 'Symlink {} appears to be a broken link to {}' .format(dest, path)) if os.lstat(dest) and os.path.islink(dest): os.unlink(dest) raise exception.SourceNotFound() def pfn2path(self, pfn): # obtain path and sanitise from multiple slashes, etc path = os.path.normpath(super().pfn2path(pfn)) self.logger(logging.DEBUG, 'Extracted path: {} from: {}'.format(path, pfn)) return path Loading
rucio_protocols/posix.py→rucio_protocols/rdb.py +124 −0 Original line number Diff line number Diff line # -*- coding: utf-8 -*- # Copyright European Organization for Nuclear Research (CERN) since 2012 # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging import os import os.path Loading @@ -25,7 +10,7 @@ from rucio.rse.protocols import protocol class Default(protocol.RSEProtocol): """ Implementing access to RSEs using the local filesystem.""" """ Implementing access to RSEs using the remote data broker.""" def exists(self, pfn): """ Loading @@ -37,12 +22,7 @@ class Default(protocol.RSEProtocol): :raises SourceNotFound: if the source file was not found on the referred storage. """ status = '' try: status = os.path.exists(self.pfn2path(pfn)) except Exception as e: raise exception.ServiceUnavailable(e) return status return True def connect(self): """ Loading Loading @@ -99,67 +79,7 @@ class Default(protocol.RSEProtocol): :raises ServiceUnavailable: if some generic error occured in the library. :raises SourceNotFound: if the source file was not found on the referred storage. """ target = self.pfn2path(target) if source_dir: sf = source_dir + '/' + source else: sf = source try: dirs = os.path.dirname(target) if not os.path.exists(dirs): os.makedirs(dirs) shutil.copy(sf, target) except IOError as e: if e.errno == 2: raise exception.SourceNotFound(e) elif not self.exists(self.rse['prefix']): path = '' for p in self.rse['prefix'].split('/'): path += p + '/' os.mkdir(path) shutil.copy(sf, self.pfn2path(target)) else: raise exception.DestinationNotAccessible(e) def delete(self, pfn): """ Deletes a file from the connected RSE. :param pfn: pfn to the to be deleted file :raises ServiceUnavailable: if some generic error occured in the library. :raises SourceNotFound: if the source file was not found on the referred storage. """ try: os.remove(self.pfn2path(pfn)) except OSError as e: if e.errno == 2: raise exception.SourceNotFound(e) def rename(self, pfn, new_pfn): """ Allows to rename a file stored inside the connected RSE. :param path: path to the current file on the storage :param new_path: path to the new file on the storage :raises DestinationNotAccessible: if the destination storage was not accessible. :raises ServiceUnavailable: if some generic error occured in the library. :raises SourceNotFound: if the source file was not found on the referred storage. """ path = self.pfn2path(pfn) new_path = self.pfn2path(new_pfn) try: if not os.path.exists(os.path.dirname(new_path)): os.makedirs(os.path.dirname(new_path)) os.rename(path, new_path) except IOError as e: if e.errno == 2: if self.exists(self.pfn2path(path)): raise exception.SourceNotFound(e) else: raise exception.DestinationNotAccessible(e) else: raise exception.ServiceUnavailable(e) pass def lfns2pfns(self, lfns): """ Returns fully qualified PFNs for the file referred by each lfn in Loading Loading @@ -202,50 +122,3 @@ class Default(protocol.RSEProtocol): tmp = list(self.parse_pfns(pfn).values())[0] return '/'.join([tmp['prefix'], tmp['path'], tmp['name']]) def stat(self, pfn): """ Determines the file size in bytes and checksum (adler32) of the provided file. :param pfn: The PFN the file. :returns: a dict containing the keys filesize and adler32. """ path = self.pfn2path(pfn) return {'filesize': os.stat(path)[os.path.stat.ST_SIZE], 'adler32': adler32(path)} class Symlink(Default): """ Implementing access to RSEs using the local filesystem, creating a symlink on a get """ def get(self, pfn, dest, transfer_timeout=None): """ Provides access to files stored inside connected the RSE. A download/get will create a symlink on the local file system pointing to the underlying file. Other operations act directly on the remote file. :param pfn: Physical file name of requested file :param dest: Name and path of the files when stored at the client :param transfer_timeout Transfer timeout (in seconds) - dummy :raises DestinationNotAccessible: if the destination storage was not accessible. :raises ServiceUnavailable: if some generic error occured in the library. :raises SourceNotFound: if the source file was not found on the referred storage. """ path = self.pfn2path(pfn) os.symlink(path, dest) self.logger(logging.DEBUG, 'Symlink {} created for {} from {}' .format(dest, path, pfn)) if not os.lstat(dest): # problem in creating the symlink self.logger(logging.ERROR, 'Symlink {} could not be created'.format(dest)) raise exception.DestinationNotAccessible() if not os.path.exists(dest): # could not find the file following the symlink self.logger(logging.ERROR, 'Symlink {} appears to be a broken link to {}' .format(dest, path)) if os.lstat(dest) and os.path.islink(dest): os.unlink(dest) raise exception.SourceNotFound() def pfn2path(self, pfn): # obtain path and sanitise from multiple slashes, etc path = os.path.normpath(super().pfn2path(pfn)) self.logger(logging.DEBUG, 'Extracted path: {} from: {}'.format(path, pfn)) return path