aboutsummaryrefslogtreecommitdiff
path: root/uploader/oauth2/views.py
blob: c33f7bc1c2963b54c599f9018db9b1d1a7a4f305 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
"""Views for OAuth2 related functionality."""
import uuid
from datetime import datetime, timedelta
from urllib.parse import urljoin, urlparse, urlunparse

from authlib.jose import jwt
from flask import (
    flash,
    jsonify,
    url_for,
    request,
    redirect,
    Blueprint,
    current_app as app)

from uploader import session
from uploader import monadic_requests as requests

from . import jwks
from .client import SCOPE, oauth2_get, oauth2_clientid, authserver_uri

# Blueprint on which the view functions in this module are registered.
oauth2 = Blueprint(name="oauth2", import_name=__name__)

@oauth2.route("/code")
def authorisation_code():
    """Receive authorisation code from auth server and use it to get token.

    Reads ``code`` and ``user_id`` from the query string, builds a JWT
    bearer assertion signed with the newest JWK and posts it to the auth
    server's ``auth/token`` endpoint.  When the token request succeeds the
    token is stored in the session, the user's details are fetched from
    ``auth/user/`` and stored in the session too.

    Every path handled here ends with a redirect to ``/``.  Failures
    (missing query parameters, token request failure, user-details
    failure) additionally flash an ``alert-danger`` message.
    """
    def _token_request_failed(resp_or_exception):
        # Failure branch of the token request.
        app.logger.debug("ERROR: (%s)", resp_or_exception)
        flash("There was an error retrieving the authorisation token.",
              "alert-danger")
        return redirect("/")

    def _user_details_failed(failure):
        # Failure branch of the user-details lookup.
        app.logger.debug("Fetching user details fails: %s", failure)
        flash("Could not retrieve the user details", "alert-danger")
        return redirect("/")

    def _user_details_saved(details):
        # NOTE(review): `details` is whatever `session.set_user_details`
        # handed back; if that includes the token, consider redacting it
        # before logging -- confirm against the session module.
        app.logger.debug("Session info: %s", details)
        return redirect("/")

    def _token_received(token):
        # Success branch of the token request: persist the token first so
        # that the `auth/user/` request below can presumably make use of it.
        session.set_user_token(token)
        return oauth2_get("auth/user/").then(
            lambda usrdets: session.set_user_details({
                "user_id": uuid.UUID(usrdets["user_id"]),
                "name": usrdets["name"],
                "email": usrdets["email"],
                "token": session.user_token(),
                "logged_in": True})).either(
                    _user_details_failed,
                    _user_details_saved)

    code = request.args.get("code", "").strip()
    if not code:
        flash("AuthorisationError: No code was provided.", "alert-danger")
        return redirect("/")

    # Validate `user_id` the same way as `code`: indexing `request.args`
    # directly would abort the request with a bare "400 Bad Request" when
    # the parameter is absent, instead of telling the user what went wrong.
    user_id = request.args.get("user_id", "").strip()
    if not user_id:
        flash("AuthorisationError: No user ID was provided.",
              "alert-danger")
        return redirect("/")

    baseurl = urlparse(request.base_url, scheme=request.scheme)
    issued = datetime.now()
    # Integer timestamps for all time claims, so `exp` agrees with
    # `nbf` and `iat`.
    issued_at = int(issued.timestamp())
    expires_at = int((issued + timedelta(minutes=5)).timestamp())
    jwtkey = jwks.newest_jwk_with_rotation(
        jwks.jwks_directory(app, "UPLOADER_SECRETS"),
        int(app.config["JWKS_ROTATION_AGE_DAYS"]))
    # The token endpoint is both where the request is sent and the
    # assertion's audience; compute it once so the two cannot diverge.
    token_uri = urljoin(authserver_uri(), "auth/token")
    return requests.post(
        token_uri,
        json={
            "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
            "code": code,
            "scope": SCOPE,
            "redirect_uri": urljoin(
                urlunparse(baseurl),
                url_for("oauth2.authorisation_code")),
            "assertion": jwt.encode(
                header={
                    "alg": "RS256",
                    "typ": "JWT",
                    "kid": jwtkey.as_dict()["kid"]
                },
                payload={
                    "iss": str(oauth2_clientid()),
                    "sub": user_id,
                    "aud": token_uri,
                    "exp": expires_at,
                    "nbf": issued_at,
                    "iat": issued_at,
                    "jti": str(uuid.uuid4())
                },
                key=jwtkey).decode("utf8"),
            "client_id": oauth2_clientid()
        }).either(_token_request_failed, _token_received)

@oauth2.route("/public-jwks")
def public_jwks():
    """Respond with a JSON listing of the available JWKs.

    The payload has a ``documentation`` string describing the ordering and
    a ``jwks`` entry holding the ``as_dict()`` form of every key found in
    the JWKs directory.
    """
    keys_dir = jwks.jwks_directory(app, "UPLOADER_SECRETS")
    serialised = tuple(jwk.as_dict() for jwk in jwks.list_jwks(keys_dir))
    ordering_note = (
        "The keys are listed in order of creation, from the oldest (first) "
        "to the newest (last).")
    return jsonify({"documentation": ordering_note, "jwks": serialised})