1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
"""Authentication endpoints."""
import uuid
import datetime
from urllib.parse import urljoin, urlparse, urlunparse
from authlib.jose import jwt, KeySet
from flask import (flash,
request,
url_for,
jsonify,
redirect,
Blueprint,
render_template,
current_app as app)
from . import jwks
from . import session
from .checks import require_oauth2
from .request_utils import user_details, process_error
from .client import (
SCOPE, no_token_post, user_logged_in, authserver_uri, oauth2_clientid)
toplevel = Blueprint("toplevel", __name__)
@toplevel.route("/register-client", methods=["GET", "POST"])
@require_oauth2
def register_client():
"""Register an OAuth2 client."""
return "USER IS LOGGED IN AND SUCCESSFULLY ACCESSED THIS ENDPOINT!"
@toplevel.route("/code", methods=["GET"])
def authorisation_code():
"""Use authorisation code to get token."""
code = request.args.get("code", "")
if bool(code):
base_url = urlparse(request.base_url, scheme=request.scheme)
jwtkey = jwks.newest_jwk_with_rotation(
jwks.jwks_directory(app, "GN2_SECRETS"),
int(app.config["JWKS_ROTATION_AGE_DAYS"]))
issued = datetime.datetime.now()
request_data = {
"grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
"code": code,
"scope": SCOPE,
"redirect_uri": urljoin(
urlunparse(base_url),
url_for("oauth2.toplevel.authorisation_code")),
"assertion": jwt.encode(
header={
"alg": "RS256",
"typ": "jwt",
"kid": jwtkey.as_dict()["kid"]},
payload={
"iss": str(oauth2_clientid()),
"sub": request.args["user_id"],
"aud": urljoin(authserver_uri(), "auth/token"),
"exp": (issued + datetime.timedelta(minutes=5)).timestamp(),
"nbf": int(issued.timestamp()),
"iat": int(issued.timestamp()),
"jti": str(uuid.uuid4())},
key=jwtkey).decode("utf8"),
"client_id": app.config["OAUTH2_CLIENT_ID"]
}
def __error__(error):
flash(f"{error['error']}: {error['error_description']}",
"alert-danger")
app.logger.debug("Request error (%s) %s: %s",
error["status_code"], error["error_description"],
request_data)
return redirect("/")
def __success__(token):
session.set_user_token(token)
udets = user_details()
session.set_user_details({
"user_id": uuid.UUID(udets["user_id"]),
"name": udets["name"],
"email": udets["email"],
"token": session.user_token(),
"logged_in": True
})
return redirect("/")
return no_token_post("auth/token", json=request_data).either(
lambda err: __error__(process_error(err)), __success__)
flash("AuthorisationError: No code was provided.", "alert-danger")
return redirect("/")
@toplevel.route("/public-jwks", methods=["GET"])
def public_jwks():
"""Provide endpoint that returns the public keys."""
return jsonify({
"documentation": "The keys are listed in order of creation.",
"jwks": KeySet(jwks.list_jwks(
jwks.jwks_directory(app, "GN2_SECRETS"))).as_dict().get("keys")
})
|