aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2024-09-04 18:41:37 -0500
committerFrederick Muriuki Muriithi2024-09-04 18:41:37 -0500
commit08bd95dd41d23078dcc298247102053981629a43 (patch)
tree2caa0120dd1a739c942bdd49043d0f24cb5779cb
parentc00f1b948d51cb6c567a6d583ac32c8b78a29692 (diff)
downloadgn-uploader-08bd95dd41d23078dcc298247102053981629a43.tar.gz
Introduce wrapper that validates tokens
Validate token and pass it on to functions that require more fine-grained authorisation checks.
-rw-r--r--uploader/authorisation.py46
1 files changed, 46 insertions, 0 deletions
diff --git a/uploader/authorisation.py b/uploader/authorisation.py
index 8ab83f8..ee8fe97 100644
--- a/uploader/authorisation.py
+++ b/uploader/authorisation.py
@@ -1,9 +1,15 @@
"""Authorisation utilities."""
+import logging
from functools import wraps
+from typing import Callable
from flask import flash, redirect
+from pymonad.either import Left, Right, Either
+from authlib.jose import KeySet, JsonWebToken
+from authlib.jose.errors import BadSignatureError
from uploader import session
+from uploader.oauth2.client import auth_server_jwks
def require_login(function):
"""Check that the user is logged in before executing `func`."""
@@ -19,3 +25,43 @@ def require_login(function):
__clear_session__,
lambda token: function(*args, **kwargs))
return __is_session_valid__
+
+
+def __validate_token__(jwks: KeySet, token: dict) -> Either:
+ """Check that a token is signed by a key from the authorisation server."""
+ for key in jwks.keys:
+ try:
+ # Fixes CVE-2016-10555. See
+ # https://docs.authlib.org/en/latest/jose/jwt.html
+ jwt = JsonWebToken(["RS256"])
+ jwt.decode(token["access_token"], key)
+ return Right(token)
+ except BadSignatureError:
+ pass
+
+ return Left({"token": token})
+
+
+def require_token(func: Callable) -> Callable:
+ """
+ Wrap functions that require the user be authorised to perform the operations
+ that the functions in question provide.
+ """
+ def __invalid_token__(_whatever):
+ logging.debug("==========> Failure log: %s", _whatever)
+ raise Exception(
+ "You attempted to access a feature of the system that requires "
+ "authorisation. Unfortunately, we could not verify you have the "
+ "appropriate authorisation to perform the action you requested. "
+ "You might need to log in, or if you already are logged in, you "
+ "need to log out, then log back in to get a newer token/session.")
+ @wraps(func)
+ def __wrapper__(*args, **kwargs):
+ return session.user_token().then(lambda tok: {
+ "jwks": auth_server_jwks(),
+ "token": tok
+ }).then(lambda vals: __validate_token__(**vals)).either(
+ __invalid_token__,
+ lambda tok: func(*args, **{**kwargs, "token": tok}))
+
+ return __wrapper__