about summary refs log tree commit diff
path: root/gn_auth/auth/authorisation/users/masquerade/models.py
diff options
context:
space:
mode:
Diffstat (limited to 'gn_auth/auth/authorisation/users/masquerade/models.py')
-rw-r--r--gn_auth/auth/authorisation/users/masquerade/models.py60
1 files changed, 39 insertions, 21 deletions
diff --git a/gn_auth/auth/authorisation/users/masquerade/models.py b/gn_auth/auth/authorisation/users/masquerade/models.py
index 57bc564..5c11f34 100644
--- a/gn_auth/auth/authorisation/users/masquerade/models.py
+++ b/gn_auth/auth/authorisation/users/masquerade/models.py
@@ -1,20 +1,26 @@
 """Functions for handling masquerade."""
-from uuid import uuid4
 from functools import wraps
 from datetime import datetime
+from authlib.jose import jwt
 
 from flask import current_app as app
 
 
 from gn_auth.auth.errors import ForbiddenAccess
 
+from gn_auth.auth.jwks import newest_jwk_with_rotation, jwks_directory
+from gn_auth.auth.authentication.oauth2.grants.refresh_token_grant import (
+    RefreshTokenGrant)
+from gn_auth.auth.authentication.oauth2.models.jwtrefreshtoken import (
+    JWTRefreshToken,
+    save_refresh_token)
+
 from ...roles.models import user_roles
 from ....db import sqlite3 as db
 from ....authentication.users import User
-from ....authentication.oauth2.models.oauth2token import (
-    OAuth2Token, save_token)
+from ....authentication.oauth2.models.oauth2token import OAuth2Token
 
-__FIVE_HOURS__ = (60 * 60 * 5)
+__FIVE_HOURS__ = 60 * 60 * 5
 
 def can_masquerade(func):
     """Security decorator."""
@@ -31,9 +37,13 @@ def can_masquerade(func):
             conn = kwargs["conn"]
             token = kwargs["original_token"]
 
-        masq_privs = [priv for role in user_roles(conn, token.user)
-                      for priv in role.privileges
-                      if priv.privilege_id == "system:user:masquerade"]
+        masq_privs = []
+        for roles in user_roles(conn, token.user):
+            for role in roles["roles"]:
+                privileges = [p for p in role.privileges
+                              if p.privilege_id == "system:user:masquerade"]
+                masq_privs.extend(privileges)
+
         if len(masq_privs) == 0:
             raise ForbiddenAccess(
                 "You do not have the ability to masquerade as another user.")
@@ -46,22 +56,30 @@ def masquerade_as(
         original_token: OAuth2Token,
         masqueradee: User) -> OAuth2Token:
     """Get a token that enables `masquerader` to act as `masqueradee`."""
-    token_details = app.config["OAUTH2_SERVER"].generate_token(
+    scope = original_token.get_scope().replace(
+        # Do not allow more than one level of masquerading
+        "masquerade", "").strip()
+    new_token = app.config["OAUTH2_SERVER"].generate_token(
         client=original_token.client,
-        grant_type="authorization_code",
+        grant_type="urn:ietf:params:oauth:grant-type:jwt-bearer",
         user=masqueradee,
-        expires_in=__FIVE_HOURS__,
-        include_refresh_token=True)
-    new_token = OAuth2Token(
-        token_id=uuid4(),
+        expires_in=original_token.get_expires_in(),
+        include_refresh_token=True,
+        scope=scope)
+    _jwt = jwt.decode(
+        new_token["access_token"],
+        newest_jwk_with_rotation(
+            jwks_directory(app),
+            int(app.config["JWKS_ROTATION_AGE_DAYS"])))
+    save_refresh_token(conn, JWTRefreshToken(
+        token=new_token["refresh_token"],
         client=original_token.client,
-        token_type=token_details["token_type"],
-        access_token=token_details["access_token"],
-        refresh_token=token_details.get("refresh_token"),
-        scope=original_token.scope,
+        user=masqueradee,
+        issued_with=_jwt["jti"],
+        issued_at=datetime.fromtimestamp(_jwt["iat"]),
+        expires=datetime.fromtimestamp(
+            int(_jwt["iat"]) + RefreshTokenGrant.DEFAULT_EXPIRES_IN),
+        scope=scope,
         revoked=False,
-        issued_at=datetime.now(),
-        expires_in=token_details["expires_in"],
-        user=masqueradee)
-    save_token(conn, new_token)
+        parent_of=None))
     return new_token