"""Deal with user sessions""" from uuid import UUID, uuid4 from datetime import datetime from typing import Any, Optional, TypedDict from authlib.jose import KeySet from flask import request, session from pymonad.either import Left, Right, Either class UserDetails(TypedDict): """Session information relating specifically to the user.""" user_id: UUID name: str email: str token: Either logged_in: bool class SessionInfo(TypedDict): """All Session information we save.""" session_id: UUID user: UserDetails anon_id: UUID user_agent: str ip_addr: str masquerade: Optional[UserDetails] auth_server_jwks: Optional[dict[str, Any]] __SESSION_KEY__ = "GN::uploader::session_info" # Do not use this outside this module!! def clear_session_info(): """Clears the session.""" session.pop(__SESSION_KEY__) def save_session_info(sess_info: SessionInfo) -> SessionInfo: """Save `session_info`.""" # T0d0: if it is an existing session, verify that certain important security # bits have not changed before saving. # old_session_info = session.get(__SESSION_KEY__) # if bool(old_session_info): # if old_session_info["user_agent"] == request.headers.get("User-Agent"): # session[__SESSION_KEY__] = sess_info # return sess_info # # request session verification # return verify_session(sess_info) # New session session[__SESSION_KEY__] = sess_info return sess_info def session_info() -> SessionInfo: """Retrieve the session information""" anon_id = uuid4() return save_session_info( session.get(__SESSION_KEY__, { "session_id": uuid4(), "user": { "user_id": anon_id, "name": "Anonymous User", "email": "anon@ymous.user", "token": Left("INVALID-TOKEN"), "logged_in": False }, "anon_id": anon_id, "user_agent": request.headers.get("User-Agent"), "ip_addr": request.environ.get("HTTP_X_FORWARDED_FOR", request.remote_addr), "masquerading": None })) def set_user_token(token: str) -> SessionInfo: """Set the user's token.""" info = session_info() return save_session_info({ **info, "user": {**info["user"], "token": Right(token)}})#type: ignore[misc] def set_user_details(userdets: UserDetails) -> SessionInfo: """Set the user details information""" return save_session_info({**session_info(), "user": userdets})#type: ignore[misc] def user_details() -> UserDetails: """Retrieve user details.""" return session_info()["user"] def user_token() -> Either: """Retrieve the user token.""" return session_info()["user"]["token"] def set_auth_server_jwks(keyset: KeySet) -> KeySet: """Update the JSON Web Keys in the session.""" save_session_info({ **session_info(), "auth_server_jwks": { "last-updated": datetime.now().timestamp(), "jwks": keyset.as_dict() } }) return keyset def toggle_token_refreshing(): """Toggle the state of the token_refreshing variable.""" _session = session_info() return save_session_info({ **_session, "token_refreshing": not _session.get("token_refreshing", False)}) def is_token_refreshing(): """Returns whether the token is being refreshed or not.""" return session_info().get("token_refreshing", False)