aboutsummaryrefslogtreecommitdiff
path: root/uploader/session.py
diff options
context:
space:
mode:
Diffstat (limited to 'uploader/session.py')
-rw-r--r--uploader/session.py118
1 files changed, 118 insertions, 0 deletions
diff --git a/uploader/session.py b/uploader/session.py
new file mode 100644
index 0000000..399f28c
--- /dev/null
+++ b/uploader/session.py
@@ -0,0 +1,118 @@
+"""Deal with user sessions"""
+from uuid import UUID, uuid4
+from datetime import datetime
+from typing import Any, Optional, TypedDict
+
+from authlib.jose import KeySet
+from flask import request, session
+from pymonad.either import Left, Right, Either
+
+
+class UserDetails(TypedDict):
+ """Session information relating specifically to the user."""
+ user_id: UUID
+ name: str
+ email: str
+ token: Either
+ logged_in: bool
+
+
+class SessionInfo(TypedDict):
+ """All Session information we save."""
+ session_id: UUID
+ user: UserDetails
+ anon_id: UUID
+ user_agent: str
+ ip_addr: str
+ masquerade: Optional[UserDetails]
+ auth_server_jwks: Optional[dict[str, Any]]
+
+
+__SESSION_KEY__ = "GN::uploader::session_info" # Do not use this outside this module!!
+
+
+def clear_session_info():
+ """Clears the session."""
+ session.pop(__SESSION_KEY__)
+
+
+def save_session_info(sess_info: SessionInfo) -> SessionInfo:
+ """Save `session_info`."""
+ # T0d0: if it is an existing session, verify that certain important security
+ # bits have not changed before saving.
+ # old_session_info = session.get(__SESSION_KEY__)
+ # if bool(old_session_info):
+ # if old_session_info["user_agent"] == request.headers.get("User-Agent"):
+ # session[__SESSION_KEY__] = sess_info
+ # return sess_info
+ # # request session verification
+ # return verify_session(sess_info)
+ # New session
+ session[__SESSION_KEY__] = sess_info
+ return sess_info
+
+
+def session_info() -> SessionInfo:
+ """Retrieve the session information"""
+ anon_id = uuid4()
+ return save_session_info(
+ session.get(__SESSION_KEY__, {
+ "session_id": uuid4(),
+ "user": {
+ "user_id": anon_id,
+ "name": "Anonymous User",
+ "email": "anon@ymous.user",
+ "token": Left("INVALID-TOKEN"),
+ "logged_in": False
+ },
+ "anon_id": anon_id,
+ "user_agent": request.headers.get("User-Agent"),
+ "ip_addr": request.environ.get("HTTP_X_FORWARDED_FOR",
+ request.remote_addr),
+ "masquerading": None
+ }))
+
+
+def set_user_token(token: str) -> SessionInfo:
+ """Set the user's token."""
+ info = session_info()
+ return save_session_info({
+ **info, "user": {**info["user"], "token": Right(token)}})#type: ignore[misc]
+
+
+def set_user_details(userdets: UserDetails) -> SessionInfo:
+ """Set the user details information"""
+ return save_session_info({**session_info(), "user": userdets})#type: ignore[misc]
+
+def user_details() -> UserDetails:
+ """Retrieve user details."""
+ return session_info()["user"]
+
+def user_token() -> Either:
+ """Retrieve the user token."""
+ return session_info()["user"]["token"]
+
+
+def set_auth_server_jwks(keyset: KeySet) -> KeySet:
+ """Update the JSON Web Keys in the session."""
+ save_session_info({
+ **session_info(),
+ "auth_server_jwks": {
+ "last-updated": datetime.now().timestamp(),
+ "jwks": keyset.as_dict()
+ }
+ })
+ return keyset
+
+
+def toggle_token_refreshing():
+ """Toggle the state of the token_refreshing variable."""
+ _session = session_info()
+ return save_session_info({
+ **_session,
+ "token_refreshing": not _session.get("token_refreshing", False)})
+
+
+def is_token_refreshing():
+ """Returns whether the token is being refreshed or not."""
+ return session_info().get("token_refreshing", False)