aboutsummaryrefslogtreecommitdiff
import requests
from uuid import UUID
from urllib.parse import urljoin, urlparse

from authlib.integrations.base_client.errors import OAuthError
from flask import (
    flash, request, url_for, redirect, Response, Blueprint,
    current_app as app)

from . import client
from . import session
from .ui import render_ui
from .checks import require_oauth2
from .client import (oauth2_get,
                     oauth2_post,
                     oauth2_client,
                     authserver_uri,
                     user_logged_in)
from .request_utils import (user_details,
                            request_error,
                            process_error,
                            with_flash_error,
                            authserver_authorise_uri)

users = Blueprint("user", __name__)

@users.route("/profile", methods=["GET"])
@require_oauth2
def user_profile():
    __id__ = lambda the_val: the_val
    usr_dets = user_details()
    def __render__(usr_dets, roles=[], **kwargs):
        return render_ui(
            "oauth2/view-user.html", user_details=usr_dets, roles=roles,
            user_privileges = tuple(
                privilege["privilege_id"] for role in roles
                for privilege in role["privileges"]),
            **kwargs)

    def __roles_success__(roles):
        if bool(usr_dets.get("group")):
            return __render__(usr_dets, roles)
        return oauth2_get("auth/user/group/join-request").either(
            lambda err: __render__(
                user_details, group_join_error=process_error(err)),
            lambda gjr: __render__(usr_dets, roles=roles, group_join_request=gjr))

    return oauth2_get("auth/system/roles").either(
        lambda err: __render__(usr_dets, role_error=process_error(err)),
        __roles_success__)

@users.route("/request-add-to-group", methods=["POST"])
@require_oauth2
def request_add_to_group() -> Response:
    """Request to be added to a group."""
    form = request.form
    group_id = form["group"]

    def __error__(error):
        err = process_error(error)
        flash(f"{err['error']}: {err['error_message']}", "alert-danger")
        return redirect(url_for("oauth2.user.user_profile"))

    def __success__(response):
        flash(f"{response['message']} (Response ID: {response['request_id']})",
              "alert-success")
        return redirect(url_for("oauth2.user.user_profile"))

    return oauth2_post(f"auth/group/requests/join/{group_id}",
                       json=form).either(__error__, __success__)


@users.route("/logout", methods=["GET", "POST"])
def logout():
    if user_logged_in():
        resp = oauth2_client().revoke_token(
            urljoin(authserver_uri(), "auth/revoke"))
        the_session = session.session_info()
        if not bool(the_session["masquerading"]):
            # Normal session - clear and go back.
            session.clear_session_info()
            flash("Successfully logged out.", "alert-success")
            return redirect("/")
        # Restore masquerading session
        session.unset_masquerading()
        flash(
            "Successfully logged out as user "
            f"{the_session['user']['name']} ({the_session['user']['email']}) "
            "and restored session for user "
            f"{the_session['masquerading']['name']} "
            f"({the_session['masquerading']['email']})",
            "alert-success")

    return redirect("/")

@users.route("/register", methods=["GET", "POST"])
def register_user():
    if user_logged_in():
        next_endpoint=request.args.get("next", "/")
        flash(("You cannot register a new user while logged in. "
               "Please logout to register a new user."),
              "alert-danger")
        return redirect(next_endpoint)

    if request.method == "GET":
        return render_ui("oauth2/register_user.html")

    form = request.form
    response = requests.post(
        urljoin(authserver_uri(), "auth/user/register"),
        json = {
            "user_name": form.get("user_name"),
            "email": form.get("email_address"),
            "password": form.get("password"),
            "confirm_password": form.get("confirm_password"),
            **dict(
                item.split("=") for item in
                urlparse(authserver_authorise_uri()).query.split("&"))})
    results = response.json()
    if "error" in results:
        error_messages = tuple(
            f"{results['error']}: {msg.strip()}"
            for msg in results.get("error_description").split("::"))
        for message in error_messages:
            flash(message, "alert-danger")
        return redirect(url_for("oauth2.user.register_user"))

    flash("Registration successful! Please login to continue.", "alert-success")
    return redirect("/")

@users.route("/masquerade", methods=["GET", "POST"])
def masquerade():
    """Masquerade as a particular user."""
    if request.method == "GET":
        this_user = session.session_info()["user"]
        return client.get("auth/user/list").either(
            lambda err: render_ui(
                "oauth2/masquerade.html", users_error=process_error(err)),
            lambda usrs: render_ui(
                "oauth2/masquerade.html", users=tuple(
                    usr for usr in usrs
                    if UUID(usr["user_id"]) != this_user["user_id"])))

    def __masq_success__(masq_details):
        session.set_masquerading(masq_details)
        flash(
            f"User {masq_details['original']['user']['name']} "
            f"({masq_details['original']['user']['email']}) is now "
            "successfully masquerading as the user "
            f"User {masq_details['masquerade_as']['user']['name']} "
            f"({masq_details['masquerade_as']['user']['email']}) is now ",
            "alert-success")
        return redirect("/")
    form = request.form
    masquerade_as = form.get("masquerade_as").strip()
    if not(bool(masquerade_as)):
        flash("You must provide a user to masquerade as.", "alert-danger")
        return redirect(url_for("oauth2.user.masquerade"))
    return client.post(
        "auth/user/masquerade/",
        json={"masquerade_as": request.form.get("masquerade_as")}).either(
            with_flash_error(redirect(url_for("oauth2.user.masquerade"))),
            __masq_success__)