Commit 110d8bbd authored by Cage, Gregory's avatar Cage, Gregory
Browse files

Add jwt verification in oidc python pam module

parent 5a912936
Loading
Loading
Loading
Loading
Loading
+10 −5
Original line number Diff line number Diff line
FROM ubuntu:22.04 AS no2fa

RUN apt-get update && apt-get install -y ssh libpam-python  curl python2 sudo
RUN curl https://bootstrap.pypa.io/pip/2.7/get-pip.py --output get-pip.py && python2 get-pip.py 
RUN pip2 config set global.target /lib/python2.7 && pip2 install requests
FROM ubuntu:18.04 AS no2fa

ARG DEBIAN_FRONTEND=noninteractive
ENV TZ=Europe/Moscow
RUN apt-get update && apt-get install -y ssh libpam-python curl python sudo vim python-dev build-essential libssl-dev
RUN curl https://bootstrap.pypa.io/pip/2.7/get-pip.py --output get-pip.py && python get-pip.py 
RUN apt-get install -y libffi-dev
RUN pip config set global.target /lib/python2.7 \
	&& pip install requests \
	&& pip install jwt

RUN useradd test
RUN mkhomedir_helper test
+14 −3
Original line number Diff line number Diff line
{
[{
    "provider": "keycloak",
    "verification_type": "introspection_url",
    "client_id": "galaxy",
    "client_secret":"coR3eIu4hEaxNwveSbXjsiHdHijYtRuf",
    "introspection_url": "http://host.docker.internal:8080/realms/ndip/protocol/openid-connect/token/introspect",
    "check_2fa": false,
    "enable_log": true,
    "log_file": "/tmp/oidc.log"
}

},
{
    "provider": "azure",
    "verification_type": "jwks_url",
    "client_id": "galaxy",
    "client_secret":"coR3eIu4hEaxNwveSbXjsiHdHijYtRuf",
    "jwk_url": "https://login.microsoftonline.com/{tenant}/discovery/v2.0/keys",
    "check_2fa": false,
    "enable_log": true,
    "log_file": "/tmp/oidc.log"
}]
   
 No newline at end of file
+90 −12
Original line number Diff line number Diff line
@@ -6,11 +6,15 @@
PAM module for authenticating users via a OIDC token
'''
import json
import jwt
import os
import sys
import requests
import logging

from cryptography.hazmat.backends import default_backend
from cryptography.x509 import load_der_x509_certificate

logging.basicConfig(filename='/tmp/oidc.log', encoding='utf-8', level=logging.DEBUG)


@@ -22,6 +26,7 @@ def pam_sm_setcred(pamh, _flags, _argv):
    '''
    Default
    '''
    logit("setcred")
    return pamh.PAM_SUCCESS


@@ -29,6 +34,7 @@ def pam_sm_acct_mgmt(pamh, _flags, _argv):
    '''
    Default
    '''
    logit("acct mgmt")
    return pamh.PAM_SUCCESS


@@ -36,6 +42,7 @@ def pam_sm_open_session(pamh, _flags, _argv):
    '''
    Default
    '''
    logit("open session")
    return pamh.PAM_SUCCESS


@@ -43,6 +50,7 @@ def pam_sm_close_session(pamh, _flags, _argv):
    '''
    Default
    '''
    logit("close session")
    return pamh.PAM_SUCCESS


@@ -50,6 +58,7 @@ def pam_sm_chauthtok(pamh, _flags, _argv):
    '''
    Default
    '''
    logit("chauthtok")
    return pamh.PAM_SUCCESS


@@ -57,17 +66,9 @@ def pam_sm_authenticate(pamh, _flags, _argv):
    '''
    Authenticates a user via an OIDC token
    '''    
    # Load config file and build access token
    try:
        config_dpath = os.path.dirname(os.path.realpath(__file__))
        config_fpath = os.path.join(config_dpath, 'oidc-pam.json')
        config_fd = open(config_fpath, 'r')
        config = config_fd.read()
        config_fd.close()
        config = json.loads(config)
    except Exception as error:
        logit('Error loading configuration: %s' % error)
        return pamh.PAM_AUTH_ERR
    logit("trying")
    
    # build access token

    use_first_pass = 'use_first_pass' in _argv
    # get user&token
@@ -96,6 +97,57 @@ def pam_sm_authenticate(pamh, _flags, _argv):
    except pamh.exception as error:
        return error.pam_result

    if (os.environ['PAM_OIDC_VERFIFICATION_TYPE'] == "jwks_url"):
        return verify_token_jwt(pamh, config, user, access_token)
    else:    
        return verify_token_introspection(pamh, config, user, access_token)

def verify_token_jwt(pamh, config, user, access_token):
    config = load_config_jwt(pamh)
    try:
        # Obtain appropriate cert from JWK URI
        jwks_url = config['jwks_uri']
        key_set = requests.get(jwks_url, timeout=5)

        key_id = jwt.get_unverified_header(access_token)['kid']
        for key in key_set.json()['keys']:
            if key['kid'] == key_id:
                x5c = key['x5c'][0]
                break
        else:
            raise jwt.DecodeError(f'Cannot find kid={kid}')

        cert = load_der_x509_certificate(base64.b64decode(x5c), default_backend())

        # Decode token (exp date is checked automatically)
        decoded_token = jwt_decode(
                access_token,
                key=certificate.public_key(),
                algorithms=['RS256'],
                audience=self.setting('KEY')
            )

        # Check if correct user
        if decoded_token['preferred_username'] != user:
            logit('SSH user does not match token user: %s (ssh) !=v %s (token)' % (user,
                    decoded_token['preferred_username']))
            return pamh.PAM_AUTH_ERR

        # Check if two factor authenticated
        if config['check_2fa']:
            if 'mfa' not in decoded_token['amr']:
                logit('missing 2fa in token: %s ' % access_token)
                return pamh.PAM_AUTH_ERR
    except Exception as error:
        logit('Error verifying jwt token %s, error: %s' % (access_token, error))
        return pamh.PAM_AUTH_ERR
        
    logit('Login successful for user %s, token %s' % (user, access_token))
    return pamh.PAM_SUCCESS

def verify_token_introspection(pamh, config, user, access_token):
    logit('Attempting token verification through instrosepction URL.')
    config = load_config_introspection(pamh)
    try:
        url = config['introspection_url']
        logit(access_token)
@@ -123,3 +175,29 @@ def pam_sm_authenticate(pamh, _flags, _argv):

    logit('Login successful for user %s, token %s' % (user, access_token))
    return pamh.PAM_SUCCESS

def load_config(pamh):
    # Load config file 
    config_dpath = os.path.dirname(os.path.realpath(__file__))
    config_fpath = os.path.join(config_dpath, 'oidc-pam.json')
    config_fd = open(config_fpath, 'r')
    config = config_fd.read()
    config_fd.close()
    config = json.loads(config)
    return config

def load_config_jwt(pamh):   
    try:
        config = load_config(pamh)
        return next((config_item for config_item in config if config_item['verification_type'] == "jwks_url"))
    except Exception as error:
        logit('Error loading configuration for jwt verification: %s' % error)\
        return pamh.PAM_AUTH_ERR

def load_config_introspection(pamh):
    try:
        config = load_config(pamh)
        return next((config_item for config_item in config if config_item['verification_type'] == "introspection_url"))
    except Exception as error:
        logit('Error loading configuration for introspection verification: %s' % error)\
        return pamh.PAM_AUTH_ERR
 No newline at end of file