Loading lib/galaxy/authnz/managers.py +15 −5 Original line number Diff line number Diff line Loading @@ -180,7 +180,10 @@ class AuthnzManager: # this is a EGI Check-in specific config if config_xml.find("checkin_env") is not None: rtv["checkin_env"] = config_xml.find("checkin_env").text if config_xml.find("well_known_oidc_config_uri") is not None: rtv["well_known_oidc_config_uri"] = config_xml.find("well_known_oidc_config_uri").text if config_xml.find("required_scope") is not None: rtv["required_scope"] = config_xml.find("required_scope").text return rtv def _parse_custos_config(self, config_xml): Loading Loading @@ -211,6 +214,10 @@ class AuthnzManager: rtv["pkce_support"] = asbool(config_xml.find("pkce_support").text) if config_xml.find("accepted_audiences") is not None: rtv["accepted_audiences"] = config_xml.find("accepted_audiences").text if config_xml.find("user_extra_authorization_script") is not None: rtv["user_extra_authorization_script"] = config_xml.find("user_extra_authorization_script").text if config_xml.find("required_scope") is not None: rtv["required_scope"] = config_xml.find("required_scope").text return rtv def get_allowed_idps(self): Loading Loading @@ -378,14 +385,17 @@ class AuthnzManager: raise exceptions.AuthenticationFailed( err_msg=f"User: {user.username} does not have the required scopes: [{required_scopes}]" ) scopes = jwt.get("scope") or "" scopes = f"{jwt.get('scope')} {jwt.get('scp')}" or "" if not set(required_scopes).issubset(scopes.split(" ")): raise exceptions.AuthenticationFailed( err_msg=f"User: {user.username} has JWT with scopes: [{scopes}] but not required scopes: [{required_scopes}]" ) def _validate_permissions(self, user, jwt): required_scopes = [f"{self.app.config.oidc_scope_prefix}:*"] def _validate_permissions(self, user, jwt, provider): # Get required scope if provided in config, else use the configured scope prefix required_scopes = [ f"{self.oidc_backends_config[provider].get('required_scope', f'{self.app.config.oidc_scope_prefix}:*')}" ] self._assert_jwt_contains_scopes(user, jwt, required_scopes) def _match_access_token_to_user_in_provider(self, sa_session, provider, access_token): Loading @@ -402,7 +412,7 @@ class AuthnzManager: log.exception("Could not decode access token") raise exceptions.AuthenticationFailed(err_msg="Invalid access token or an unexpected error occurred.") if user and jwt: self._validate_permissions(user, jwt) self._validate_permissions(user, jwt, provider) return user elif not user and jwt: # jwt was decoded, but no user could be matched Loading Loading
lib/galaxy/authnz/managers.py +15 −5 Original line number Diff line number Diff line Loading @@ -180,7 +180,10 @@ class AuthnzManager: # this is a EGI Check-in specific config if config_xml.find("checkin_env") is not None: rtv["checkin_env"] = config_xml.find("checkin_env").text if config_xml.find("well_known_oidc_config_uri") is not None: rtv["well_known_oidc_config_uri"] = config_xml.find("well_known_oidc_config_uri").text if config_xml.find("required_scope") is not None: rtv["required_scope"] = config_xml.find("required_scope").text return rtv def _parse_custos_config(self, config_xml): Loading Loading @@ -211,6 +214,10 @@ class AuthnzManager: rtv["pkce_support"] = asbool(config_xml.find("pkce_support").text) if config_xml.find("accepted_audiences") is not None: rtv["accepted_audiences"] = config_xml.find("accepted_audiences").text if config_xml.find("user_extra_authorization_script") is not None: rtv["user_extra_authorization_script"] = config_xml.find("user_extra_authorization_script").text if config_xml.find("required_scope") is not None: rtv["required_scope"] = config_xml.find("required_scope").text return rtv def get_allowed_idps(self): Loading Loading @@ -378,14 +385,17 @@ class AuthnzManager: raise exceptions.AuthenticationFailed( err_msg=f"User: {user.username} does not have the required scopes: [{required_scopes}]" ) scopes = jwt.get("scope") or "" scopes = f"{jwt.get('scope')} {jwt.get('scp')}" or "" if not set(required_scopes).issubset(scopes.split(" ")): raise exceptions.AuthenticationFailed( err_msg=f"User: {user.username} has JWT with scopes: [{scopes}] but not required scopes: [{required_scopes}]" ) def _validate_permissions(self, user, jwt): required_scopes = [f"{self.app.config.oidc_scope_prefix}:*"] def _validate_permissions(self, user, jwt, provider): # Get required scope if provided in config, else use the configured scope prefix required_scopes = [ f"{self.oidc_backends_config[provider].get('required_scope', f'{self.app.config.oidc_scope_prefix}:*')}" ] self._assert_jwt_contains_scopes(user, jwt, required_scopes) def _match_access_token_to_user_in_provider(self, sa_session, provider, access_token): Loading @@ -402,7 +412,7 @@ class AuthnzManager: log.exception("Could not decode access token") raise exceptions.AuthenticationFailed(err_msg="Invalid access token or an unexpected error occurred.") if user and jwt: self._validate_permissions(user, jwt) self._validate_permissions(user, jwt, provider) return user elif not user and jwt: # jwt was decoded, but no user could be matched Loading