aboutsummaryrefslogtreecommitdiff
path: root/gn_auth/auth/authorisation/users/masquerade/models.py
blob: a155899567960c155ea8d7d9b6ebd4362e746275 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
"""Functions for handling masquerade."""
from functools import wraps
from datetime import datetime
from authlib.jose import jwt

from flask import current_app as app


from gn_auth.auth.errors import ForbiddenAccess

from gn_auth.auth.jwks import newest_jwk_with_rotation, jwks_directory
from gn_auth.auth.authentication.oauth2.grants.refresh_token_grant import (
    RefreshTokenGrant)
from gn_auth.auth.authentication.oauth2.models.jwtrefreshtoken import (
    JWTRefreshToken,
    save_refresh_token)

from ...roles.models import user_roles
from ....db import sqlite3 as db
from ....authentication.users import User
from ....authentication.oauth2.models.oauth2token import OAuth2Token

__FIVE_HOURS__ = (60 * 60 * 5)

def can_masquerade(func):
    """Security decorator."""
    @wraps(func)
    def __checker__(*args, **kwargs):
        if len(args) == 3:
            conn, token, _masq_user = args
        elif len(args) == 2:
            conn, token = args
        elif len(args) == 1:
            conn = args[0]
            token = kwargs["original_token"]
        else:
            conn = kwargs["conn"]
            token = kwargs["original_token"]

        masq_privs = []
        for roles in user_roles(conn, token.user):
            for role in roles["roles"]:
                privileges = [p for p in role.privileges
                              if p.privilege_id == "system:user:masquerade"]
                masq_privs.extend(privileges)

        if len(masq_privs) == 0:
            raise ForbiddenAccess(
                "You do not have the ability to masquerade as another user.")
        return func(*args, **kwargs)
    return __checker__

@can_masquerade
def masquerade_as(
        conn: db.DbConnection,
        original_token: OAuth2Token,
        masqueradee: User) -> OAuth2Token:
    """Get a token that enables `masquerader` to act as `masqueradee`."""
    scope = original_token.get_scope().replace(
        # Do not allow more than one level of masquerading
        "masquerade", "").strip()
    new_token = app.config["OAUTH2_SERVER"].generate_token(
        client=original_token.client,
        grant_type="urn:ietf:params:oauth:grant-type:jwt-bearer",
        user=masqueradee,
        expires_in=original_token.get_expires_in(),
        include_refresh_token=True,
        scope=scope)
    _jwt = jwt.decode(
        new_token["access_token"],
        newest_jwk_with_rotation(
            jwks_directory(app),
            int(app.config["JWKS_ROTATION_AGE_DAYS"])))
    save_refresh_token(conn, JWTRefreshToken(
        token=new_token["refresh_token"],
        client=original_token.client,
        user=masqueradee,
        issued_with=_jwt["jti"],
        issued_at=datetime.fromtimestamp(_jwt["iat"]),
        expires=datetime.fromtimestamp(
            int(_jwt["iat"]) + RefreshTokenGrant.DEFAULT_EXPIRES_IN),
        scope=scope,
        revoked=False,
        parent_of=None))
    return new_token