Commit e6ff015d authored by Yakubov, Sergey's avatar Yakubov, Sergey
Browse files

add delete method to rdb

parent be5fd731
Loading
Loading
Loading
Loading
Loading
+32 −12
Original line number Diff line number Diff line
@@ -27,6 +27,19 @@ class Default(protocol.RSEProtocol):
        """ Closes the connection to RSE."""
        pass

    def _prepare_url_request(self,pfn, command):
        path = self.pfn2path(pfn)
        query = ""
        if self.attributes.get("extended_attributes", None):
            as_user = self.attributes["extended_attributes"].get("as_user", False)
            remote_queue = self.attributes["extended_attributes"].get("remote_queue", None)
            if as_user:
                query = "?asuser=true"
            if remote_queue:
                query = "?remotequeue=" + remote_queue
        return f"{self.attributes['scheme']}://{self.attributes['hostname']}:{self.attributes['port']}/v0.1/{command}{query}", \
               {"filename": path, "token": self.auth_token or ""}

    def get(self, pfn, dest, transfer_timeout=None):
        """ Provides access to files stored inside connected the RSE.

@@ -39,18 +52,8 @@ class Default(protocol.RSEProtocol):
            :raises SourceNotFound: if the source file was not found on the referred storage.
         """
        try:
            path = self.pfn2path(pfn)
            query = ""
            if self.attributes.get("extended_attributes", None):
                as_user = self.attributes["extended_attributes"].get("as_user", False)
                remote_queue = self.attributes["extended_attributes"].get("remote_queue", None)
                if as_user:
                    query = "?asuser=true"
                if remote_queue:
                    query = "?remotequeue="+remote_queue

            with requests.post(f"{self.attributes['scheme']}://{self.attributes['hostname']}:{self.attributes['port']}/v0.1/download{query}",
                          json={"filename": path, "token": self.auth_token or ""}, stream=True) as res:
            url, data = self._prepare_url_request(pfn,"download")
            with requests.post(url, json=data, stream=True) as res:
                res.raise_for_status()
                with open(dest, 'wb') as f:
                    shutil.copyfileobj(res.raw, f)
@@ -59,6 +62,23 @@ class Default(protocol.RSEProtocol):
                os.remove(dest)
            raise exception.ServiceUnavailable(e)

    def delete(self, pfn):
        """ Deletes a file from the connected RSE.
            :param pfn: pfn to the to be deleted file
            :raises ServiceUnavailable: if some generic error occured in the library.
            :raises SourceNotFound: if the source file was not found on the referred storage.
        """
        try:
            url, data = self._prepare_url_request(pfn, "delete")
            with requests.post(url, json=data) as res:
                res.raise_for_status()
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404 and "file not found" in e.response.text:
                raise exception.SourceNotFound(e)
            else:
                raise exception.ServiceUnavailable(e)
        except Exception as e:
            raise exception.ServiceUnavailable(e)

    def pfn2path(self, pfn):
        parsed = urlparse(pfn)