import requests
from uuid import UUID
from urllib.parse import urljoin, urlparse
from authlib.integrations.base_client.errors import OAuthError
from flask import (
flash, request, url_for, redirect, Response, Blueprint,
current_app as app)
from . import client
from . import session
from .ui import render_ui
from .checks import require_oauth2
from .client import (oauth2_get,
oauth2_post,
oauth2_client,
authserver_uri,
user_logged_in)
from .request_utils import (user_details,
request_error,
process_error,
with_flash_error,
authserver_authorise_uri)
users = Blueprint("user", __name__)
@users.route("/profile", methods=["GET"])
@require_oauth2
def user_profile():
__id__ = lambda the_val: the_val
usr_dets = user_details()
def __render__(usr_dets, roles=[], **kwargs):
return render_ui(
"oauth2/view-user.html", user_details=usr_dets, roles=roles,
user_privileges = tuple(
privilege["privilege_id"] for role in roles
for privilege in role["privileges"]),
**kwargs)
def __roles_success__(roles):
if bool(usr_dets.get("group")):
return __render__(usr_dets, roles)
return oauth2_get("auth/user/group/join-request").either(
lambda err: __render__(
user_details, group_join_error=process_error(err)),
lambda gjr: __render__(usr_dets, roles=roles, group_join_request=gjr))
return oauth2_get("auth/system/roles").either(
lambda err: __render__(usr_dets, role_error=process_error(err)),
__roles_success__)
@users.route("/request-add-to-group", methods=["POST"])
@require_oauth2
def request_add_to_group() -> Response:
"""Request to be added to a group."""
form = request.form
group_id = form["group"]
def __error__(error):
err = process_error(error)
flash(f"{err['error']}: {err['error_message']}", "alert-danger")
return redirect(url_for("oauth2.user.user_profile"))
def __success__(response):
flash(f"{response['message']} (Response ID: {response['request_id']})",
"alert-success")
return redirect(url_for("oauth2.user.user_profile"))
return oauth2_post(f"auth/group/requests/join/{group_id}",
json=form).either(__error__, __success__)
@users.route("/logout", methods=["GET", "POST"])
def logout():
if user_logged_in():
resp = oauth2_client().revoke_token(
urljoin(authserver_uri(), "auth/revoke"))
the_session = session.session_info()
if not bool(the_session["masquerading"]):
# Normal session - clear and go back.
session.clear_session_info()
flash("Successfully logged out.", "alert-success")
return redirect("/")
# Restore masquerading session
session.unset_masquerading()
flash(
"Successfully logged out as user "
f"{the_session['user']['name']} ({the_session['user']['email']}) "
"and restored session for user "
f"{the_session['masquerading']['name']} "
f"({the_session['masquerading']['email']})",
"alert-success")
return redirect("/")
@users.route("/register", methods=["GET", "POST"])
def register_user():
if user_logged_in():
next_endpoint=request.args.get("next", "/")
flash(("You cannot register a new user while logged in. "
"Please logout to register a new user."),
"alert-danger")
return redirect(next_endpoint)
if request.method == "GET":
return render_ui("oauth2/register_user.html")
form = request.form
response = requests.post(
urljoin(authserver_uri(), "auth/user/register"),
json = {
"user_name": form.get("user_name"),
"email": form.get("email_address"),
"password": form.get("password"),
"confirm_password": form.get("confirm_password"),
**dict(
item.split("=") for item in
urlparse(authserver_authorise_uri()).query.split("&"))})
results = response.json()
if "error" in results:
error_messages = tuple(
f"{results['error']}: {msg.strip()}"
for msg in results.get("error_description").split("::"))
for message in error_messages:
flash(message, "alert-danger")
return redirect(url_for("oauth2.user.register_user"))
flash("Registration successful! Please login to continue.", "alert-success")
return redirect("/")
@users.route("/masquerade", methods=["GET", "POST"])
def masquerade():
"""Masquerade as a particular user."""
if request.method == "GET":
this_user = session.session_info()["user"]
return client.get("auth/user/list").either(
lambda err: render_ui(
"oauth2/masquerade.html", users_error=process_error(err)),
lambda usrs: render_ui(
"oauth2/masquerade.html", users=tuple(
usr for usr in usrs
if UUID(usr["user_id"]) != this_user["user_id"])))
def __masq_success__(masq_details):
session.set_masquerading(masq_details)
flash(
f"User {masq_details['original']['user']['name']} "
f"({masq_details['original']['user']['email']}) is now "
"successfully masquerading as the user "
f"User {masq_details['masquerade_as']['user']['name']} "
f"({masq_details['masquerade_as']['user']['email']}) is now ",
"alert-success")
return redirect("/")
form = request.form
masquerade_as = form.get("masquerade_as").strip()
if not(bool(masquerade_as)):
flash("You must provide a user to masquerade as.", "alert-danger")
return redirect(url_for("oauth2.user.masquerade"))
return client.post(
"auth/user/masquerade/",
json={"masquerade_as": request.form.get("masquerade_as")}).either(
with_flash_error(redirect(url_for("oauth2.user.masquerade"))),
__masq_success__)