1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
"""Model and functions for handling the Authorisation Code"""
from uuid import UUID
from datetime import datetime
from typing import NamedTuple
from pymonad.maybe import Just, Maybe, Nothing
from gn3.auth import db
from .oauth2client import OAuth2Client
from ...users import User, user_by_id
# Number of seconds added to an authorisation code's `auth_time` when
# `AuthorisationCode.is_expired` decides whether the code has expired.
__5_MINUTES__ = 300 # in seconds
class AuthorisationCode(NamedTuple):
    """
    Immutable record of an authorisation code in the auth(entic|oris)ation
    system.

    `auth_time` is compared against `datetime.now().timestamp()` in
    `is_expired`, so it is expected to be expressed in seconds since the
    epoch.
    """
    # Fields -- their order is part of the tuple's interface.
    code_id: UUID
    code: str
    client: OAuth2Client
    redirect_uri: str
    scope: str
    nonce: str
    auth_time: int
    code_challenge: str
    code_challenge_method: str
    user: User

    @property
    def response_type(self) -> str:
        """
        The response type of this object: always the literal 'code', as
        required for the authorisation code flow.
        """
        return "code"

    def is_expired(self):
        """
        Return True once the current time is more than `__5_MINUTES__`
        seconds past `auth_time`.
        """
        expires_at = self.auth_time + __5_MINUTES__
        return expires_at < datetime.now().timestamp()

    def get_redirect_uri(self):
        """Return the `redirect_uri` stored with this code."""
        return self.redirect_uri

    def get_scope(self):
        """Return the `scope` stored with this code."""
        return self.scope

    def get_nonce(self):
        """Return the `nonce` stored with this code."""
        return self.nonce
def authorisation_code(conn: db.DbConnection,
                       code: str,
                       client: OAuth2Client) -> Maybe[AuthorisationCode]:
    """
    Look up the stored authorisation code matching `code` for `client`.

    The `authorisation_code` table is queried for a row whose `code` and
    `client_id` both match. When a row is found, the result is
    `Just(AuthorisationCode)` built from that row, with `client` reused as-is
    and the `user` field set to whatever `user_by_id` returns for the row's
    `user_id`. When no row is found, the result is `Nothing`.
    """
    with db.cursor(conn) as cursor:
        cursor.execute(
            ("SELECT * FROM authorisation_code "
             "WHERE code=:code AND client_id=:client_id"),
            {"code": code, "client_id": str(client.client_id)})
        row = cursor.fetchone()
        if not row:
            return Nothing

        # Keyword arguments are listed in field order, so values are
        # evaluated in the same sequence as a positional construction.
        return Just(AuthorisationCode(
            code_id=UUID(row["code_id"]),
            code=row["code"],
            client=client,
            redirect_uri=row["redirect_uri"],
            scope=row["scope"],
            nonce=row["nonce"],
            auth_time=int(row["auth_time"]),
            code_challenge=row["code_challenge"],
            code_challenge_method=row["code_challenge_method"],
            user=user_by_id(conn, UUID(row["user_id"]))))
def save_authorisation_code(conn: db.DbConnection,
                            auth_code: AuthorisationCode) -> AuthorisationCode:
    """
    Insert `auth_code` as a new row in the `authorisation_code` table.

    The statement's named placeholders are filled from the tuple's fields,
    with `code_id`, the client's `client_id` and the user's `user_id`
    supplied in string form. The same `auth_code` object is returned.
    """
    insert_query = (
        "INSERT INTO authorisation_code VALUES("
        ":code_id, :code, :client_id, :redirect_uri, :scope, :nonce, "
        ":auth_time, :code_challenge, :code_challenge_method, :user_id"
        ")")
    with db.cursor(conn) as cursor:
        # Start from every field of the tuple, then override/add the
        # identifiers that have to be passed as strings.
        bindings = dict(
            auth_code._asdict(),
            code_id=str(auth_code.code_id),
            client_id=str(auth_code.client.client_id),
            user_id=str(auth_code.user.user_id))
        cursor.execute(insert_query, bindings)

    return auth_code
|