aboutsummaryrefslogtreecommitdiff
path: root/gn3/auth/authentication/oauth2/models/authorization_code.py
blob: f282814ad8a7958f07f85813224f721d8d2bfc54 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
"""Model and functions for handling the Authorisation Code"""
from uuid import UUID
from datetime import datetime
from typing import NamedTuple

from pymonad.maybe import Just, Maybe, Nothing

from gn3.auth import db

from .oauth2client import OAuth2Client

from ...users import User, user_by_id

__5_MINUTES__ = 300 # in seconds

class AuthorisationCode(NamedTuple):
    """
    The AuthorisationCode model for the auth(entic|oris)ation system.
    """
    # Instance variables
    code_id: UUID
    code: str
    client: OAuth2Client
    redirect_uri: str
    scope: str
    nonce: str
    auth_time: int
    code_challenge: str
    code_challenge_method: str
    user: User

    @property
    def response_type(self) -> str:
        """
        For authorisation code flow, the response_type type MUST always be
        'code'.
        """
        return "code"

    def is_expired(self):
        """Check whether the code is expired."""
        return self.auth_time + __5_MINUTES__ < datetime.now().timestamp()

    def get_redirect_uri(self):
        """Get the redirect URI"""
        return self.redirect_uri

    def get_scope(self):
        """Return the assigned scope for this AuthorisationCode."""
        return self.scope

    def get_nonce(self):
        """Get the one-time use token."""
        return self.nonce

def authorisation_code(conn: db.DbConnection ,
                       code: str,
                       client: OAuth2Client) -> Maybe[AuthorisationCode]:
    """
    Retrieve the authorisation code object that corresponds to `code` and the
    given OAuth2 client.
    """
    with db.cursor(conn) as cursor:
        query = ("SELECT * FROM authorisation_code "
                 "WHERE code=:code AND client_id=:client_id")
        cursor.execute(
            query, {"code": code, "client_id": str(client.client_id)})
        result = cursor.fetchone()
        if result:
            return Just(AuthorisationCode(
                UUID(result["code_id"]), result["code"], client,
                result["redirect_uri"], result["scope"], result["nonce"],
                int(result["auth_time"]), result["code_challenge"],
                result["code_challenge_method"],
                user_by_id(conn, UUID(result["user_id"]))))
        return Nothing

def save_authorisation_code(conn: db.DbConnection,
                            auth_code: AuthorisationCode) -> AuthorisationCode:
    """Persist the `auth_code` into the database."""
    with db.cursor(conn) as cursor:
        cursor.execute(
            "INSERT INTO authorisation_code VALUES("
            ":code_id, :code, :client_id, :redirect_uri, :scope, :nonce, "
            ":auth_time, :code_challenge, :code_challenge_method, :user_id"
            ")",
            {
                **auth_code._asdict(),
                "code_id": str(auth_code.code_id),
                "client_id": str(auth_code.client.client_id),
                "user_id": str(auth_code.user.user_id)
            })
        return auth_code