Commit 6fdc2efa authored by Yakubov, Sergey's avatar Yakubov, Sergey
Browse files

modify objectstore and rucio plugin to delete datasets

parent f2c0f721
Loading
Loading
Loading
Loading
Loading
+3 −3
Original line number Diff line number Diff line
@@ -75,7 +75,7 @@ class DatasetManager(base.ModelManager[model.Dataset], secured.AccessibleManager
    def copy(self, dataset, **kwargs):
        raise exceptions.NotImplemented("Datasets cannot be copied")

    def purge(self, dataset, flush=True):
    def purge(self, dataset, flush=True, user=None):
        """
        Remove the object_store/file for this dataset from storage and mark
        as purged.
@@ -85,7 +85,7 @@ class DatasetManager(base.ModelManager[model.Dataset], secured.AccessibleManager
        self.error_unless_dataset_purge_allowed(dataset)

        # the following also marks dataset as purged and deleted
        dataset.full_delete()
        dataset.full_delete(user=user)
        self.session().add(dataset)
        if flush:
            session = self.session()
@@ -365,7 +365,7 @@ class DatasetAssociationManager(

        # more importantly, purge underlying dataset as well
        if dataset_assoc.dataset.user_can_purge:
            self.dataset_manager.purge(dataset_assoc.dataset)
            self.dataset_manager.purge(dataset_assoc.dataset, user=dataset_assoc.user)
        return dataset_assoc

    def by_user(self, user):
+2 −2
Original line number Diff line number Diff line
@@ -4145,11 +4145,11 @@ class Dataset(Base, StorableObject, Serializable):
            and len(self.history_associations) == len(self.purged_history_associations)
        )

    def full_delete(self):
    def full_delete(self, user):
        """Remove the file and extra files, marks deleted and purged"""
        # os.unlink( self.file_name )
        try:
            self.object_store.delete(self)
            self.object_store.delete(self, user=user)
        except galaxy.exceptions.ObjectNotFound:
            pass
        rel_path = self._extra_files_rel_path
+67 −9
Original line number Diff line number Diff line
@@ -41,11 +41,52 @@ from ..objectstore import ConcreteObjectStore

log = logging.getLogger(__name__)

class DeleteClient(UploadClient):
    def delete(self, items, forced_schemes=None, ignore_availability=False):
        for item in items:
            self._delete_item(item, forced_schemes, ignore_availability)

    def _delete_item(self, item, forced_schemes, ignore_availability):
        logger = self.logger
        dids = [item["did"]]
        files = list(next(self.client.list_replicas(dids))["pfns"].items())
        for file in files:
            pfn = file[0]
            rse = file[1]["rse"]
            force_scheme = None
            for rse_scheme in forced_schemes or []:
                if rse == rse_scheme["rse"]:
                    force_scheme = rse_scheme["scheme"]
            if not self.rses.get(rse):
                rse_settings = self.rses.setdefault(rse, rsemgr.get_rse_info(rse, vo=self.client.vo))
                if not ignore_availability and rse_settings['availability_delete'] != 1:
                    logger(logging.DEBUG, '%s is not available for deletion. No actions have been taken' % rse)
                    continue

            # protocol handling and deletion
            rse_settings = self.rses[rse]
            protocols = rsemgr.get_protocols_ordered(rse_settings=rse_settings, operation='delete',
                                                     scheme=force_scheme)
            protocols.reverse()
            success = False
            while not success and len(protocols):
                protocol = protocols.pop()
                cur_scheme = protocol['scheme']
                try:
                    protocol_delete = self._create_protocol(rse_settings, 'delete', force_scheme=cur_scheme)
                    protocol_delete.delete(pfn)
                    success = True
                except Exception as error:
                    logger(logging.WARNING, 'Delete attempt failed')
                    logger(logging.INFO, 'Exception: %s' % str(error), exc_info=True)
            logger(logging.DEBUG, 'Successfully deleted dataset %s' % pfn)

class InPlaceIngestClient(UploadClient):

    def ingest(self, items, summary_file_path=None, traces_copy_out=None, ignore_availability=False, activity=None):
        """
        :param items: List of dictionaries. Each dictionary describing a file to upload. Keys:
            path                  - path of the file that will be uploaded
            path                  - path of the file that will be uploaded
            rse                   - rse expression/name (e.g. 'CERN-PROD_DATADISK') where to upload the file
            did_scope             - Optional: custom did scope (Default: user.<account>)
@@ -280,6 +321,12 @@ class RucioBroker:
        ic.auth_token = auth_token
        return ic

    def get_rucio_delete_client(self, auth_token=None):
        client = self.get_rucio_client()
        ic = DeleteClient(_client=client)
        ic.auth_token = auth_token
        return ic

    def register(self, key, source_path):
        key = os.path.basename(key)
        item = {
@@ -354,13 +401,17 @@ class RucioBroker:
        except:
            return 0

    def delete(self, key):
        rucio_client = self.get_rucio_client()
    def delete(self, key, auth_token):
        key = os.path.basename(key)
        dids = [{"scope": self.scope, "name": key}]
        rses = next(rucio_client.list_replicas(dids))["rses"].keys()
        for rse in rses:
            rucio_client.delete_replicas(rse, dids)
        try:
            items = [
                {"did": {"scope": self.scope, "name": key}}
            ]
            self.get_rucio_delete_client(auth_token=auth_token).delete(items, self.download_schemes, True)
        except Exception as e:
            log.exception("Cannot delete file:" + str(e))
            return False
        return True


class RucioObjectStore(ConcreteObjectStore):
@@ -595,6 +646,7 @@ class RucioObjectStore(ConcreteObjectStore):
        dir_only = kwargs.get("dir_only", False)
        obj_dir = kwargs.get("obj_dir", False)
        log.debug("rucio _delete: " + rel_path)
        auth_token = self._get_token(**kwargs)

        try:
            # Remove temporary data in JOB_WORK directory
@@ -610,7 +662,7 @@ class RucioObjectStore(ConcreteObjectStore):

            # Delete from rucio as well
            if self.rucio_broker.data_object_exists(rel_path):
                self.rucio_broker.delete(rel_path)
                self.rucio_broker.delete(rel_path, auth_token)
                return True
        except OSError:
            log.exception("%s delete error", self._get_filename(obj, **kwargs))
@@ -634,10 +686,16 @@ class RucioObjectStore(ConcreteObjectStore):
        auth_token = kwargs.get("auth_token", None)
        if auth_token:
            return auth_token

        arg_user = kwargs.get("user", None)
        try:
            if not arg_user:
                trans = kwargs.get("trans", None)
                user = trans.user
            else:
                user = arg_user
            backend = provider_name_to_backend(self.oidc_provider)
            tokens = trans.user.get_oidc_tokens(backend)
            tokens = user.get_oidc_tokens(backend)
            return tokens["id"]
        except Exception as e:
            log.debug("Failed to get auth token: %s", e)