"""Views for OAuth2 related functionality."""
from urllib.parse import urljoin, urlparse, urlunparse

from flask import (
    flash,
    jsonify,
    url_for,
    request,
    redirect,
    Blueprint,
    current_app as app)

from uploader import session
from uploader import monadic_requests as mrequests
from uploader.monadic_requests import make_error_handler

from . import jwks
from .tokens import request_token
from .client import (
    user_logged_in,
    authserver_uri,
    oauth2_clientid,
    fetch_user_details,
    oauth2_clientsecret)

# Blueprint on which the OAuth2 view functions defined in this module
# register their routes.
oauth2 = Blueprint("oauth2", __name__)


@oauth2.route("/code")
def authorisation_code():
    """Receive authorisation code from auth server and use it to get token.

    Reads the ``code`` and ``user_id`` query parameters from the request. If
    either is missing or blank, an error is flashed and the user is
    redirected to "/" without contacting the auth server.

    Otherwise a token is requested from the auth server's ``auth/token``
    endpoint. On success the token is stored in the session and the user's
    details are fetched; on failure an error is flashed. Every path ends in
    a redirect to "/".
    """
    def __process_error__(error_response):
        app.logger.debug("ERROR: (%s)", error_response.content)
        flash("There was an error retrieving the authorisation token.",
              "alert alert-danger")
        return redirect("/")

    def __fail_set_user_details__(_failure):
        app.logger.debug("Fetching user details fails: %s", _failure)
        flash("Could not retrieve the user details", "alert alert-danger")
        return redirect("/")

    def __success_set_user_details__(_success):
        app.logger.debug("Session info: %s", _success)
        return redirect("/")

    def __success__(token):
        # The token must be saved before the user details are fetched.
        # NOTE(review): presumably `fetch_user_details` reads the token from
        # the session -- confirm against `.client`.
        session.set_user_token(token)
        return fetch_user_details().either(
                    __fail_set_user_details__,
                    __success_set_user_details__)

    code = request.args.get("code", "").strip()
    if not code:
        flash("AuthorisationError: No code was provided.", "alert alert-danger")
        return redirect("/")

    # Indexing `request.args` directly raises a 400 error when the key is
    # absent; handle a missing user ID the same way as a missing code instead.
    user_id = request.args.get("user_id", "")
    if not user_id.strip():
        flash("AuthorisationError: No user ID was provided.",
              "alert alert-danger")
        return redirect("/")

    baseurl = urlparse(request.base_url, scheme=request.scheme)
    return request_token(
        token_uri=urljoin(authserver_uri(), "auth/token"),
        user_id=user_id,
        extra_params={
            "code": code,
            # The redirect URI is this very endpoint, made absolute against
            # the URL the request came in on.
            "redirect_uri": urljoin(
                urlunparse(baseurl),
                url_for("oauth2.authorisation_code")),
        }).either(__process_error__, __success__)

@oauth2.route("/public-jwks")
def public_jwks():
    """Return a JSON document listing the available JWKs.

    The keys come from `jwks.list_jwks`, applied to the directory that
    `jwks.jwks_directory` resolves for the "UPLOADER_SECRETS" setting. Each
    key is serialised with its `as_dict` method.
    """
    ordering_note = (
        "The keys are listed in order of creation, from the oldest (first) "
        "to the newest (last).")
    keys_dir = jwks.jwks_directory(app, "UPLOADER_SECRETS")
    listed_keys = tuple(
        a_key.as_dict() for a_key in jwks.list_jwks(keys_dir))
    return jsonify({"documentation": ordering_note, "jwks": listed_keys})


@oauth2.route("/logout", methods=["GET"])
def logout():
    """Log out of any active sessions.

    When a user is logged in, the refresh token held in the session is sent
    to the auth server's ``auth/revoke`` endpoint. If the revocation
    succeeds, the local session information is cleared and a success message
    is flashed. If it fails, handling is delegated to `make_error_handler`,
    which is given the same session-clearing routine as its cleanup thunk.

    When nobody is logged in, an informational message is flashed instead.
    Either way the response is a redirect to "/".
    """
    def __unset_session__():
        # Deliberately reads nothing from the session before clearing it, so
        # that incomplete session data can never prevent the user from being
        # signed out.
        session.clear_session_info()
        flash("Successfully signed out.", "alert alert-success")
        return redirect("/")

    if user_logged_in():
        return session.user_token().then(
            lambda _tok: mrequests.post(
                urljoin(authserver_uri(), "auth/revoke"),
                json={
                    "token": _tok["refresh_token"],
                    "token_type_hint": "refresh_token",
                    "client_id": oauth2_clientid(),
                    "client_secret": oauth2_clientsecret()
                })).either(
                    # NOTE(review): presumably the error handler invokes
                    # `cleanup_thunk` so the local session is cleared even
                    # when revocation fails -- confirm in `monadic_requests`.
                    make_error_handler(
                        redirect_to=redirect("/"),
                        cleanup_thunk=__unset_session__),
                    lambda _res: __unset_session__())
    flash("There is no user that is currently logged in.", "alert alert-info")
    return redirect("/")