diff options
Diffstat (limited to 'gn_auth/auth/authorisation/users/masquerade')
| -rw-r--r-- | gn_auth/auth/authorisation/users/masquerade/models.py | 45 | ||||
| -rw-r--r-- | gn_auth/auth/authorisation/users/masquerade/views.py | 27 |
2 files changed, 37 insertions, 35 deletions
diff --git a/gn_auth/auth/authorisation/users/masquerade/models.py b/gn_auth/auth/authorisation/users/masquerade/models.py index 8ac1a68..5c11f34 100644 --- a/gn_auth/auth/authorisation/users/masquerade/models.py +++ b/gn_auth/auth/authorisation/users/masquerade/models.py @@ -1,5 +1,4 @@ """Functions for handling masquerade.""" -import uuid from functools import wraps from datetime import datetime from authlib.jose import jwt @@ -10,14 +9,18 @@ from flask import current_app as app from gn_auth.auth.errors import ForbiddenAccess from gn_auth.auth.jwks import newest_jwk_with_rotation, jwks_directory +from gn_auth.auth.authentication.oauth2.grants.refresh_token_grant import ( + RefreshTokenGrant) +from gn_auth.auth.authentication.oauth2.models.jwtrefreshtoken import ( + JWTRefreshToken, + save_refresh_token) from ...roles.models import user_roles from ....db import sqlite3 as db from ....authentication.users import User -from ....authentication.oauth2.models.oauth2token import ( - OAuth2Token, save_token) +from ....authentication.oauth2.models.oauth2token import OAuth2Token -__FIVE_HOURS__ = (60 * 60 * 5) +__FIVE_HOURS__ = 60 * 60 * 5 def can_masquerade(func): """Security decorator.""" @@ -53,28 +56,30 @@ def masquerade_as( original_token: OAuth2Token, masqueradee: User) -> OAuth2Token: """Get a token that enables `masquerader` to act as `masqueradee`.""" - token_details = app.config["OAUTH2_SERVER"].generate_token( + scope = original_token.get_scope().replace( + # Do not allow more than one level of masquerading + "masquerade", "").strip() + new_token = app.config["OAUTH2_SERVER"].generate_token( client=original_token.client, - grant_type="authorization_code", + grant_type="urn:ietf:params:oauth:grant-type:jwt-bearer", user=masqueradee, - expires_in=__FIVE_HOURS__, - include_refresh_token=True) - + expires_in=original_token.get_expires_in(), + include_refresh_token=True, + scope=scope) _jwt = jwt.decode( - original_token.access_token, + new_token["access_token"], newest_jwk_with_rotation( jwks_directory(app), int(app.config["JWKS_ROTATION_AGE_DAYS"]))) - new_token = OAuth2Token( - token_id=uuid.UUID(_jwt["jti"]), + save_refresh_token(conn, JWTRefreshToken( + token=new_token["refresh_token"], client=original_token.client, - token_type=token_details["token_type"], - access_token=token_details["access_token"], - refresh_token=token_details.get("refresh_token"), - scope=original_token.scope, + user=masqueradee, + issued_with=_jwt["jti"], + issued_at=datetime.fromtimestamp(_jwt["iat"]), + expires=datetime.fromtimestamp( + int(_jwt["iat"]) + RefreshTokenGrant.DEFAULT_EXPIRES_IN), + scope=scope, revoked=False, - issued_at=datetime.now(), - expires_in=token_details["expires_in"], - user=masqueradee) - save_token(conn, new_token) + parent_of=None)) return new_token diff --git a/gn_auth/auth/authorisation/users/masquerade/views.py b/gn_auth/auth/authorisation/users/masquerade/views.py index 68f19ee..12a8c97 100644 --- a/gn_auth/auth/authorisation/users/masquerade/views.py +++ b/gn_auth/auth/authorisation/users/masquerade/views.py @@ -1,14 +1,14 @@ """Endpoints for user masquerade""" from dataclasses import asdict from uuid import UUID -from functools import partial -from flask import request, jsonify, Response, Blueprint +from flask import request, jsonify, Response, Blueprint, current_app from gn_auth.auth.errors import InvalidData +from gn_auth.auth.authorisation.resources.groups.models import user_group +from ....db import sqlite3 as db from ...checks import require_json -from ....db.sqlite3 import with_db_connection from ....authentication.users import user_by_id from ....authentication.oauth2.resource_server import require_oauth @@ -21,29 +21,26 @@ masq = Blueprint("masquerade", __name__) @require_json def masquerade() -> Response: """Masquerade as a particular user.""" - with require_oauth.acquire("profile user masquerade") as token: + with (require_oauth.acquire("profile user masquerade") as token, + db.connection(current_app.config["AUTH_DB"]) as conn): masqueradee_id = UUID(request.json["masquerade_as"])#type: ignore[index] if masqueradee_id == token.user.user_id: raise InvalidData("You are not allowed to masquerade as yourself.") - masq_user = with_db_connection(partial( - user_by_id, user_id=masqueradee_id)) + masq_user = user_by_id(conn, user_id=masqueradee_id) + def __masq__(conn): new_token = masquerade_as(conn, original_token=token, masqueradee=masq_user) return new_token - def __dump_token__(tok): - return { - key: value for key, value in asdict(tok).items() - if key in ("access_token", "refresh_token", "expires_in", - "token_type") - } + return jsonify({ "original": { - "user": asdict(token.user), - "token": __dump_token__(token) + "user": asdict(token.user) }, "masquerade_as": { "user": asdict(masq_user), - "token": __dump_token__(with_db_connection(__masq__)) + "token": __masq__(conn), + **(user_group(conn, masq_user).maybe(# type: ignore[misc] + {}, lambda grp: {"group": grp})) } }) |
