From 115d98a1022dc57fee5895ac335c4aca9f7acdf5 Mon Sep 17 00:00:00 2001
From: Frederick Muriuki Muriithi
Date: Sat, 20 Apr 2024 21:11:26 +0300
Subject: Separate authentication from token generation

Authenticate with the usual authentication code flow. Do not inherit AuthenticationCodeGrant in JWTBearerGrant, instead, use the JWTBearerGrant to generate the token after the user has already been successfully authenticated.
---
 .../oauth2/grants/authorisation_code_grant.py | 11 ++++++++
 .../oauth2/grants/jwt_bearer_grant.py | 29 ++--------------------
 2 files changed, 13 insertions(+), 27 deletions(-)
(limited to 'gn_auth/auth')

diff --git a/gn_auth/auth/authentication/oauth2/grants/authorisation_code_grant.py b/gn_auth/auth/authentication/oauth2/grants/authorisation_code_grant.py
index 02a876b..ae604df 100644
--- a/gn_auth/auth/authentication/oauth2/grants/authorisation_code_grant.py
+++ b/gn_auth/auth/authentication/oauth2/grants/authorisation_code_grant.py
@@ -26,6 +26,17 @@ class AuthorisationCodeGrant(grants.AuthorizationCodeGrant):
     GRANT_TYPE = "authorization_code"
     RESPONSE_TYPES = {'code'}
 
+    def create_authorization_response(self, redirect: str, grant_user):
+        """Add some data to the URI"""
+        response = super().create_authorization_response(redirect, grant_user)
+        headers = dict(response[-1])
+        headers = {
+            **headers,
+            "Location": f"{headers['Location']}&user_id={grant_user.user_id}"
+        }
+        return (response[0], response[1], [
+            (header, value) for header, value in headers.items()])
+
     def save_authorization_code(self, code, request):
         """Persist the authorisation code to database."""
         client = request.client
diff --git a/gn_auth/auth/authentication/oauth2/grants/jwt_bearer_grant.py b/gn_auth/auth/authentication/oauth2/grants/jwt_bearer_grant.py
index 346a1f0..cc4a12f 100644
--- a/gn_auth/auth/authentication/oauth2/grants/jwt_bearer_grant.py
+++ b/gn_auth/auth/authentication/oauth2/grants/jwt_bearer_grant.py
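Review bot: In `create_authorization_response` the new `"Location": f"{headers['Location']}&user_id={grant_user.user_id}"` builds the redirect by string concatenation, which assumes the Location already carries a query string and does no encoding; a redirect without a query (or one carrying a fragment) would come back malformed. Building the parameter with `urllib.parse.urlencode({"user_id": grant_user.user_id})` and picking `?` or `&` from whether `urlparse(headers["Location"]).query` is empty keeps the URL well-formed whatever the base grant returns. Separately, the value put in the redirect is not integrity-protected and passes through the browser, Referer headers and proxy logs, so a client must treat it as a hint and take the authenticated identity from the issued token rather than from this parameter; the docstring `"""Add some data to the URI"""` should say so, e.g. `"""Redirect as the base grant does, additionally exposing the granting user's id as an unauthenticated `user_id` query parameter."""`. The class body continues outside the hunk.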
@@ -14,7 +14,6 @@ from authlib.oauth2.rfc7523.token import (
 from gn_auth.auth.authentication.users import user_by_id
 from gn_auth.auth.db.sqlite3 import connection, with_db_connection
 from gn_auth.auth.authentication.oauth2.models.oauth2client import client
-from gn_auth.auth.authentication.oauth2.grants.authorisation_code_grant import AuthorisationCodeGrant
 
 
 class JWTBearerTokenGenerator(_JWTBearerTokenGenerator):
@@ -36,34 +35,10 @@ class JWTBearerTokenGenerator(_JWTBearerTokenGenerator):
                 "sub": str(tokendata["sub"])}
 
 
-class JWTBearerGrant(_JWTBearerGrant, AuthorisationCodeGrant):
+class JWTBearerGrant(_JWTBearerGrant):
     """Implement JWT as Authorisation Grant."""
-
-    def create_authorization_response(self, redirect_uri: str, grant_user):
-        resp = super().create_authorization_response(redirect_uri, grant_user)
-        headers = dict(resp[2])
-        location = urlparse(headers["Location"])
-        query = {
-            key.strip(): value.strip() for key, value in
-            (item.split("=") for item in
-             (param.strip() for param in location.query.split("&")))}
-        parsed_redirect = urlparse(redirect_uri)
-        issued = datetime.now()
-        jwtkey = app.config["JWT_PRIVATE_KEY"]
-        jwttoken = jwt.encode(
-            {"alg": "RS256", "typ": "jwt", "kid": jwtkey.kid},
-            {
-                "iss": str(self.client.client_id),
-                "sub": str(grant_user.user_id),
-                "aud": f"{parsed_redirect.scheme}://{parsed_redirect.netloc}",
-                "exp": (issued + timedelta(minutes=5)),
-                "nbf": int(issued.timestamp()),
-                "iat": int(issued.timestamp()),
-                "jti": str(uuid.uuid4()),
-                "code": query["code"]},
-            jwtkey).decode("utf8")
-        return (302, "", [("Location", f"{location.geturl()}&jwt={jwttoken}")])
+    TOKEN_ENDPOINT_AUTH_METHODS = ["client_secret_post", "client_secret_jwt"]
 
     def resolve_issuer_client(self, issuer):
-- 
cgit v1.2.3
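Review bot: `TOKEN_ENDPOINT_AUTH_METHODS = ["client_secret_post", "client_secret_jwt"]` admits two client-authentication methods of different strength — `client_secret_post` transmits the shared secret itself in the request body, while `client_secret_jwt` only proves possession of it through a signed assertion — and nothing in the hunk records why the weaker one has to stay enabled. A one-line comment such as `# client_secret_post kept for clients that cannot sign assertions; prefer client_secret_jwt` would make the trade-off explicit and make the entry easy to retire later. The class docstring `"""Implement JWT as Authorisation Grant."""` predates the change and could now state what the commit message says: this grant only exchanges an assertion for a token after the authorisation-code flow has authenticated the user. `resolve_issuer_client` continues outside the hunk.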