about summary refs log tree commit diff
path: root/uploader
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2024-09-04 18:41:37 -0500
committerFrederick Muriuki Muriithi2024-09-04 18:41:37 -0500
commit08bd95dd41d23078dcc298247102053981629a43 (patch)
tree2caa0120dd1a739c942bdd49043d0f24cb5779cb /uploader
parentc00f1b948d51cb6c567a6d583ac32c8b78a29692 (diff)
downloadgn-uploader-08bd95dd41d23078dcc298247102053981629a43.tar.gz
Introduce wrapper that validates tokens
Validate token and pass it on to functions that require more
fine-grained authorisation checks.
Diffstat (limited to 'uploader')
-rw-r--r--uploader/authorisation.py46
1 files changed, 46 insertions, 0 deletions
diff --git a/uploader/authorisation.py b/uploader/authorisation.py
index 8ab83f8..ee8fe97 100644
--- a/uploader/authorisation.py
+++ b/uploader/authorisation.py
@@ -1,9 +1,15 @@
 """Authorisation utilities."""
+import logging
 from functools import wraps
 
+from typing import Callable
 from flask import flash, redirect
+from pymonad.either import Left, Right, Either
+from authlib.jose import KeySet, JsonWebToken
+from authlib.jose.errors import BadSignatureError
 
 from uploader import session
+from uploader.oauth2.client import auth_server_jwks
 
 def require_login(function):
     """Check that the user is logged in before executing `func`."""
@@ -19,3 +25,43 @@ def require_login(function):
             __clear_session__,
             lambda token: function(*args, **kwargs))
     return __is_session_valid__
+
+
+def __validate_token__(jwks: KeySet, token: dict) -> Either:
+    """Check that a token is signed by a key from the authorisation server."""
+    for key in jwks.keys:
+        try:
+            # Fixes CVE-2016-10555. See
+            # https://docs.authlib.org/en/latest/jose/jwt.html
+            jwt = JsonWebToken(["RS256"])
+            jwt.decode(token["access_token"], key)
+            return Right(token)
+        except BadSignatureError:
+            pass
+
+    return Left({"token": token})
+
+
+def require_token(func: Callable) -> Callable:
+    """
+    Wrap functions that require the user be authorised to perform the operations
+    that the functions in question provide.
+    """
+    def __invalid_token__(_whatever):
+        logging.debug("==========> Failure log: %s", _whatever)
+        raise Exception(
+            "You attempted to access a feature of the system that requires "
+            "authorisation. Unfortunately, we could not verify you have the "
+            "appropriate authorisation to perform the action you requested. "
+            "You might need to log in, or if you already are logged in, you "
+            "need to log out, then log back in to get a newer token/session.")
+    @wraps(func)
+    def __wrapper__(*args, **kwargs):
+        return session.user_token().then(lambda tok: {
+            "jwks": auth_server_jwks(),
+            "token": tok
+        }).then(lambda vals: __validate_token__(**vals)).either(
+            __invalid_token__,
+            lambda tok: func(*args, **{**kwargs, "token": tok}))
+
+    return __wrapper__