1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
"""Authorisation utilities."""
import logging
from functools import wraps
from typing import Callable
from flask import flash, redirect
from pymonad.either import Left, Right, Either
from authlib.jose import KeySet, JsonWebToken
from authlib.jose.errors import BadSignatureError
from uploader import session
from uploader.oauth2.client import auth_server_jwks
def require_login(function):
    """Decorate `function` so that it only runs for a logged-in user.

    The returned wrapper asks `session.user_token()` for the current token
    and dispatches on the result via its `either` method:

    * no token: the session info is cleared, a "You need to be logged in."
      message is flashed and a redirect to "/" is returned;
    * token present: `function` is called with the original positional and
      keyword arguments. The token itself is not passed on.
    """
    def __reject__(_missing_token):
        # Drop whatever is left of the session before sending the user home.
        session.clear_session_info()
        flash("You need to be logged in.", "alert-danger")
        return redirect("/")

    @wraps(function)
    def __guarded__(*args, **kwargs):
        """Run `function` only when the session provides a user token."""
        def __proceed__(_token):
            # The token only proves a login exists; it is not forwarded.
            return function(*args, **kwargs)

        token_result = session.user_token()
        return token_result.either(__reject__, __proceed__)

    return __guarded__
def __validate_token__(jwks: KeySet, token: dict) -> Either:
    """Check that a token is signed by a key from the authorisation server.

    Every key in `jwks.keys` is tried in turn against
    `token["access_token"]`.

    Returns:
        `Right(token)` for the first key whose decode does not raise
        `BadSignatureError`; `Left({"token": token})` when no key matches
        (including when `jwks.keys` is empty).

    Any exception other than `BadSignatureError` raised while decoding is
    propagated to the caller.
    """
    # Fixes CVE-2016-10555. See
    # https://docs.authlib.org/en/latest/jose/jwt.html
    # The decoder is restricted to RS256 and does not depend on the key being
    # tried, so it is built once rather than once per key.
    jwt = JsonWebToken(["RS256"])
    for key in jwks.keys:
        try:
            # NOTE(review): the decoded claims are discarded, so only the
            # signature is checked here — confirm that expiry and other claims
            # are validated elsewhere.
            jwt.decode(token["access_token"], key)
        except BadSignatureError:
            # Signed with a different key; try the next one.
            continue
        return Right(token)
    return Left({"token": token})
def require_token(func: Callable) -> Callable:
    """
    Decorate functions that need an authorised user.

    The wrapper fetches the session's user token, pairs it with the
    authorisation server's key set, and checks the token with
    `__validate_token__`. On success `func` is called with the original
    arguments plus a `token` keyword argument (replacing any `token` the
    caller passed). On failure the reason is logged at debug level and an
    `Exception` is raised.
    """
    def __reject__(failure):
        logging.debug("==========> Failure log: %s", failure)
        raise Exception(
            "You attempted to access a feature of the system that requires "
            "authorisation. Unfortunately, we could not verify you have the "
            "appropriate authorisation to perform the action you requested. "
            "You might need to log in, or if you already are logged in, you "
            "need to log out, then log back in to get a newer token/session.")

    @wraps(func)
    def __authorised__(*args, **kwargs):
        def __with_jwks__(tok):
            # Bundle the keyword arguments expected by `__validate_token__`.
            return {"jwks": auth_server_jwks(), "token": tok}

        def __check_signature__(vals):
            return __validate_token__(**vals)

        def __call_through__(tok):
            # The validated token overrides any caller-supplied `token`.
            call_kwargs = dict(kwargs, token=tok)
            return func(*args, **call_kwargs)

        # Kept as two separate `then` steps, as in the original chain.
        checked = session.user_token().then(
            __with_jwks__).then(__check_signature__)
        return checked.either(__reject__, __call_through__)

    return __authorised__
|