From 8b7c598407a5fea9a3d78473e72df87606998cd4 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Fri, 4 Aug 2023 10:10:28 +0300 Subject: Copy over files from GN3 repository. --- .../auth/authentication/oauth2/grants/__init__.py | 0 .../oauth2/grants/authorisation_code_grant.py | 85 ++++++++++++++++++++++ .../authentication/oauth2/grants/password_grant.py | 22 ++++++ 3 files changed, 107 insertions(+) create mode 100644 gn_auth/auth/authentication/oauth2/grants/__init__.py create mode 100644 gn_auth/auth/authentication/oauth2/grants/authorisation_code_grant.py create mode 100644 gn_auth/auth/authentication/oauth2/grants/password_grant.py (limited to 'gn_auth/auth/authentication/oauth2/grants') diff --git a/gn_auth/auth/authentication/oauth2/grants/__init__.py b/gn_auth/auth/authentication/oauth2/grants/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/gn_auth/auth/authentication/oauth2/grants/authorisation_code_grant.py b/gn_auth/auth/authentication/oauth2/grants/authorisation_code_grant.py new file mode 100644 index 0000000..f80d02e --- /dev/null +++ b/gn_auth/auth/authentication/oauth2/grants/authorisation_code_grant.py @@ -0,0 +1,85 @@ +"""Classes and function for Authorisation Code flow.""" +import uuid +import string +import random +from typing import Optional +from datetime import datetime + +from flask import current_app as app +from authlib.oauth2.rfc6749 import grants +from authlib.oauth2.rfc7636 import create_s256_code_challenge + +from gn3.auth import db +from gn3.auth.db_utils import with_db_connection +from gn3.auth.authentication.users import User + +from ..models.oauth2client import OAuth2Client +from ..models.authorization_code import ( + AuthorisationCode, authorisation_code, save_authorisation_code) + +class AuthorisationCodeGrant(grants.AuthorizationCodeGrant): + """Implement the 'Authorisation Code' grant.""" + TOKEN_ENDPOINT_AUTH_METHODS: list[str] = [ + "client_secret_basic", "client_secret_post"] + AUTHORIZATION_CODE_LENGTH: int = 48 + TOKEN_ENDPOINT_HTTP_METHODS = ['POST'] + GRANT_TYPE = "authorization_code" + RESPONSE_TYPES = {'code'} + + def save_authorization_code(self, code, request): + """Persist the authorisation code to database.""" + client = request.client + nonce = "".join(random.sample(string.ascii_letters + string.digits, + k=self.AUTHORIZATION_CODE_LENGTH)) + return __save_authorization_code__(AuthorisationCode( + uuid.uuid4(), code, client, request.redirect_uri, request.scope, + nonce, int(datetime.now().timestamp()), + create_s256_code_challenge(app.config["SECRET_KEY"]), + "S256", request.user)) + + def query_authorization_code(self, code, client): + """Retrieve the code from the database.""" + return __query_authorization_code__(code, client) + + def delete_authorization_code(self, authorization_code):# pylint: disable=[no-self-use] + """Delete the authorisation code.""" + with db.connection(app.config["AUTH_DB"]) as conn: + with db.cursor(conn) as cursor: + cursor.execute( + "DELETE FROM authorisation_code WHERE code_id=?", + (str(authorization_code.code_id),)) + + def authenticate_user(self, authorization_code) -> Optional[User]: + """Authenticate the user who own the authorisation code.""" + query = ( + "SELECT users.* FROM authorisation_code LEFT JOIN users " + "ON authorisation_code.user_id=users.user_id " + "WHERE authorisation_code.code=?") + with db.connection(app.config["AUTH_DB"]) as conn: + with db.cursor(conn) as cursor: + cursor.execute(query, (str(authorization_code.code),)) + res = cursor.fetchone() + if res: + return User( + uuid.UUID(res["user_id"]), res["email"], res["name"]) + + return None + +def __query_authorization_code__( + code: str, client: OAuth2Client) -> AuthorisationCode: + """A helper function that creates a new database connection. + + This is found to be necessary since the `AuthorizationCodeGrant` class(es) + do not have a way to pass the database connection.""" + def __auth_code__(conn) -> str: + the_code = authorisation_code(conn, code, client) + return the_code.maybe(None, lambda cde: cde) # type: ignore[misc, arg-type, return-value] + + return with_db_connection(__auth_code__) + +def __save_authorization_code__(code: AuthorisationCode) -> AuthorisationCode: + """A helper function that creates a new database connection. + + This is found to be necessary since the `AuthorizationCodeGrant` class(es) + do not have a way to pass the database connection.""" + return with_db_connection(lambda conn: save_authorisation_code(conn, code)) diff --git a/gn_auth/auth/authentication/oauth2/grants/password_grant.py b/gn_auth/auth/authentication/oauth2/grants/password_grant.py new file mode 100644 index 0000000..3233877 --- /dev/null +++ b/gn_auth/auth/authentication/oauth2/grants/password_grant.py @@ -0,0 +1,22 @@ +"""Allows users to authenticate directly.""" + +from flask import current_app as app +from authlib.oauth2.rfc6749 import grants + +from gn3.auth import db +from gn3.auth.authentication.users import valid_login, user_by_email + +from gn3.auth.authorisation.errors import NotFoundError + +class PasswordGrant(grants.ResourceOwnerPasswordCredentialsGrant): + """Implement the 'Password' grant.""" + TOKEN_ENDPOINT_AUTH_METHODS = ["client_secret_basic", "client_secret_post"] + + def authenticate_user(self, username, password): + "Authenticate the user with their username and password." + with db.connection(app.config["AUTH_DB"]) as conn: + try: + user = user_by_email(conn, username) + return user if valid_login(conn, user, password) else None + except NotFoundError as _nfe: + return None -- cgit v1.2.3