diff options
Diffstat (limited to 'gn_auth/auth/authorisation/users/masquerade')
-rw-r--r-- | gn_auth/auth/authorisation/users/masquerade/models.py | 58 | ||||
-rw-r--r-- | gn_auth/auth/authorisation/users/masquerade/views.py | 13 |
2 files changed, 42 insertions, 29 deletions
diff --git a/gn_auth/auth/authorisation/users/masquerade/models.py b/gn_auth/auth/authorisation/users/masquerade/models.py index 57bc564..a155899 100644 --- a/gn_auth/auth/authorisation/users/masquerade/models.py +++ b/gn_auth/auth/authorisation/users/masquerade/models.py @@ -1,18 +1,24 @@ """Functions for handling masquerade.""" -from uuid import uuid4 from functools import wraps from datetime import datetime +from authlib.jose import jwt from flask import current_app as app from gn_auth.auth.errors import ForbiddenAccess +from gn_auth.auth.jwks import newest_jwk_with_rotation, jwks_directory +from gn_auth.auth.authentication.oauth2.grants.refresh_token_grant import ( + RefreshTokenGrant) +from gn_auth.auth.authentication.oauth2.models.jwtrefreshtoken import ( + JWTRefreshToken, + save_refresh_token) + from ...roles.models import user_roles from ....db import sqlite3 as db from ....authentication.users import User -from ....authentication.oauth2.models.oauth2token import ( - OAuth2Token, save_token) +from ....authentication.oauth2.models.oauth2token import OAuth2Token __FIVE_HOURS__ = (60 * 60 * 5) @@ -31,9 +37,13 @@ def can_masquerade(func): conn = kwargs["conn"] token = kwargs["original_token"] - masq_privs = [priv for role in user_roles(conn, token.user) - for priv in role.privileges - if priv.privilege_id == "system:user:masquerade"] + masq_privs = [] + for roles in user_roles(conn, token.user): + for role in roles["roles"]: + privileges = [p for p in role.privileges + if p.privilege_id == "system:user:masquerade"] + masq_privs.extend(privileges) + if len(masq_privs) == 0: raise ForbiddenAccess( "You do not have the ability to masquerade as another user.") @@ -46,22 +56,30 @@ def masquerade_as( original_token: OAuth2Token, masqueradee: User) -> OAuth2Token: """Get a token that enables `masquerader` to act as `masqueradee`.""" - token_details = app.config["OAUTH2_SERVER"].generate_token( + scope = original_token.get_scope().replace( + # Do not allow more than one level of masquerading + "masquerade", "").strip() + new_token = app.config["OAUTH2_SERVER"].generate_token( client=original_token.client, - grant_type="authorization_code", + grant_type="urn:ietf:params:oauth:grant-type:jwt-bearer", user=masqueradee, - expires_in=__FIVE_HOURS__, - include_refresh_token=True) - new_token = OAuth2Token( - token_id=uuid4(), + expires_in=original_token.get_expires_in(), + include_refresh_token=True, + scope=scope) + _jwt = jwt.decode( + new_token["access_token"], + newest_jwk_with_rotation( + jwks_directory(app), + int(app.config["JWKS_ROTATION_AGE_DAYS"]))) + save_refresh_token(conn, JWTRefreshToken( + token=new_token["refresh_token"], client=original_token.client, - token_type=token_details["token_type"], - access_token=token_details["access_token"], - refresh_token=token_details.get("refresh_token"), - scope=original_token.scope, + user=masqueradee, + issued_with=_jwt["jti"], + issued_at=datetime.fromtimestamp(_jwt["iat"]), + expires=datetime.fromtimestamp( + int(_jwt["iat"]) + RefreshTokenGrant.DEFAULT_EXPIRES_IN), + scope=scope, revoked=False, - issued_at=datetime.now(), - expires_in=token_details["expires_in"], - user=masqueradee) - save_token(conn, new_token) + parent_of=None)) return new_token diff --git a/gn_auth/auth/authorisation/users/masquerade/views.py b/gn_auth/auth/authorisation/users/masquerade/views.py index 276859a..8b897f2 100644 --- a/gn_auth/auth/authorisation/users/masquerade/views.py +++ b/gn_auth/auth/authorisation/users/masquerade/views.py @@ -28,22 +28,17 @@ def masquerade() -> Response: masq_user = with_db_connection(partial( user_by_id, user_id=masqueradee_id)) + def __masq__(conn): new_token = masquerade_as(conn, original_token=token, masqueradee=masq_user) return new_token - def __dump_token__(tok): - return { - key: value for key, value in (tok._asdict().items()) - if key in ("access_token", "refresh_token", "expires_in", - "token_type") - } + return jsonify({ "original": { - "user": token.user._asdict(), - "token": __dump_token__(token) + "user": asdict(token.user) }, "masquerade_as": { "user": asdict(masq_user), - "token": __dump_token__(with_db_connection(__masq__)) + "token": with_db_connection(__masq__) } }) |