# uploader/session.py (blob b5381879ac420d0ea63d2d2d40861a8216c8766b)
"""Deal with user sessions"""
from uuid import UUID, uuid4
from datetime import datetime
from typing import Any, Optional, TypedDict

from authlib.jose import KeySet
from flask import request, session
from pymonad.either import Left, Right, Either


class UserDetails(TypedDict):
    """Session information relating specifically to the user.

    When no session has been saved yet, `session_info()` fills these fields
    with placeholder values for an "Anonymous User".
    """
    # Identifier of the user; the anonymous default uses a freshly generated
    # random UUID (the same value is stored as the session's "anon_id").
    user_id: UUID
    name: str
    email: str
    # `Right(token)` once `set_user_token()` has been called; the anonymous
    # default holds `Left("INVALID-TOKEN")`.
    token: Either
    # `False` in the anonymous default created by `session_info()`.
    logged_in: bool


class SessionInfo(TypedDict):
    """All Session information we save.

    NOTE(review): the keys written at runtime do not fully match the fields
    declared here. A new session built by `session_info()` stores a
    "masquerading" key (not "masquerade") and no "auth_server_jwks" key, and
    `toggle_token_refreshing()` stores a "token_refreshing" key that is not
    declared at all. Confirm which spellings the rest of the code base reads
    before aligning them.
    """
    session_id: UUID
    # Details of the current user (anonymous placeholders until replaced).
    user: UserDetails
    # In a newly created session this equals the anonymous user's "user_id".
    anon_id: UUID
    # Value of the request's "User-Agent" header when the session was created.
    user_agent: str
    # "HTTP_X_FORWARDED_FOR" from the request environ when present, otherwise
    # `request.remote_addr`, captured when the session was created.
    ip_addr: str
    masquerade: Optional[UserDetails]
    # Written by `set_auth_server_jwks()` as a dict with the keys
    # "last-updated" (a timestamp) and "jwks" (the key set as a dict).
    auth_server_jwks: Optional[dict[str, Any]]


# Key under which this module stores its session information in Flask's
# `session` object.
__SESSION_KEY__ = "GN::uploader::session_info" # Do not use this outside this module!!


def clear_session_info():
    """Remove this module's session information from the Flask session.

    Safe to call when nothing has been saved yet: a missing entry is ignored
    instead of raising ``KeyError``, so clearing an already-clear session is a
    no-op.
    """
    # The `None` default makes `pop` tolerate an absent key.
    session.pop(__SESSION_KEY__, None)


def save_session_info(sess_info: SessionInfo) -> SessionInfo:
    """Store `sess_info` in the Flask session and return it unchanged.

    Any previously saved session information is overwritten without checks.
    """
    # T0d0: if it is an existing session, verify that certain important security
    #       bits have not changed before saving: e.g. compare the stored
    #       "user_agent" with the current request's "User-Agent" header, save
    #       only when they match, and otherwise request session verification.
    session[__SESSION_KEY__] = sess_info
    return sess_info


def session_info() -> SessionInfo:
    """Retrieve the session information, creating it if there is none.

    If session information has already been saved, it is re-saved and
    returned as-is. Otherwise a new anonymous session is built from the
    current request (fresh random IDs, the "User-Agent" header and the client
    address), saved, and returned.

    The anonymous defaults are only constructed when they are actually
    needed, so an existing session no longer costs two UUID generations and
    the request lookups on every call.
    """
    if __SESSION_KEY__ in session:
        # Existing session: hand back exactly what was stored (and re-save it,
        # as before, so the session is touched on every access).
        return save_session_info(session[__SESSION_KEY__])

    # The anonymous user's ID doubles as the session's "anon_id".
    anon_id = uuid4()
    # Typed as `Any` because the stored keys ("masquerading", and no
    # "auth_server_jwks") differ from the fields declared on `SessionInfo`.
    new_session: Any = {
        "session_id": uuid4(),
        "user": {
            "user_id": anon_id,
            "name": "Anonymous User",
            "email": "anon@ymous.user",
            "token": Left("INVALID-TOKEN"),
            "logged_in": False
        },
        "anon_id": anon_id,
        "user_agent": request.headers.get("User-Agent"),
        # Prefer the forwarded-for value from the environ when present; fall
        # back to the directly connected peer address.
        "ip_addr": request.environ.get("HTTP_X_FORWARDED_FOR",
                                       request.remote_addr),
        "masquerading": None
    }
    return save_session_info(new_session)


def set_user_token(token: str) -> SessionInfo:
    """Store `token` as the current user's token.

    The token is wrapped in `Right` and replaces the "token" entry of the
    session's "user" details; every other session value is carried over.
    Returns the saved session information.
    """
    current = session_info()
    return save_session_info({
        **current, "user": {**current["user"], "token": Right(token)}})#type: ignore[misc]


def set_user_details(userdets: UserDetails) -> SessionInfo:
    """Replace the session's "user" entry with `userdets` and save the result.

    All other session values are kept. Returns the saved session information.
    """
    current = session_info()
    return save_session_info({**current, "user": userdets})#type: ignore[misc]

def user_details() -> UserDetails:
    """Return the "user" entry of the current session information."""
    current = session_info()
    return current["user"]

def user_token() -> Either:
    """Return the "token" entry of the current session's user details."""
    return user_details()["token"]


def set_auth_server_jwks(keyset: KeySet) -> KeySet:
    """Record `keyset` in the session as the auth server's JSON Web Keys.

    The keys are stored under "auth_server_jwks" as a dict holding the key
    set (via `keyset.as_dict()`) together with the timestamp of this update.
    Returns `keyset` itself, not the session information.
    """
    current = session_info()
    jwks_entry = {
        "last-updated": datetime.now().timestamp(),
        "jwks": keyset.as_dict()
    }
    save_session_info({
        **current,# type: ignore[misc]
        "auth_server_jwks": jwks_entry
    })
    return keyset


def toggle_token_refreshing():
    """Flip the "token_refreshing" flag in the session and save the result.

    A session without the flag is treated as `False`, so the first toggle
    sets it to `True`. Returns the saved session information.
    """
    current = session_info()
    flipped = not current.get("token_refreshing", False)
    return save_session_info({**current, "token_refreshing": flipped})


def is_token_refreshing():
    """Return the session's "token_refreshing" value, or `False` if unset."""
    current = session_info()
    return current.get("token_refreshing", False)