"""Views for OAuth2 related functionality."""
import uuid
from datetime import datetime, timedelta
from urllib.parse import urljoin, urlparse, urlunparse

from authlib.jose import jwt
from flask import (
    flash,
    jsonify,
    url_for,
    request,
    redirect,
    Blueprint,
    current_app as app)

from uploader import session
from uploader import monadic_requests as mrequests
from uploader.monadic_requests import make_error_handler

from . import jwks
from .client import (
    SCOPE,
    oauth2_get,
    user_logged_in,
    authserver_uri,
    oauth2_clientid,
    oauth2_clientsecret)

oauth2 = Blueprint("oauth2", __name__)


@oauth2.route("/code")
def authorisation_code():
    """Receive authorisation code from auth server and use it to get token.

    Reads the `code` query parameter; when it is missing or blank, flashes an
    error and redirects to "/". Otherwise a signed JWT assertion is built with
    the newest JWK and POSTed, together with the code, to the auth server's
    "auth/token" endpoint.

    On success the token is stored in the session, the user's details are
    fetched from "auth/user/" and saved in the session, and the user is
    redirected to "/". On any failure an error is flashed and the user is
    redirected to "/".
    """
    def __process_error__(resp_or_exception):
        # The token request failed: log what came back and tell the user.
        app.logger.debug("ERROR: (%s)", resp_or_exception)
        flash("There was an error retrieving the authorisation token.",
              "alert-danger")
        return redirect("/")

    def __fail_set_user_details__(_failure):
        # The token was obtained, but the user-details lookup failed.
        app.logger.debug("Fetching user details fails: %s", _failure)
        flash("Could not retrieve the user details", "alert-danger")
        return redirect("/")

    def __success_set_user_details__(_success):
        app.logger.debug("Session info: %s", _success)
        return redirect("/")

    def __success__(token):
        # Store the token first: the stored token is read back below via
        # `session.user_token()`, and `oauth2_get` presumably relies on it
        # for authentication too (confirm against `.client`).
        session.set_user_token(token)
        return oauth2_get("auth/user/").then(
            lambda usrdets: session.set_user_details({
                "user_id": uuid.UUID(usrdets["user_id"]),
                "name": usrdets["name"],
                "email": usrdets["email"],
                "token": session.user_token(),
                "logged_in": True})).either(
                    __fail_set_user_details__,
                    __success_set_user_details__)

    code = request.args.get("code", "").strip()
    if not code:
        flash("AuthorisationError: No code was provided.", "alert-danger")
        return redirect("/")

    baseurl = urlparse(request.base_url, scheme=request.scheme)
    issued = datetime.now()
    # Use a single integer timestamp so that `iat`, `nbf` and `exp` are all
    # whole-second values and `exp` is exactly five minutes after `iat`.
    issued_at = int(issued.timestamp())
    expires_at = int((issued + timedelta(minutes=5)).timestamp())
    jwtkey = jwks.newest_jwk_with_rotation(
        jwks.jwks_directory(app, "UPLOADER_SECRETS"),
        int(app.config["JWKS_ROTATION_AGE_DAYS"]))
    # The token endpoint is both the request target and the JWT audience.
    token_uri = urljoin(authserver_uri(), "auth/token")
    return mrequests.post(
        token_uri,
        json={
            "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
            "code": code,
            "scope": SCOPE,
            "redirect_uri": urljoin(
                urlunparse(baseurl),
                url_for("oauth2.authorisation_code")),
            "assertion": jwt.encode(
                header={
                    "alg": "RS256",
                    "typ": "JWT",
                    "kid": jwtkey.as_dict()["kid"]
                },
                payload={
                    "iss": str(oauth2_clientid()),
                    # NOTE: indexing `request.args` directly means a request
                    # without `user_id` is rejected by Flask/Werkzeug with a
                    # "400 Bad Request" rather than a flash-and-redirect.
                    "sub": request.args["user_id"],
                    "aud": token_uri,
                    "exp": expires_at,
                    "nbf": issued_at,
                    "iat": issued_at,
                    "jti": str(uuid.uuid4())
                },
                key=jwtkey).decode("utf8"),
            "client_id": oauth2_clientid()
        }).either(__process_error__, __success__)


@oauth2.route("/public-jwks")
def public_jwks():
    """List the available JWKs

    Returns a JSON object with a `documentation` note and a `jwks` list
    holding the `as_dict()` form of every key found in the JWKs directory
    configured under "UPLOADER_SECRETS".

    NOTE(review): presumably `as_dict()` only exposes the public portion of
    each key -- confirm against the `jwks` module.
    """
    return jsonify({
        "documentation": (
            "The keys are listed in order of creation, from the oldest (first) "
            "to the newest (last)."),
        "jwks": tuple(key.as_dict() for key
                      in jwks.list_jwks(jwks.jwks_directory(
                          app, "UPLOADER_SECRETS")))
    })


@oauth2.route("/logout", methods=["GET"])
def logout():
    """Log out of any active sessions.

    When a user is logged in, their refresh token is sent to the auth
    server's "auth/revoke" endpoint. The local session is cleared on success;
    on failure it is passed as the cleanup thunk to the error handler
    (presumably run before redirecting to "/" -- confirm against
    `make_error_handler`). When nobody is logged in, an informational message
    is flashed instead. In every case the user ends up redirected to "/".
    """
    def __unset_session__(_session_info):
        # The session info argument is accepted for the existing call sites
        # but is not needed: clearing the session must not depend on the
        # shape of the stored user details.
        session.clear_session_info()
        flash("Successfully logged out.", "alert-success")
        return redirect("/")

    if user_logged_in():
        return session.user_token().then(
            lambda _tok: mrequests.post(
                urljoin(authserver_uri(), "auth/revoke"),
                json={
                    "token": _tok["refresh_token"],
                    "token_type_hint": "refresh_token",
                    "client_id": oauth2_clientid(),
                    "client_secret": oauth2_clientsecret()
                })).either(
                    make_error_handler(
                        redirect_to=redirect("/"),
                        cleanup_thunk=lambda: __unset_session__(
                            session.session_info())),
                    lambda res: __unset_session__(session.session_info()))

    flash("There is no user that is currently logged in.", "alert-info")
    return redirect("/")