1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
"""Deal with user sessions"""
from uuid import UUID, uuid4
from datetime import datetime
from typing import Any, Optional, TypedDict
from authlib.jose import KeySet
from flask import request, session
from pymonad.either import Left, Right, Either
class UserDetails(TypedDict):
"""Session information relating specifically to the user."""
user_id: UUID
name: str
email: str
token: Either
logged_in: bool
class SessionInfo(TypedDict):
"""All Session information we save."""
session_id: UUID
user: UserDetails
anon_id: UUID
user_agent: str
ip_addr: str
masquerade: Optional[UserDetails]
auth_server_jwks: Optional[dict[str, Any]]
__SESSION_KEY__ = "GN::uploader::session_info" # Do not use this outside this module!!
def clear_session_info():
"""Clears the session."""
session.pop(__SESSION_KEY__)
def save_session_info(sess_info: SessionInfo) -> SessionInfo:
"""Save `session_info`."""
# T0d0: if it is an existing session, verify that certain important security
# bits have not changed before saving.
# old_session_info = session.get(__SESSION_KEY__)
# if bool(old_session_info):
# if old_session_info["user_agent"] == request.headers.get("User-Agent"):
# session[__SESSION_KEY__] = sess_info
# return sess_info
# # request session verification
# return verify_session(sess_info)
# New session
session[__SESSION_KEY__] = sess_info
return sess_info
def session_info() -> SessionInfo:
"""Retrieve the session information"""
anon_id = uuid4()
return save_session_info(
session.get(__SESSION_KEY__, {
"session_id": uuid4(),
"user": {
"user_id": anon_id,
"name": "Anonymous User",
"email": "anon@ymous.user",
"token": Left("INVALID-TOKEN"),
"logged_in": False
},
"anon_id": anon_id,
"user_agent": request.headers.get("User-Agent"),
"ip_addr": request.environ.get("HTTP_X_FORWARDED_FOR",
request.remote_addr),
"masquerading": None
}))
def set_user_token(token: str) -> SessionInfo:
"""Set the user's token."""
info = session_info()
return save_session_info({
**info, "user": {**info["user"], "token": Right(token)}})#type: ignore[misc]
def set_user_details(userdets: UserDetails) -> SessionInfo:
"""Set the user details information"""
return save_session_info({**session_info(), "user": userdets})#type: ignore[misc]
def user_details() -> UserDetails:
"""Retrieve user details."""
return session_info()["user"]
def user_token() -> Either:
"""Retrieve the user token."""
return session_info()["user"]["token"]
def set_auth_server_jwks(keyset: KeySet) -> KeySet:
"""Update the JSON Web Keys in the session."""
save_session_info({
**session_info(),# type: ignore[misc]
"auth_server_jwks": {
"last-updated": datetime.now().timestamp(),
"jwks": keyset.as_dict()
}
})
return keyset
def toggle_token_refreshing():
"""Toggle the state of the token_refreshing variable."""
_session = session_info()
return save_session_info({
**_session,
"token_refreshing": not _session.get("token_refreshing", False)})
def is_token_refreshing():
"""Returns whether the token is being refreshed or not."""
return session_info().get("token_refreshing", False)
|