Loading lib/galaxy/authnz/managers.py +6 −9 Original line number Diff line number Diff line Loading @@ -410,10 +410,9 @@ class AuthnzManager: return False, msg, None def _validate_permissions(self, user, jwt, provider): required_scopes = [f"{self.oidc_backends_config[provider].get('required_scope', None)}"] if required_scopes is None: required_scopes = [f"{self.app.config.oidc_scope_prefix}:*"] self._assert_jwt_contains_scopes(user, jwt, required_scopes, provider) # Get required scope if provided in config, else use the configured scope prefix required_scopes = [f"{self.oidc_backends_config[provider].get('required_scope', f'{self.app.config.oidc_scope_prefix}:*')}"] self._assert_jwt_contains_scopes(user, jwt, required_scopes) def callback(self, provider, state_token, authz_code, trans, login_redirect_url, idphint=None): try: Loading Loading @@ -442,15 +441,13 @@ class AuthnzManager: log.exception(msg) return False, msg, (None, None) def _assert_jwt_contains_scopes(self, user, jwt, required_scopes, provider): def _assert_jwt_contains_scopes(self, user, jwt, required_scopes): if not jwt: raise exceptions.AuthenticationFailed( err_msg=f"User: {user.username} does not have the required scopes: [{required_scopes}]" ) if provider == "azure": scopes = jwt.get("scp") or "" else: scopes = jwt.get("scope") or "" scopes = f"{jwt.get('scope')} {jwt.get('scp')}" or "" if not set(required_scopes).issubset(scopes.split(" ")): raise exceptions.AuthenticationFailed( err_msg=f"User: {user.username} has JWT with scopes: [{scopes}] but not required scopes: [{required_scopes}]" Loading Loading
lib/galaxy/authnz/managers.py +6 −9 Original line number Diff line number Diff line Loading @@ -410,10 +410,9 @@ class AuthnzManager: return False, msg, None def _validate_permissions(self, user, jwt, provider): required_scopes = [f"{self.oidc_backends_config[provider].get('required_scope', None)}"] if required_scopes is None: required_scopes = [f"{self.app.config.oidc_scope_prefix}:*"] self._assert_jwt_contains_scopes(user, jwt, required_scopes, provider) # Get required scope if provided in config, else use the configured scope prefix required_scopes = [f"{self.oidc_backends_config[provider].get('required_scope', f'{self.app.config.oidc_scope_prefix}:*')}"] self._assert_jwt_contains_scopes(user, jwt, required_scopes) def callback(self, provider, state_token, authz_code, trans, login_redirect_url, idphint=None): try: Loading Loading @@ -442,15 +441,13 @@ class AuthnzManager: log.exception(msg) return False, msg, (None, None) def _assert_jwt_contains_scopes(self, user, jwt, required_scopes, provider): def _assert_jwt_contains_scopes(self, user, jwt, required_scopes): if not jwt: raise exceptions.AuthenticationFailed( err_msg=f"User: {user.username} does not have the required scopes: [{required_scopes}]" ) if provider == "azure": scopes = jwt.get("scp") or "" else: scopes = jwt.get("scope") or "" scopes = f"{jwt.get('scope')} {jwt.get('scp')}" or "" if not set(required_scopes).issubset(scopes.split(" ")): raise exceptions.AuthenticationFailed( err_msg=f"User: {user.username} has JWT with scopes: [{scopes}] but not required scopes: [{required_scopes}]" Loading