aboutsummaryrefslogtreecommitdiff
path: root/gn2/wqflask/oauth2/session.py
blob: b91534b057cd98bbc87e33a408764a27e1b13f05 (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""Deal with user sessions"""
from uuid import UUID, uuid4
from datetime import datetime
from typing import Any, Optional, TypedDict

from flask import request, session
from pymonad.either import Left, Right, Either

class UserDetails(TypedDict):
    """Session information relating specifically to the user.

    Describes the dict kept under the "user" key of the session information.
    """
    # Identifier of the user; the anonymous default uses a freshly generated
    # UUID (the same value that is stored as the session's `anon_id`).
    user_id: UUID
    # Display name of the user.
    name: str
    # E-mail address of the user.
    email: str
    # Auth token wrapped in an Either: this module stores `Right(token)` when a
    # token has been set and a `Left(...)` placeholder for anonymous sessions.
    token: Either
    # Whether the user is logged in; False in the anonymous default.
    logged_in: bool

class SessionInfo(TypedDict):
    """All Session information we save.

    The field names match the dictionary keys this module reads and writes at
    runtime, so a type checker validates the real session layout.
    """
    # Identifier generated for the session itself.
    session_id: UUID
    # Details of the user the session currently acts as.
    user: UserDetails
    # Identifier generated for the anonymous user when the session is created.
    anon_id: UUID
    # Value of the request's "User-Agent" header at session creation.
    user_agent: str
    # Client address: the X-Forwarded-For value when present, otherwise the
    # remote address of the request.
    ip_addr: str
    # Details of the original user while masquerading as someone else; None
    # when no masquerade is active.
    masquerading: Optional[UserDetails]
    # True while a token refresh is in progress.
    token_refreshing: bool
    # NOTE(review): presumably the auth server's JSON Web Key Set; this module
    # never sets it in the visible code -- confirm against the callers.
    auth_server_jwks: Optional[dict[str, Any]]

# Key under which this module keeps its session information in the Flask
# session object.
__SESSION_KEY__ = "GN::2::session_info" # Do not use this outside this module!!

def clear_session_info():
    """Remove this module's session information from the Flask session.

    Clearing is idempotent: when nothing is stored under the session key the
    call is a no-op instead of raising ``KeyError``.
    """
    # A default is required: `pop` without one raises KeyError on a missing key.
    session.pop(__SESSION_KEY__, None)

def save_session_info(sess_info: SessionInfo) -> SessionInfo:
    """Store `sess_info` in the Flask session and return it.

    The value is written under the module-private session key, replacing
    whatever was stored there before. No validation is performed yet; see the
    TODO below.
    """
    # TODO: if it is an existing session, verify that certain important security
    #       bits have not changed before saving.
    # Sketch of the intended check (kept for reference; `verify_session` is
    # not defined in the visible part of this module):
    # old_session_info = session.get(__SESSION_KEY__)
    # if bool(old_session_info):
    #     if old_session_info["user_agent"] == request.headers.get("User-Agent"):
    #         session[__SESSION_KEY__] = sess_info
    #         return sess_info
    #     # request session verification
    #     return verify_session(sess_info)
    # New session
    session[__SESSION_KEY__] = sess_info
    return sess_info

def session_info() -> SessionInfo:
    """Return the current session information, creating it when absent.

    When nothing is stored yet, an anonymous session is built from the
    current request. Whatever is returned is also (re)saved to the session.
    """
    anonymous_id = uuid4()
    # Built on every call; it is only used when no session is stored yet.
    fresh_session = {
        "session_id": uuid4(),
        "user": {
            "user_id": anonymous_id,
            "name": "Anonymous User",
            "email": "anon@ymous.user",
            "token": Left("INVALID-TOKEN"),
            "logged_in": False
        },
        "anon_id": anonymous_id,
        "user_agent": request.headers.get("User-Agent"),
        # Prefer the forwarded address when one is present.
        "ip_addr": request.environ.get("HTTP_X_FORWARDED_FOR",
                                       request.remote_addr),
        "masquerading": None,
        "token_refreshing": False
    }
    current = session.get(__SESSION_KEY__, fresh_session)
    return save_session_info(current)


def set_user_token(token: str) -> SessionInfo:
    """Store `token` as the current user's token and return the saved session.

    The token is wrapped in `Right`; every other user and session field is
    carried over unchanged.
    """
    current = session_info()
    user_with_token = dict(current["user"], token=Right(token))
    return save_session_info(dict(current, user=user_with_token))

def set_user_details(userdets: UserDetails) -> SessionInfo:
    """Replace the session's user details with `userdets` and save the result."""
    current = session_info()
    return save_session_info(dict(current, user=userdets))

def user_token() -> Either:
    """Return the token stored for the session's current user."""
    current_user = session_info()["user"]
    return current_user["token"]

def set_masquerading(masq_info):
    """Switch the session to the masqueraded user and remember the original.

    `masq_info` must contain a "masquerade_as" mapping with a "token" and a
    "user" mapping providing "user_id" (a UUID string), "name" and "email".
    The user that was active before the call is kept under the session's
    "masquerading" key. Returns the saved session information.

    Raises `KeyError` when an expected key is missing from `masq_info` and
    `ValueError` when the user id is not a valid UUID string.
    """
    # Fetch the session once; each `session_info()` call also re-saves it.
    current = session_info()
    target = masq_info["masquerade_as"]
    target_user = target["user"]
    # NOTE(review): when a masquerade is already active, the stored original
    # user is overwritten by the currently masqueraded one -- confirm callers
    # never nest masquerades.
    return save_session_info({
        **current,
        "user": {
            "user_id": UUID(target_user["user_id"]),
            "name": target_user["name"],
            "email": target_user["email"],
            "token": Right(target["token"]),
            "logged_in": True
        },
        "masquerading": current["user"]
    })

def unset_masquerading():
    """Restore the original user after masquerading.

    The user kept under the session's "masquerading" key becomes the active
    user again and the key is reset to None. When no masquerade is active the
    session is returned unchanged. Returns the session information.
    """
    the_session = session_info()
    original_user = the_session.get("masquerading")
    if not original_user:
        # Nothing to restore: overwriting "user" with None here would leave
        # the session without any user details.
        return the_session
    return save_session_info({
        **the_session,
        "user": original_user,
        "masquerading": None
    })


def toggle_token_refreshing():
    """Flip the session's "token_refreshing" flag and save the session.

    A missing flag is treated as False, so the first toggle sets it to True.
    """
    current = session_info()
    flipped = not current.get("token_refreshing", False)
    return save_session_info(dict(current, token_refreshing=flipped))


def is_token_refreshing():
    """Report the session's "token_refreshing" flag (False when unset)."""
    current = session_info()
    return current.get("token_refreshing", False)