1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
"""Deal with user sessions"""
from uuid import UUID, uuid4
from datetime import datetime
from typing import Any, Optional, TypedDict
from authlib.jose import KeySet
from flask import request, session
from pymonad.either import Left, Right, Either
class UserDetails(TypedDict):
    """Per-user portion of the data kept in the session.

    The anonymous default built by `session_info` in this module fills in
    every field declared here.
    """
    # Identifier of the user; the anonymous default uses a random UUID.
    user_id: UUID
    # Name of the user ("Anonymous User" in the anonymous default).
    name: str
    # E-mail address of the user.
    email: str
    # `Right(token)` after `set_user_token` has run; the anonymous default
    # holds a `Left(...)` instead.
    token: Either
    # `False` in the anonymous default.
    logged_in: bool
class SessionInfo(TypedDict):
    """Shape of everything this module keeps in the Flask session.

    NOTE(review): the default built by `session_info` writes the key
    "masquerading" (not `masquerade`) and does not set `auth_server_jwks`
    at all -- confirm which spelling callers rely on before aligning them.
    """
    # Random identifier generated when the session data is first created.
    session_id: UUID
    # Details of the current (possibly anonymous) user.
    user: UserDetails
    # Random identifier generated with the default session; the anonymous
    # user's `user_id` is set to the same value.
    anon_id: UUID
    # Value of the request's "User-Agent" header when the default was built.
    user_agent: str
    # "HTTP_X_FORWARDED_FOR" from the request environ when present,
    # otherwise the request's remote address.
    ip_addr: str
    # Presumably the details of a user being masqueraded as -- verify
    # against callers; nothing in this module populates it.
    masquerade: Optional[UserDetails]
    # Written by `set_auth_server_jwks` as a dict with the keys
    # "last-updated" and "jwks".
    auth_server_jwks: Optional[dict[str, Any]]
# Key under which this module stores its data in the Flask session.
# Do not use this outside this module!!
__SESSION_KEY__ = "GN::uploader::session_info"
def clear_session_info():
    """Remove this module's data from the Flask session.

    Safe to call when nothing has been stored yet (or when the data was
    already cleared): a missing key is ignored instead of raising
    `KeyError`.
    """
    # The default makes the pop a no-op when the key is absent.
    session.pop(__SESSION_KEY__, None)
def save_session_info(sess_info: SessionInfo) -> SessionInfo:
    """Store `sess_info` in the Flask session and return it.

    The value is written unconditionally under this module's private
    session key, replacing whatever was stored there before.
    """
    # TODO: if it is an existing session, verify that certain important
    # security bits have not changed before saving. Sketch of the intended
    # check (indentation reconstructed):
    #
    #     old_session_info = session.get(__SESSION_KEY__)
    #     if bool(old_session_info):
    #         if old_session_info["user_agent"] == request.headers.get("User-Agent"):
    #             session[__SESSION_KEY__] = sess_info
    #             return sess_info
    #         # request session verification
    #         return verify_session(sess_info)
    #     # New session
    session[__SESSION_KEY__] = sess_info
    return sess_info
def session_info() -> SessionInfo:
    """Return the stored session data, falling back to anonymous defaults.

    Whatever is returned (the stored value, or the freshly built default
    when nothing is stored) is also written back to the session via
    `save_session_info`.
    """
    anonymous_id = uuid4()
    anonymous_user = {
        "user_id": anonymous_id,
        "name": "Anonymous User",
        "email": "anon@ymous.user",
        "token": Left("INVALID-TOKEN"),
        "logged_in": False,
    }
    # The default is always built, even when stored data already exists.
    # NOTE(review): the key below is "masquerading" while `SessionInfo`
    # declares `masquerade` -- confirm which one callers expect.
    fresh_session = {
        "session_id": uuid4(),
        "user": anonymous_user,
        "anon_id": anonymous_id,
        "user_agent": request.headers.get("User-Agent"),
        "ip_addr": request.environ.get(
            "HTTP_X_FORWARDED_FOR", request.remote_addr),
        "masquerading": None,
    }
    return save_session_info(session.get(__SESSION_KEY__, fresh_session))
def set_user_token(token: str) -> SessionInfo:
    """Store `token` for the current user and return the updated session.

    The token is wrapped in `Right(...)`; all other user and session fields
    are carried over unchanged.
    """
    current = session_info()
    return save_session_info(
        {**current, "user": {**current["user"], "token": Right(token)}})  # type: ignore[misc]
def set_user_details(userdets: UserDetails) -> SessionInfo:
    """Replace the session's "user" entry with `userdets`.

    Every other session field is carried over unchanged; the updated
    session data is saved and returned.
    """
    current = session_info()
    return save_session_info({**current, "user": userdets})  # type: ignore[misc]
def user_details() -> UserDetails:
    """Return the "user" entry of the current session data."""
    current = session_info()
    return current["user"]
def user_token() -> Either:
    """Return the "token" value stored for the current session's user."""
    user = session_info()["user"]
    return user["token"]
def set_auth_server_jwks(keyset: KeySet) -> KeySet:
    """Record `keyset` in the session and return it unchanged.

    The session's "auth_server_jwks" entry is replaced with a dict holding
    the keyset in dict form ("jwks") and the time of this update
    ("last-updated", taken from `datetime.now().timestamp()`).
    """
    current = session_info()
    jwks_record = {
        "last-updated": datetime.now().timestamp(),
        "jwks": keyset.as_dict(),
    }
    save_session_info({**current, "auth_server_jwks": jwks_record})
    return keyset
|