about summary refs log tree commit diff
path: root/uploader/authorisation.py
diff options
context:
space:
mode:
Diffstat (limited to 'uploader/authorisation.py')
-rw-r--r--uploader/authorisation.py66
1 files changed, 66 insertions, 0 deletions
diff --git a/uploader/authorisation.py b/uploader/authorisation.py
new file mode 100644
index 0000000..3cf3585
--- /dev/null
+++ b/uploader/authorisation.py
@@ -0,0 +1,66 @@
+"""Authorisation utilities."""
+import logging
+from functools import wraps
+
+from typing import Callable
+from flask import flash, redirect
+from pymonad.either import Left, Right, Either
+from authlib.jose import KeySet, JsonWebToken
+from authlib.jose.errors import BadSignatureError
+
+from uploader import session
+from uploader.oauth2.client import auth_server_jwks
+
+def require_login(function):
+    """Check that the user is logged in before executing `func`."""
+    @wraps(function)
+    def __is_session_valid__(*args, **kwargs):
+        """Check that the user is logged in and their token is valid."""
+        def __alert_needs_sign_in__(_no_token):
+            flash("You need to be signed in.", "alert alert-danger big-alert")
+            return redirect("/")
+
+        return session.user_token().either(
+            __alert_needs_sign_in__,
+            lambda token: function(*args, **kwargs))
+    return __is_session_valid__
+
+
+def __validate_token__(jwks: KeySet, token: dict) -> Either:
+    """Check that a token is signed by a key from the authorisation server."""
+    for key in jwks.keys:
+        try:
+            # Fixes CVE-2016-10555. See
+            # https://docs.authlib.org/en/latest/jose/jwt.html
+            jwt = JsonWebToken(["RS256"])
+            jwt.decode(token["access_token"], key)
+            return Right(token)
+        except BadSignatureError:
+            pass
+
+    return Left({"token": token})
+
+
+def require_token(func: Callable) -> Callable:
+    """
+    Wrap functions that require the user be authorised to perform the operations
+    that the functions in question provide.
+    """
+    def __invalid_token__(_whatever):
+        logging.debug("==========> Failure log: %s", _whatever)
+        raise Exception(# pylint: disable=[broad-exception-raised]
+            "You attempted to access a feature of the system that requires "
+            "authorisation. Unfortunately, we could not verify you have the "
+            "appropriate authorisation to perform the action you requested. "
+            "You might need to log in, or if you already are logged in, you "
+            "need to log out, then log back in to get a newer token/session.")
+    @wraps(func)
+    def __wrapper__(*args, **kwargs):
+        return session.user_token().then(lambda tok: {
+            "jwks": auth_server_jwks(),
+            "token": tok
+        }).then(lambda vals: __validate_token__(**vals)).either(
+            __invalid_token__,
+            lambda tok: func(*args, **{**kwargs, "token": tok}))
+
+    return __wrapper__