1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
|
"""Views for OAuth2 related functionality."""
from urllib.parse import urljoin, urlparse, urlunparse
from flask import (
flash,
jsonify,
url_for,
request,
redirect,
Blueprint,
current_app as app)
from uploader import session
from uploader import monadic_requests as mrequests
from uploader.monadic_requests import make_error_handler
from . import jwks
from .tokens import request_token
from .client import (
user_logged_in,
authserver_uri,
oauth2_clientid,
fetch_user_details,
oauth2_clientsecret)
# Blueprint on which the OAuth2 views in this module are registered.
oauth2 = Blueprint("oauth2", __name__)
@oauth2.route("/code")
def authorisation_code():
    """Receive authorisation code from auth server and use it to get token.

    Expects ``code`` and ``user_id`` as query parameters on the request. When
    either one is missing or blank, an error message is flashed and the user
    is redirected to "/" without contacting the auth server. Otherwise a token
    is requested from the auth server's ``auth/token`` endpoint and the result
    is dispatched to one of the handlers below, each of which returns a
    redirect to "/".
    """
    def __process_error__(error_response):
        """Log the failed token response and notify the user."""
        app.logger.debug("ERROR: (%s)", error_response.content)
        flash("There was an error retrieving the authorisation token.",
              "alert alert-danger")
        return redirect("/")

    def __fail_set_user_details__(_failure):
        """Log the failure to fetch user details and notify the user."""
        app.logger.debug("Fetching user details fails: %s", _failure)
        flash("Could not retrieve the user details", "alert alert-danger")
        return redirect("/")

    def __success_set_user_details__(_success):
        """Log the fetched details and send the user to the landing page."""
        app.logger.debug("Session info: %s", _success)
        return redirect("/")

    def __success__(token):
        """Store the token in the session, then fetch the user details."""
        session.set_user_token(token)
        return fetch_user_details().either(
            __fail_set_user_details__,
            __success_set_user_details__)

    code = request.args.get("code", "").strip()
    if not code:
        flash("AuthorisationError: No code was provided.", "alert alert-danger")
        return redirect("/")

    # Indexing `request.args["user_id"]` directly aborts the request with a
    # bare 400 response when the parameter is absent; handle it the same way
    # as a missing code instead.
    user_id = request.args.get("user_id", "").strip()
    if not user_id:
        flash("AuthorisationError: No user ID was provided.",
              "alert alert-danger")
        return redirect("/")

    baseurl = urlparse(request.base_url, scheme=request.scheme)
    return request_token(
        token_uri=urljoin(authserver_uri(), "auth/token"),
        user_id=user_id,
        extra_params={
            "code": code,
            # Absolute URL of this endpoint, rebuilt from the current request.
            # NOTE(review): presumably has to match the redirect URI used when
            # the code was issued -- confirm against the auth server.
            "redirect_uri": urljoin(
                urlunparse(baseurl),
                url_for("oauth2.authorisation_code")),
        }).either(__process_error__, __success__)
@oauth2.route("/public-jwks")
def public_jwks():
    """Respond with a JSON document listing the available JWKs.

    The payload carries a short "documentation" note and a "jwks" sequence
    holding the ``as_dict()`` form of every key returned by
    ``jwks.list_jwks`` for the directory configured under "UPLOADER_SECRETS".
    """
    keys_directory = jwks.jwks_directory(app, "UPLOADER_SECRETS")

    serialised_keys = []
    for jwk in jwks.list_jwks(keys_directory):
        serialised_keys.append(jwk.as_dict())

    ordering_note = (
        "The keys are listed in order of creation, from the oldest (first) "
        "to the newest (last).")
    payload = {
        "documentation": ordering_note,
        "jwks": tuple(serialised_keys),
    }
    return jsonify(payload)
@oauth2.route("/logout", methods=["GET"])
def logout():
    """Log out of any active sessions.

    When a user is logged in, the refresh token from the session is posted to
    the auth server's ``auth/revoke`` endpoint. The local session is cleared
    both when that request succeeds and, through the cleanup thunk handed to
    ``make_error_handler``, when it fails. When nobody is logged in, an
    informational message is flashed instead.

    Returns a redirect to "/" on the paths handled directly here.
    """
    def __unset_session__(_session_info):
        """Clear the local session and confirm the sign-out to the user.

        The session info is accepted but deliberately not inspected, so
        that signing out cannot fail on missing user fields.
        """
        session.clear_session_info()
        flash("Successfully signed out.", "alert alert-success")
        return redirect("/")

    if user_logged_in():
        return session.user_token().then(
            lambda _tok: mrequests.post(
                urljoin(authserver_uri(), "auth/revoke"),
                json={
                    "token": _tok["refresh_token"],
                    "token_type_hint": "refresh_token",
                    "client_id": oauth2_clientid(),
                    "client_secret": oauth2_clientsecret()
                })).either(
                    # Even if revocation fails, still drop the local session.
                    make_error_handler(
                        redirect_to=redirect("/"),
                        cleanup_thunk=lambda: __unset_session__(
                            session.session_info())),
                    lambda _res: __unset_session__(session.session_info()))

    flash("There is no user that is currently logged in.", "alert alert-info")
    return redirect("/")
|