Commit 3424f704 authored by Yakubov, Sergey's avatar Yakubov, Sergey
Browse files

add protocol for remote data broker

parent cfb55f28
Loading
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
.idea
dist
__pycache__
 No newline at end of file
+15 −81
Original line number Diff line number Diff line
@@ -4,26 +4,15 @@ import os.path
import shutil
from subprocess import call

import requests
from rucio.common import exception
from rucio.common.utils import adler32
from rucio.rse.protocols import protocol

from urllib.parse import urlparse

class Default(protocol.RSEProtocol):
    """ Implementing access to RSEs using the remote data broker."""

    def exists(self, pfn):
        """
            Checks if the requested file is known by the referred RSE.

            :param pfn: Physical file name

            :returns: True if the file exists, False if it doesn't

            :raises SourceNotFound: if the source file was not found on the referred storage.
        """
        return True

    def connect(self):
        """
            Establishes the actual connection to the referred RSE.
@@ -50,75 +39,20 @@ class Default(protocol.RSEProtocol):
            :raises SourceNotFound: if the source file was not found on the referred storage.
         """
        try:
            shutil.copy(self.pfn2path(pfn), dest)
        except IOError as e:
            try:  # To check if the error happend local or remote
                with open(dest, 'wb'):
                    pass
                call(['rm', '-rf', dest])
            except IOError as e:
                if e.errno == 2:
                    raise exception.DestinationNotAccessible(e)
                else:
                    raise exception.ServiceUnavailable(e)
            if e.errno == 2:
                raise exception.SourceNotFound(e)
            else:
            path = self.pfn2path(pfn)
            with requests.post(f"{self.attributes['scheme']}://{self.attributes['hostname']}:{self.attributes['port']}/download",
                          json={"filename": path, "token": self.auth_token or ""}, stream=True) as res:
                res.raise_for_status()
                with open(dest, 'wb') as f:
                    shutil.copyfileobj(res.raw, f)
        except Exception as e:
            if os.path.exists(dest):
                os.remove(dest)
            raise exception.ServiceUnavailable(e)

    def put(self, source, target, source_dir=None, transfer_timeout=None):
        """
            Allows to store files inside the referred RSE.

            :param source: path to the source file on the client file system
            :param target: path to the destination file on the storage
            :param source_dir: Path where the to be transferred files are stored in the local file system
            :param transfer_timeout Transfer timeout (in seconds) - dummy

            :raises DestinationNotAccessible: if the destination storage was not accessible.
            :raises ServiceUnavailable: if some generic error occured in the library.
            :raises SourceNotFound: if the source file was not found on the referred storage.
        """
        pass

    def lfns2pfns(self, lfns):
        """ Returns fully qualified PFNs for the file referred by each lfn in
            the lfns list.

            :param lfns: List of lfns. If lfn['path'] is present it is used as
                   the path to the file, otherwise the path is constructed
                   deterministically.

            :returns: Fully qualified PFNs.
        """
        pfns = {}
        prefix = self.attributes['prefix']

        if not prefix.startswith('/'):
            prefix = ''.join(['/', prefix])
        if not prefix.endswith('/'):
            prefix = ''.join([prefix, '/'])

        lfns = [lfns] if isinstance(lfns, dict) else lfns
        for lfn in lfns:
            scope, name = str(lfn['scope']), lfn['name']
            if lfn.get('path'):
                pfns['%s:%s' % (scope, name)] = ''.join([self.attributes['scheme'],
                                                         '://',
                                                         self.attributes['hostname'],
                                                         prefix,
                                                         lfn['path'] if not lfn['path'].startswith('/') else lfn['path'][1:]
                                                         ])
            else:
                pfns['%s:%s' % (scope, name)] = ''.join([self.attributes['scheme'],
                                                         '://',
                                                         self.attributes['hostname'],
                                                         prefix,
                                                         self._get_path(scope=scope, name=name)
                                                         ])
        return pfns

    def pfn2path(self, pfn):
        tmp = list(self.parse_pfns(pfn).values())[0]
        return '/'.join([tmp['prefix'], tmp['path'], tmp['name']])
        parsed = urlparse(pfn)
        path = os.path.normpath(parsed.path)
        return path