# path: gn_auth/auth/authentication/oauth2/models/authorization_code.py
# blob: 6c586f3d3283c41f227b3256292367cd9bf252ab
"""Model and functions for handling the Authorisation Code"""
from uuid import UUID
from datetime import datetime
from typing import NamedTuple

from pymonad.maybe import Just, Maybe, Nothing

from gn_auth.auth.db import sqlite3 as db

from .oauth2client import OAuth2Client

from ...users import User, user_by_id

# Five minutes expressed in seconds; added to an authorisation code's
# `auth_time` when checking whether the code has expired.
__5_MINUTES__ = 5 * 60

class AuthorisationCode(NamedTuple):
    """
    Immutable record describing an authorisation code in the
    auth(entic|oris)ation system.

    Instances are built positionally elsewhere in this module, so the order
    of the fields below is part of the interface.
    """
    # Record fields (order matters for positional construction)
    code_id: UUID
    code: str
    client: OAuth2Client
    redirect_uri: str
    scope: str
    nonce: str
    auth_time: int
    code_challenge: str
    code_challenge_method: str
    user: User

    @property
    def response_type(self) -> str:
        """
        The response type for the authorisation code flow, which is always
        the literal string 'code'.
        """
        return "code"

    def is_expired(self) -> bool:
        """
        Return True when the current time is more than five minutes past
        `auth_time`.
        """
        expires_at = self.auth_time + __5_MINUTES__
        current_time = datetime.now().timestamp()
        return current_time > expires_at

    def get_redirect_uri(self) -> str:
        """Return the redirect URI stored with this code."""
        return self.redirect_uri

    def get_scope(self) -> str:
        """Return the scope stored with this code."""
        return self.scope

    def get_nonce(self) -> str:
        """Return the nonce (one-time use token) stored with this code."""
        return self.nonce

def authorisation_code(conn: db.DbConnection,
                       code: str,
                       client: OAuth2Client) -> Maybe[AuthorisationCode]:
    """
    Look up the authorisation code identified by `code` that belongs to the
    given OAuth2 `client`.

    Returns `Just(AuthorisationCode)` when a matching row exists, and
    `Nothing` otherwise. The `client` argument is reused as the code's client
    and the user is loaded with `user_by_id` from the row's `user_id`.
    """
    lookup_sql = ("SELECT * FROM authorisation_code "
                  "WHERE code=:code AND client_id=:client_id")
    with db.cursor(conn) as cursor:
        lookup_params = {"code": code, "client_id": str(client.client_id)}
        cursor.execute(lookup_sql, lookup_params)
        row = cursor.fetchone()
        if not row:
            return Nothing

        # Keyword arguments are evaluated in the order written, which keeps
        # the user lookup as the last step.
        return Just(AuthorisationCode(
            code_id=UUID(row["code_id"]),
            code=row["code"],
            client=client,
            redirect_uri=row["redirect_uri"],
            scope=row["scope"],
            nonce=row["nonce"],
            auth_time=int(row["auth_time"]),
            code_challenge=row["code_challenge"],
            code_challenge_method=row["code_challenge_method"],
            user=user_by_id(conn, UUID(row["user_id"]))))

def save_authorisation_code(conn: db.DbConnection,
                            auth_code: AuthorisationCode) -> AuthorisationCode:
    """
    Persist the `auth_code` into the `authorisation_code` table.

    The client and user are stored by their identifiers (`client_id` and
    `user_id`), and all UUID values are converted to strings before being
    bound to the statement.

    Returns the same `auth_code` object that was passed in.
    """
    with db.cursor(conn) as cursor:
        cursor.execute(
            # The target columns are named explicitly so that the statement
            # does not depend on the order (or number) of columns declared
            # on the table.
            "INSERT INTO authorisation_code("
            "code_id, code, client_id, redirect_uri, scope, nonce, "
            "auth_time, code_challenge, code_challenge_method, user_id"
            ") VALUES("
            ":code_id, :code, :client_id, :redirect_uri, :scope, :nonce, "
            ":auth_time, :code_challenge, :code_challenge_method, :user_id"
            ")",
            # Bind only the values the statement uses, rather than every
            # field of the record (which includes the client and user
            # objects themselves).
            {
                "code_id": str(auth_code.code_id),
                "code": auth_code.code,
                "client_id": str(auth_code.client.client_id),
                "redirect_uri": auth_code.redirect_uri,
                "scope": auth_code.scope,
                "nonce": auth_code.nonce,
                "auth_time": auth_code.auth_time,
                "code_challenge": auth_code.code_challenge,
                "code_challenge_method": auth_code.code_challenge_method,
                "user_id": str(auth_code.user.user_id)
            })
        return auth_code