diff options
-rw-r--r-- | uploader/authorisation.py | 46 |
1 files changed, 46 insertions, 0 deletions
diff --git a/uploader/authorisation.py b/uploader/authorisation.py index 8ab83f8..ee8fe97 100644 --- a/uploader/authorisation.py +++ b/uploader/authorisation.py @@ -1,9 +1,15 @@ """Authorisation utilities.""" +import logging from functools import wraps +from typing import Callable from flask import flash, redirect +from pymonad.either import Left, Right, Either +from authlib.jose import KeySet, JsonWebToken +from authlib.jose.errors import BadSignatureError from uploader import session +from uploader.oauth2.client import auth_server_jwks def require_login(function): """Check that the user is logged in before executing `func`.""" @@ -19,3 +25,43 @@ def require_login(function): __clear_session__, lambda token: function(*args, **kwargs)) return __is_session_valid__ + + +def __validate_token__(jwks: KeySet, token: dict) -> Either: + """Check that a token is signed by a key from the authorisation server.""" + for key in jwks.keys: + try: + # Fixes CVE-2016-10555. See + # https://docs.authlib.org/en/latest/jose/jwt.html + jwt = JsonWebToken(["RS256"]) + jwt.decode(token["access_token"], key) + return Right(token) + except BadSignatureError: + pass + + return Left({"token": token}) + + +def require_token(func: Callable) -> Callable: + """ + Wrap functions that require the user be authorised to perform the operations + that the functions in question provide. + """ + def __invalid_token__(_whatever): + logging.debug("==========> Failure log: %s", _whatever) + raise Exception( + "You attempted to access a feature of the system that requires " + "authorisation. Unfortunately, we could not verify you have the " + "appropriate authorisation to perform the action you requested. " + "You might need to log in, or if you already are logged in, you " + "need to log out, then log back in to get a newer token/session.") + @wraps(func) + def __wrapper__(*args, **kwargs): + return session.user_token().then(lambda tok: { + "jwks": auth_server_jwks(), + "token": tok + }).then(lambda vals: __validate_token__(**vals)).either( + __invalid_token__, + lambda tok: func(*args, **{**kwargs, "token": tok})) + + return __wrapper__ |