Loading lib/galaxy/authnz/managers.py +14 −14 Original line number Diff line number Diff line import builtins import copy from datetime import datetime, timedelta import json import logging import os import random import string from sqlalchemy import select from datetime import datetime, timedelta from cloudauthz import CloudAuthz from cloudauthz.exceptions import CloudAuthzBaseException from sqlalchemy import select from galaxy import ( exceptions, Loading Loading @@ -355,7 +355,6 @@ class AuthnzManager: return qres def refresh_expiring_oidc_tokens_for_provider(self, sa_session, auth): self.app.config.server_name try: success, message, backend = self._get_authnz_backend(auth.provider) if success is False: Loading @@ -369,15 +368,15 @@ class AuthnzManager: return False def refresh_expiring_oidc_tokens(self, sa_session): if (self.app.config.server_name != self.app.config.base_server_name and self.app.config.server_name != f"{self.app.config}.1"): if (self.app.config.server_name != self.app.config.base_server_name and self.app.config.server_name != f"{self.app.config.base_server_name}.1"): return user_filter = datetime.now() - timedelta(days=30) user_filter = datetime.now() - timedelta(days=90) all_users = sa_session.scalars(select(model.User)).all() for user in all_users: if not user.galaxy_sessions or user.current_galaxy_session.update_time < user_filter: log.debug(f"skipping token refresh for user {user.username}") continue for auth in user.custos_auth or []: self.refresh_expiring_oidc_tokens_for_provider(sa_session, auth) for auth in user.social_auth or []: Loading Loading @@ -418,7 +417,8 @@ class AuthnzManager: def _validate_permissions(self, user, jwt, provider): # Get required scope if provided in config, else use the configured scope prefix required_scopes = [f"{self.oidc_backends_config[provider].get('required_scope', f'{self.app.config.oidc_scope_prefix}:*')}"] required_scopes = [ f"{self.oidc_backends_config[provider].get('required_scope', f'{self.app.config.oidc_scope_prefix}:*')}"] self._assert_jwt_contains_scopes(user, jwt, required_scopes) def callback(self, provider, state_token, authz_code, trans, login_redirect_url, idphint=None): Loading Loading
lib/galaxy/authnz/managers.py +14 −14 Original line number Diff line number Diff line import builtins import copy from datetime import datetime, timedelta import json import logging import os import random import string from sqlalchemy import select from datetime import datetime, timedelta from cloudauthz import CloudAuthz from cloudauthz.exceptions import CloudAuthzBaseException from sqlalchemy import select from galaxy import ( exceptions, Loading Loading @@ -355,7 +355,6 @@ class AuthnzManager: return qres def refresh_expiring_oidc_tokens_for_provider(self, sa_session, auth): self.app.config.server_name try: success, message, backend = self._get_authnz_backend(auth.provider) if success is False: Loading @@ -369,15 +368,15 @@ class AuthnzManager: return False def refresh_expiring_oidc_tokens(self, sa_session): if (self.app.config.server_name != self.app.config.base_server_name and self.app.config.server_name != f"{self.app.config}.1"): if (self.app.config.server_name != self.app.config.base_server_name and self.app.config.server_name != f"{self.app.config.base_server_name}.1"): return user_filter = datetime.now() - timedelta(days=30) user_filter = datetime.now() - timedelta(days=90) all_users = sa_session.scalars(select(model.User)).all() for user in all_users: if not user.galaxy_sessions or user.current_galaxy_session.update_time < user_filter: log.debug(f"skipping token refresh for user {user.username}") continue for auth in user.custos_auth or []: self.refresh_expiring_oidc_tokens_for_provider(sa_session, auth) for auth in user.social_auth or []: Loading Loading @@ -418,7 +417,8 @@ class AuthnzManager: def _validate_permissions(self, user, jwt, provider): # Get required scope if provided in config, else use the configured scope prefix required_scopes = [f"{self.oidc_backends_config[provider].get('required_scope', f'{self.app.config.oidc_scope_prefix}:*')}"] required_scopes = [ f"{self.oidc_backends_config[provider].get('required_scope', f'{self.app.config.oidc_scope_prefix}:*')}"] self._assert_jwt_contains_scopes(user, jwt, required_scopes) def callback(self, provider, state_token, authz_code, trans, login_redirect_url, idphint=None): Loading