Commit ed5f7f2c authored by Cage, Gregory's avatar Cage, Gregory
Browse files

Merge remote-tracking branch 'upstream/release_24.1' into 111-update-galaxy-24-1

# Conflicts:
#	client/src/components/History/Content/ContentItem.vue
#	client/src/components/History/Content/Dataset/DatasetActions.vue
#	client/src/components/JobInformation/JobInformation.vue
#	client/src/components/Login/LoginForm.vue
#	client/src/components/Login/LoginIndex.vue
#	client/src/components/User/ExternalIdentities/ExternalLogin.vue
#	client/src/components/providers/JobProvider.js
#	client/src/style/scss/base.scss
#	config/plugins/visualizations/editor/config/editor.xml
#	lib/galaxy/authnz/custos_authnz.py
#	lib/galaxy/authnz/managers.py
#	lib/galaxy/datatypes/protocols.py
#	lib/galaxy/dependencies/__init__.py
#	lib/galaxy/dependencies/pinned-requirements.txt
#	lib/galaxy/managers/hdas.py
#	lib/galaxy/managers/hdcas.py
#	lib/galaxy/managers/interactivetool.py
#	lib/galaxy/managers/jobs.py
#	lib/galaxy/model/__init__.py
#	lib/galaxy/objectstore/__init__.py
#	lib/galaxy/objectstore/rucio.py
#	lib/galaxy/objectstore/rucio_extra_clients.py
#	lib/galaxy/tool_util/data/__init__.py
parent 7112d9c4
Loading
Loading
Loading
Loading
Loading
+193 −2
Original line number Diff line number Diff line
import hashlib
<<<<<<< HEAD
from typing import Optional

from .caching import (
    CacheTarget,
    enable_cache_monitor,
    InProcessCacheMonitor,
    parse_caching_config_dict_from_xml,
)

try:
    from ..authnz.util import provider_name_to_backend
except ImportError:
    provider_name_to_backend = None  # type: ignore[misc,assignment]

=======
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62
import logging
import os
import shutil
@@ -17,11 +33,14 @@ try:
except ImportError:
    Client = None

<<<<<<< HEAD
=======
try:
    from galaxy.authnz.util import provider_name_to_backend
except ImportError:
    provider_name_to_backend = None  # type: ignore[assignment, unused-ignore]

>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62
from galaxy.exceptions import (
    ObjectInvalid,
    ObjectNotFound,
@@ -32,14 +51,16 @@ from galaxy.util import (
    umask_fix_perms,
    unlink,
)
<<<<<<< HEAD
from galaxy.util.path import safe_relpath
from ..objectstore import ConcreteObjectStore
=======
from ._caching_base import CachingConcreteObjectStore
from .caching import (
    CacheTarget,
    enable_cache_monitor,
    parse_caching_config_dict_from_xml,
)
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62

log = logging.getLogger(__name__)

@@ -83,11 +104,15 @@ def parse_config_xml(config_xml):
        if e_xml:
            rucio_download_schemes = [{k: e.get(k) for k in attrs_schemes} for e in e_xml]

<<<<<<< HEAD
        oidc_providers = []
        e_xml = config_xml.findall("oidc_provider")
        if e_xml:
            oidc_providers = [e.text for e in e_xml]

=======
        oidc_provider = config_xml.findtext("oidc_provider", None)
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62
        enable_cache_mon = string_as_bool(config_xml.findtext("enable_cache_monitor", "False"))

        e_xml = config_xml.findall("rucio_upload_scheme")
@@ -101,6 +126,10 @@ def parse_config_xml(config_xml):
            rucio_upload_scheme = None
            rucio_scope = None
            rucio_register_only = False
<<<<<<< HEAD
=======
            oidc_provider = None
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62

        e_xml = config_xml.findall("rucio_auth")
        if not e_xml:
@@ -134,7 +163,11 @@ def parse_config_xml(config_xml):
            "cache": cache_dict,
            "rucio": rucio_dict,
            "extra_dirs": extra_dirs,
<<<<<<< HEAD
            "oidc_providers": oidc_providers,
=======
            "oidc_provider": oidc_provider,
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62
            "enable_cache_monitor": enable_cache_mon,
        }
    except Exception:
@@ -162,7 +195,11 @@ class RucioBroker:
            auth_host=self.config["auth_host"],
            account=self.config["account"],
            auth_type=self.config["auth_type"],
<<<<<<< HEAD
            creds={"username": self.config["username"], "password": self.config["password"]},
=======
            creds={"username": self.config["username"], "password": self.config["username"]},
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62
        )
        return client

@@ -241,8 +278,12 @@ class RucioBroker:
                }
            items = [item]
            download_client = self.get_rucio_download_client(auth_token=auth_token)
<<<<<<< HEAD
            res = download_client.download_dids(items)
            os.replace(res[0]["dest_file_paths"][0], dest_path)
=======
            download_client.download_dids(items)
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62
        except Exception as e:
            log.exception(f"Cannot download file: {str(e)}")
            return False
@@ -277,7 +318,11 @@ class RucioBroker:
        return True


<<<<<<< HEAD
class RucioObjectStore(ConcreteObjectStore):
=======
class RucioObjectStore(CachingConcreteObjectStore):
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62
    """
    Object store implementation that uses ORNL remote data broker.

@@ -285,20 +330,34 @@ class RucioObjectStore(CachingConcreteObjectStore):
    Galaxy at some future point or significantly modified.
    """

<<<<<<< HEAD
    cache_monitor: Optional[InProcessCacheMonitor] = None

=======
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62
    store_type = "rucio"

    def to_dict(self):
        rval = super().to_dict()
        rval["rucio"] = self.rucio_config
        rval["cache"] = self.cache_config
<<<<<<< HEAD
        rval["oidc_providers"] = self.oidc_providers
=======
        rval["oidc_provider"] = self.oidc_provider
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62
        rval["enable_cache_monitor"] = self.enable_cache_monitor
        return rval

    def __init__(self, config, config_dict):
        super().__init__(config, config_dict)
        self.rucio_config = config_dict.get("rucio") or {}

<<<<<<< HEAD
        self.oidc_providers = config_dict.get("oidc_providers", None)
=======
        self.oidc_provider = config_dict.get("oidc_provider", None)
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62
        self.rucio_broker = RucioBroker(self.rucio_config)
        cache_dict = config_dict.get("cache") or {}
        self.enable_cache_monitor, self.cache_monitor_interval = enable_cache_monitor(config, config_dict)
@@ -310,8 +369,64 @@ class RucioObjectStore(CachingConcreteObjectStore):
        self._initialize()

    def _initialize(self):
<<<<<<< HEAD
        if self.enable_cache_monitor:
            self.cache_monitor = InProcessCacheMonitor(self.cache_target, self.cache_monitor_interval)

    def _in_cache(self, rel_path):
        """Check if the given dataset is in the local cache and return True if so."""
        cache_path = self._get_cache_path(rel_path)
        return os.path.exists(cache_path)

    def _construct_path(
        self,
        obj,
        base_dir=None,
        dir_only=None,
        extra_dir=None,
        extra_dir_at_root=False,
        alt_name=None,
        obj_dir=False,
        **kwargs,
    ):
        # extra_dir should never be constructed from provided data but just
        # make sure there are no shenanigans afoot
        if extra_dir and extra_dir != os.path.normpath(extra_dir):
            log.warning("extra_dir is not normalized: %s", extra_dir)
            raise ObjectInvalid("The requested object is invalid")
        # ensure that any parent directory references in alt_name would not
        # result in a path not contained in the directory path constructed here
        if alt_name:
            if not safe_relpath(alt_name):
                log.warning("alt_name would locate path outside dir: %s", alt_name)
                raise ObjectInvalid("The requested object is invalid")
            # alt_name can contain parent directory references, but S3 will not
            # follow them, so if they are valid we normalize them out
            alt_name = os.path.normpath(alt_name)
        rel_path = os.path.join(*directory_hash_id(self._get_object_id(obj)))
        if extra_dir is not None:
            if extra_dir_at_root:
                rel_path = os.path.join(extra_dir, rel_path)
            else:
                rel_path = os.path.join(rel_path, extra_dir)

        # for JOB_WORK directory
        if obj_dir:
            rel_path = os.path.join(rel_path, str(self._get_object_id(obj)))
        if base_dir:
            base = self.extra_dirs.get(base_dir)
            return os.path.join(str(base), rel_path)

        if not dir_only:
            rel_path = os.path.join(rel_path, alt_name if alt_name else f"dataset_{self._get_object_id(obj)}.dat")
        return rel_path

    def _get_cache_path(self, rel_path):
        return os.path.abspath(os.path.join(self.staging_path, rel_path))
=======
        self._ensure_staging_path_writable()
        self._start_cache_monitor_if_needed()
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62

    def _pull_into_cache(self, rel_path, auth_token):
        log.debug("rucio _pull_into_cache: %s", rel_path)
@@ -364,6 +479,28 @@ class RucioObjectStore(CachingConcreteObjectStore):
    def parse_xml(cls, config_xml):
        return parse_config_xml(config_xml)

<<<<<<< HEAD
    def file_ready(self, obj, **kwargs):
        log.debug("rucio file_ready")
        """
        A helper method that checks if a file corresponding to a dataset is
        ready and available to be used. Return ``True`` if so, ``False`` otherwise.
        """
        rel_path = self._construct_path(obj, **kwargs)
        # Make sure the size in cache is available in its entirety
        if self._in_cache(rel_path):
            if os.path.getsize(self._get_cache_path(rel_path)) == self.rucio_broker.get_size(rel_path):
                return True
        log.debug(
            "Waiting for dataset %s to transfer from OS: %s/%s",
            rel_path,
            os.path.getsize(self._get_cache_path(rel_path)),
            self.rucio_broker.get_size(rel_path),
        )
        return False

=======
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62
    def _create(self, obj, **kwargs):
        if not self._exists(obj, **kwargs):
            # Pull out locally used fields
@@ -394,6 +531,16 @@ class RucioObjectStore(CachingConcreteObjectStore):
            log.debug("rucio _create: %s", rel_path)
        return self

<<<<<<< HEAD
    def _empty(self, obj, **kwargs):
        log.debug("rucio _empty")
        if self._exists(obj, **kwargs):
            return bool(self._size(obj, **kwargs) > 0)
        else:
            raise ObjectNotFound(f"objectstore.empty, object does not exist: {obj}, kwargs: {kwargs}")

=======
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62
    def _size(self, obj, **kwargs):
        rel_path = self._construct_path(obj, **kwargs)
        log.debug("rucio _size: %s", rel_path)
@@ -406,6 +553,12 @@ class RucioObjectStore(CachingConcreteObjectStore):
            if size != 0:
                return size
        if self._exists(obj, **kwargs):
<<<<<<< HEAD
            return self.rucio_broker.get_size(rel_path)
        log.warning("Did not find dataset '%s', returning 0 for size", rel_path)
        return 0

=======
            return self._get_remote_size(rel_path)
        log.warning("Did not find dataset '%s', returning 0 for size", rel_path)
        return 0
@@ -413,22 +566,31 @@ class RucioObjectStore(CachingConcreteObjectStore):
    def _get_remote_size(self, rel_path):
        return self.rucio_broker.get_size(rel_path)

>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62
    def _delete(self, obj, entire_dir=False, **kwargs):
        rel_path = self._construct_path(obj, **kwargs)
        extra_dir = kwargs.get("extra_dir", None)
        base_dir = kwargs.get("base_dir", None)
        dir_only = kwargs.get("dir_only", False)
        obj_dir = kwargs.get("obj_dir", False)
<<<<<<< HEAD
=======
        log.debug("rucio _delete: %s", rel_path)
        auth_token = self._get_token(**kwargs)
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62

        try:
            # Remove temporary data in JOB_WORK directory
            if base_dir and dir_only and obj_dir:
                shutil.rmtree(os.path.abspath(rel_path))
                return True

<<<<<<< HEAD
            log.debug("rucio _delete: %s", rel_path)
            auth_token = self._get_token(**kwargs)

=======
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62
            # Delete from cache first
            if entire_dir and extra_dir:
                shutil.rmtree(self._get_cache_path(rel_path), ignore_errors=True)
@@ -443,6 +605,7 @@ class RucioObjectStore(CachingConcreteObjectStore):
            log.exception("%s delete error", self._get_filename(obj, **kwargs))
        return False

<<<<<<< HEAD
    def _get_data(self, obj, start=0, count=-1, **kwargs):
        rel_path = self._construct_path(obj, **kwargs)
        log.debug("rucio _get_data: %s", rel_path)
@@ -457,6 +620,8 @@ class RucioObjectStore(CachingConcreteObjectStore):
        data_file.close()
        return content

=======
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62
    def _get_token(self, **kwargs):
        auth_token = kwargs.get("auth_token", None)
        if auth_token:
@@ -469,11 +634,17 @@ class RucioObjectStore(CachingConcreteObjectStore):
                user = trans.user
            else:
                user = arg_user
<<<<<<< HEAD
            for oidc_provider in self.oidc_providers:
                backend = provider_name_to_backend(oidc_provider)
                tokens = user.get_oidc_tokens(backend)
                if tokens["id"]:
                    return tokens["id"]
=======
            backend = provider_name_to_backend(self.oidc_provider)
            tokens = user.get_oidc_tokens(backend)
            return tokens["id"]
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62
        except Exception as e:
            log.debug("Failed to get auth token: %s", e)
            return None
@@ -481,16 +652,28 @@ class RucioObjectStore(CachingConcreteObjectStore):
    def _get_filename(self, obj, **kwargs):
        base_dir = kwargs.get("base_dir", None)
        dir_only = kwargs.get("dir_only", False)
<<<<<<< HEAD
        rel_path = self._construct_path(obj, **kwargs)
        sync_cache = kwargs.get("sync_cache", True)

=======
        auth_token = self._get_token(**kwargs)
        rel_path = self._construct_path(obj, **kwargs)
        sync_cache = kwargs.get("sync_cache", True)

        log.debug("rucio _get_filename: %s", rel_path)

>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62
        # for JOB_WORK directory
        if base_dir and dir_only:
            return os.path.abspath(rel_path)

<<<<<<< HEAD
        auth_token = self._get_token(**kwargs)
        log.debug("rucio _get_filename: %s", rel_path)

=======
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62
        cache_path = self._get_cache_path(rel_path)
        if not sync_cache:
            return cache_path
@@ -524,7 +707,11 @@ class RucioObjectStore(CachingConcreteObjectStore):
            file_name = self._get_cache_path(rel_path)
            if not os.path.islink(file_name):
                raise ObjectInvalid(
<<<<<<< HEAD
                    "rucio objectstore._register_file, rucio_register_only " "is set, but file in cache is not a link "
=======
                    "rucio objectstore._register_file, rucio_register_only is set, but file in cache is not a link "
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62
                )
        if os.path.islink(file_name):
            file_name = os.readlink(file_name)
@@ -579,6 +766,7 @@ class RucioObjectStore(CachingConcreteObjectStore):
        kwargs["object_id"] = obj.id
        return kwargs

<<<<<<< HEAD
    @property
    def cache_target(self) -> CacheTarget:
        return CacheTarget(
@@ -587,6 +775,9 @@ class RucioObjectStore(CachingConcreteObjectStore):
            0.9,
        )

    def shutdown(self):
        self.cache_monitor and self.cache_monitor.shutdown()
=======
    def shutdown(self):
        self._shutdown_cache_monitor()
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62
+38 −0
Original line number Diff line number Diff line
import copy
import logging
import time
<<<<<<< HEAD
from abc import ABC

try:
    from rucio.client.uploadclient import UploadClient
    from rucio.common.exception import (  # type: ignore
=======

try:
    from rucio.client.uploadclient import UploadClient
    from rucio.common.exception import (
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62
        InputValidationError,
        NoFilesUploaded,
        NotAllFilesUploaded,
@@ -12,7 +21,11 @@ try:
    from rucio.common.utils import generate_uuid
    from rucio.rse import rsemanager as rsemgr
except ImportError:
<<<<<<< HEAD
    UploadClient = ABC
=======
    UploadClient = object
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62


class DeleteClient(UploadClient):
@@ -34,7 +47,11 @@ class DeleteClient(UploadClient):
            if not self.rses.get(rse):
                rse_settings = self.rses.setdefault(rse, rsemgr.get_rse_info(rse, vo=self.client.vo))
                if not ignore_availability and rse_settings["availability_delete"] != 1:
<<<<<<< HEAD
                    logger(logging.DEBUG, "%s is not available for deletion. No actions have been taken" % rse)
=======
                    logger(logging.DEBUG, "%s is not available for deletion. No actions have been taken", rse)
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62
                    continue

            # protocol handling and deletion
@@ -51,8 +68,13 @@ class DeleteClient(UploadClient):
                    success = True
                except Exception as error:
                    logger(logging.WARNING, "Delete attempt failed")
<<<<<<< HEAD
                    logger(logging.INFO, "Exception: %s" % str(error), exc_info=True)
            logger(logging.DEBUG, "Successfully deleted dataset %s" % pfn)
=======
                    logger(logging.INFO, "Exception: %s", error, exc_info=True)
            logger(logging.DEBUG, "Successfully deleted dataset %s", pfn)
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62


class InPlaceIngestClient(UploadClient):
@@ -106,7 +128,11 @@ class InPlaceIngestClient(UploadClient):
            if not self.rses.get(rse):
                rse_settings = self.rses.setdefault(rse, rsemgr.get_rse_info(rse, vo=self.client.vo))
                if not ignore_availability and rse_settings["availability_write"] != 1:
<<<<<<< HEAD
                    raise RSEWriteBlocked("%s is not available for writing. No actions have been taken" % rse)
=======
                    raise RSEWriteBlocked(f"{rse} is not available for writing. No actions have been taken")
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62

            dataset_scope = file.get("dataset_scope")
            dataset_name = file.get("dataset_name")
@@ -119,7 +145,11 @@ class InPlaceIngestClient(UploadClient):
            registered_file_dids.add(f"{file['did_scope']}:{file['did_name']}")
        wrong_dids = registered_file_dids.intersection(registered_dataset_dids)
        if len(wrong_dids):
<<<<<<< HEAD
            raise InputValidationError("DIDs used to address both files and datasets: %s" % str(wrong_dids))
=======
            raise InputValidationError(f"DIDs used to address both files and datasets: {wrong_dids}")
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62
        logger(logging.DEBUG, "Input validation done.")

        # clear this set again to ensure that we only try to register datasets once
@@ -128,7 +158,11 @@ class InPlaceIngestClient(UploadClient):
        summary = []
        for file in files:
            basename = file["basename"]
<<<<<<< HEAD
            logger(logging.INFO, "Preparing upload for file %s" % basename)
=======
            logger(logging.INFO, "Preparing upload for file %s", basename)
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62

            pfn = file.get("pfn")

@@ -168,7 +202,11 @@ class InPlaceIngestClient(UploadClient):
            trace["transferEnd"] = time.time()
            trace["clientState"] = "DONE"
            file["state"] = "A"
<<<<<<< HEAD
            logger(logging.INFO, "Successfully uploaded file %s" % basename)
=======
            logger(logging.INFO, "Successfully uploaded file %s", basename)
>>>>>>> 08d42ea24fcc299b11cb67620f737f3b944bcc62
            self._send_trace(trace)

            if summary_file_path: