Commit 1026bf97 authored by Yakubov, Sergey's avatar Yakubov, Sergey
Browse files

Merge branch '51-register-file-in-rucio' into 'dev'

add option to ingest files to rucio object store

Closes #51

See merge request !41
parents 00c0591c 89d492b3
Loading
Loading
Loading
Loading
Loading
+206 −19
Original line number Diff line number Diff line
import copy
import time

from ..objectstore import ConcreteObjectStore

import os

import logging

log = logging.getLogger(__name__)

from galaxy.util import (
    directory_hash_id,
    umask_fix_perms,
    unlink,
    unlink, string_as_bool,
)
from galaxy.exceptions import (
    ObjectInvalid,
@@ -17,14 +18,160 @@ from galaxy.exceptions import (
)
from galaxy.util.path import safe_relpath

from rucio.client.downloadclient import DownloadClient
from rucio.common import utils
from rucio.client import Client
from rucio.client.uploadclient import UploadClient
from rucio.client.downloadclient import DownloadClient
from rucio.common.exception import (RSEWriteBlocked, NoFilesUploaded, NotAllFilesUploaded,
                                    RSENotFound, RSEProtocolNotSupported, InputValidationError)  # type: ignore
from rucio.common.utils import (generate_uuid)
from rucio.rse import rsemanager as rsemgr
import rucio.common


import shutil

log = logging.getLogger(__name__)


class InPlaceIngestClient(UploadClient):
    def ingest(self, items, summary_file_path=None, traces_copy_out=None, ignore_availability=False, activity=None):
        """
        :param items: List of dictionaries. Each dictionary describing a file to upload. Keys:
            path                  - path of the file that will be uploaded
            rse                   - rse expression/name (e.g. 'CERN-PROD_DATADISK') where to upload the file
            did_scope             - Optional: custom did scope (Default: user.<account>)
            did_name              - Optional: custom did name (Default: name of the file)
            dataset_scope         - Optional: custom dataset scope
            dataset_name          - Optional: custom dataset name
            dataset_meta          - Optional: custom metadata for dataset
            impl                  - Optional: name of the protocol implementation to be used to upload this item.
            force_scheme          - Optional: force a specific scheme (if PFN upload this will be overwritten) (Default: None)
            pfn                   - Optional: use a given PFN (this sets no_register to True, and no_register becomes mandatory)
            no_register           - Optional: if True, the file will not be registered in the rucio catalogue
            register_after_upload - Optional: if True, the file will be registered after successful upload
            lifetime              - Optional: the lifetime of the file after it was uploaded
            transfer_timeout      - Optional: time after the upload will be aborted
            guid                  - Optional: guid of the file
            recursive             - Optional: if set, parses the folder structure recursively into collections
        :param summary_file_path: Optional: a path where a summary in form of a json file will be stored
        :param traces_copy_out: reference to an external list, where the traces should be uploaded
        :param ignore_availability: ignore the availability of a RSE
        :param activity: the activity set to the rule if no dataset is specified

        :returns: 0 on success

        :raises InputValidationError: if any input arguments are in a wrong format
        :raises RSEWriteBlocked: if a given RSE is not available for writing
        :raises NoFilesUploaded: if no files were successfully uploaded
        :raises NotAllFilesUploaded: if not all files were successfully uploaded
        """
        # helper to get rse from rse_expression:

        logger = self.logger
        self.trace['uuid'] = generate_uuid()

        # check given sources, resolve dirs into files, and collect meta infos
        files = self._collect_and_validate_file_info(items)
        logger(logging.DEBUG, 'Num. of files that upload client is processing: {}'.format(len(files)))

        # check if RSE of every file is available for writing
        # and cache rse settings
        registered_dataset_dids = set()
        registered_file_dids = set()
        rse_expression = None
        for file in files:
            rse = file['rse']
            if not self.rses.get(rse):
                rse_settings = self.rses.setdefault(rse, rsemgr.get_rse_info(rse, vo=self.client.vo))
                if not ignore_availability and rse_settings['availability_write'] != 1:
                    raise RSEWriteBlocked('%s is not available for writing. No actions have been taken' % rse)

            dataset_scope = file.get('dataset_scope')
            dataset_name = file.get('dataset_name')
            file['rse'] = rse
            if dataset_scope and dataset_name:
                dataset_did_str = ('%s:%s' % (dataset_scope, dataset_name))
                file['dataset_did_str'] = dataset_did_str
                registered_dataset_dids.add(dataset_did_str)

            registered_file_dids.add('%s:%s' % (file['did_scope'], file['did_name']))
        wrong_dids = registered_file_dids.intersection(registered_dataset_dids)
        if len(wrong_dids):
            raise InputValidationError('DIDs used to address both files and datasets: %s' % str(wrong_dids))
        logger(logging.DEBUG, 'Input validation done.')

        # clear this set again to ensure that we only try to register datasets once
        registered_dataset_dids = set()
        num_succeeded = 0
        summary = []
        for file in files:
            basename = file['basename']
            logger(logging.INFO, 'Preparing upload for file %s' % basename)

            pfn = file.get('pfn')
            force_scheme = file.get('force_scheme')
            impl = file.get('impl')

            trace = copy.deepcopy(self.trace)
            # appending trace to list reference, if the reference exists
            if traces_copy_out is not None:
                traces_copy_out.append(trace)

            rse = file['rse']
            trace['scope'] = file['did_scope']
            trace['datasetScope'] = file.get('dataset_scope', '')
            trace['dataset'] = file.get('dataset_name', '')
            trace['remoteSite'] = rse
            trace['filesize'] = file['bytes']

            file_did = {'scope': file['did_scope'], 'name': file['did_name']}
            dataset_did_str = file.get('dataset_did_str')
            rse_settings = self.rses[rse]
            is_deterministic = rse_settings.get('deterministic', True)
            if not is_deterministic and not pfn:
                logger(logging.ERROR, 'PFN has to be defined for NON-DETERMINISTIC RSE.')
                continue
            if pfn and is_deterministic:
                logger(logging.WARNING,
                       'Upload with given pfn implies that no_register is True, except non-deterministic RSEs')
                no_register = True

            self._register_file(file, registered_dataset_dids, ignore_availability=ignore_availability,
                                activity=activity)

            file['upload_result'] = {0: True, 1: None, 'success': True, 'pfn': pfn}  # needs to be removed
            num_succeeded += 1
            trace['transferStart'] = time.time()
            trace['transferEnd'] = time.time()
            trace['clientState'] = 'DONE'
            file['state'] = 'A'
            logger(logging.INFO, 'Successfully uploaded file %s' % basename)
            self._send_trace(trace)

            if summary_file_path:
                summary.append(copy.deepcopy(file))

            replica_for_api = self._convert_file_for_api(file)
            try:
                self.client.update_replicas_states(rse, files=[replica_for_api])
            except Exception as error:
                logger(logging.ERROR, 'Failed to update replica state for file {}'.format(basename))
                logger(logging.DEBUG, 'Details: {}'.format(str(error)))

            # add file to dataset if needed
            if dataset_did_str and not no_register:
                try:
                    self.client.attach_dids(file['dataset_scope'], file['dataset_name'], [file_did])
                except Exception as error:
                    logger(logging.WARNING, 'Failed to attach file to the dataset')
                    logger(logging.DEBUG, 'Attaching to dataset {}'.format(str(error)))

        if num_succeeded == 0:
            raise NoFilesUploaded()
        elif num_succeeded != len(files):
            raise NotAllFilesUploaded()
        return 0


def _config_xml_error(tag):
    msg = f"No {tag} element in config XML tree"
@@ -55,10 +202,12 @@ def parse_config_xml(config_xml):
            rucio_preferred_rse_name = e_xml[0].get("preferred_rse_name", None)
            rucio_preferred_rse_protocol = e_xml[0].get("preferred_rse_protocol", None)
            rucio_scope = e_xml[0].get("scope", None)
            rucio_register_only = string_as_bool(e_xml[0].get("register_only", "False"))
        else:
            rucio_preferred_rse_name = None
            rucio_preferred_rse_protocol = None
            rucio_scope = None
            rucio_register_only = False

        return {
            "cache": {
@@ -69,6 +218,7 @@ def parse_config_xml(config_xml):
            "rucio_preferred_rse_name": rucio_preferred_rse_name,
            "rucio_preferred_rse_protocol": rucio_preferred_rse_protocol,
            "rucio_scope": rucio_scope,
            "rucio_register_only": rucio_register_only,
        }
    except Exception:
        # Toss it back up after logging, we can't continue loading at this point.
@@ -77,15 +227,29 @@ def parse_config_xml(config_xml):


class RucioBroker():
    def __init__(self, rse_name, rse_protocol, scope):
        self.rse_name = rse_name
        self.rse_protocol = rse_protocol
        self.scope = scope
    def __init__(self, rucio_config):
        self.rse_name = rucio_config["rucio_preferred_rse_name"]
        self.rse_protocol = rucio_config["rucio_preferred_rse_protocol"]
        self.scope = rucio_config["rucio_scope"]
        self.register_only = rucio_config["rucio_register_only"]
        rucio.common.utils.PREFERRED_CHECKSUM = "md5"
        # rucio config is in a system rucio.cfg file
        self.rucio_client = Client()
        self.upload_client = UploadClient(_client=self.rucio_client)
        self.download_client = DownloadClient(client=self.rucio_client)
        self.ingest_client = InPlaceIngestClient(_client=self.rucio_client)

    def register(self, key, source_path):
        key = os.path.basename(key)
        item = {
            "path": source_path,
            "rse": self.rse_name,
            "did_scope": self.scope,
            "did_name": key,
            "pfn": f"file://localhost/{source_path}",
        }
        items = [item]
        self.ingest_client.ingest(items)

    def upload(self, key, source_path):
        key = os.path.basename(key)
@@ -165,9 +329,7 @@ class RucioObjectStore(ConcreteObjectStore):

    def to_dict(self):
        rval = super().to_dict()
        rval["rucio_preferred_rse_name"] = self.rucio_preferred_rse_name
        rval["rucio_preferred_rse_protocol"] = self.rucio_preferred_rse_protocol
        rval["rucio_scope"] = self.rucio_scope
        rval.update(self.rucio_config)
        rval["cache"] = dict()
        rval["cache"]["size"] = self.cache_size
        rval["cache"]["path"] = self.staging_path
@@ -175,12 +337,20 @@ class RucioObjectStore(ConcreteObjectStore):

    def __init__(self, config, config_dict):
        super().__init__(config, config_dict)

        self.rucio_preferred_rse_name = config_dict.get("rucio_preferred_rse_name", None)
        self.rucio_preferred_rse_protocol = config_dict.get("rucio_preferred_rse_protocol", None)
        self.rucio_scope = config_dict.get("rucio_scope", None)

        self.rucio_broker = RucioBroker(self.rucio_preferred_rse_name,self.rucio_preferred_rse_protocol,self.rucio_scope)
        self.rucio_config = {}
        self.rucio_config["rucio_preferred_rse_name"] = config_dict.get("rucio_preferred_rse_name", None)
        self.rucio_config["rucio_preferred_rse_protocol"] = config_dict.get("rucio_preferred_rse_protocol", None)
        self.rucio_config["rucio_register_only"] = config_dict.get("rucio_register_only", False)
        self.rucio_config["rucio_scope"] = config_dict.get("rucio_scope", None)

        if 'RUCIO_PREFERRED_RSE_NAME' in os.environ:
            self.rucio_config["rucio_preferred_rse_name"] = os.environ['RUCIO_PREFERRED_RSE_NAME']
        if 'RUCIO_PREFERRED_RSE_PROTOCOL' in os.environ:
            self.rucio_config["rucio_preferred_rse_protocol"] = os.environ['RUCIO_PREFERRED_RSE_PROTOCOL']
        if 'RUCIO_REGISTER_ONLY' in os.environ:
            self.rucio_config["rucio_register_only"] = string_as_bool(os.environ['RUCIO_REGISTER_ONLY'])

        self.rucio_broker = RucioBroker(self.rucio_config)
        cache_dict = config_dict["cache"]
        if cache_dict is None:
            _config_dict_error("cache")
@@ -457,11 +627,28 @@ class RucioObjectStore(ConcreteObjectStore):

        return self._get_cache_path(rel_path)

    def _register_file(self, rel_path, file_name):
        if file_name is None:
            file_name = self._get_cache_path(rel_path)
            if not os.path.islink(file_name):
                raise ObjectInvalid(
                    f"rucio objectstore._register_file, rucio_register_only "
                    f"is set, but file in cache is not a link ")
        if os.path.islink(file_name):
            file_name = os.readlink(file_name)
        self.rucio_broker.register(rel_path, file_name)
        log.debug("rucio _register_file: " + file_name)
        return

    def _update_from_file(self, obj, file_name=None, create=False, **kwargs):
        if not create:
            raise ObjectNotFound(f"rucio objectstore.update_from_file, file update not allowed")

        rel_path = self._construct_path(obj, **kwargs)

        if self.rucio_config["rucio_register_only"]:
            self._register_file(rel_path, file_name)
            return

        log.debug("rucio _update_from_file:" + rel_path)

        # Choose whether to use the dataset file itself or an alternate file