diff options
Diffstat (limited to 'gn2/wqflask/oauth2/users.py')
-rw-r--r-- | gn2/wqflask/oauth2/users.py | 190 |
1 files changed, 190 insertions, 0 deletions
diff --git a/gn2/wqflask/oauth2/users.py b/gn2/wqflask/oauth2/users.py new file mode 100644 index 00000000..12d07d0c --- /dev/null +++ b/gn2/wqflask/oauth2/users.py @@ -0,0 +1,190 @@ +import requests +from uuid import UUID +from urllib.parse import urljoin + +from authlib.integrations.base_client.errors import OAuthError +from flask import ( + flash, request, url_for, redirect, Response, Blueprint, + current_app as app) + +from . import client +from . import session +from .ui import render_ui +from .checks import require_oauth2, user_logged_in +from .client import oauth2_get, oauth2_post, oauth2_client +from .request_utils import ( + user_details, request_error, process_error, with_flash_error) + +users = Blueprint("user", __name__) + +@users.route("/profile", methods=["GET"]) +@require_oauth2 +def user_profile(): + __id__ = lambda the_val: the_val + usr_dets = user_details() + def __render__(usr_dets, roles=[], **kwargs): + return render_ui( + "oauth2/view-user.html", user_details=usr_dets, roles=roles, + user_privileges = tuple( + privilege["privilege_id"] for role in roles + for privilege in role["privileges"]), + **kwargs) + + def __roles_success__(roles): + if bool(usr_dets.get("group")): + return __render__(usr_dets, roles) + return oauth2_get("auth/user/group/join-request").either( + lambda err: __render__( + user_details, group_join_error=process_error(err)), + lambda gjr: __render__(usr_dets, roles=roles, group_join_request=gjr)) + + return oauth2_get("auth/system/roles").either( + lambda err: __render__(usr_dets, role_error=process_error(err)), + __roles_success__) + +@users.route("/request-add-to-group", methods=["POST"]) +@require_oauth2 +def request_add_to_group() -> Response: + """Request to be added to a group.""" + form = request.form + group_id = form["group"] + + def __error__(error): + err = process_error(error) + flash(f"{err['error']}: {err['error_message']}", "alert-danger") + return redirect(url_for("oauth2.user.user_profile")) + + def __success__(response): + flash(f"{response['message']} (Response ID: {response['request_id']})", + "alert-success") + return redirect(url_for("oauth2.user.user_profile")) + + return oauth2_post(f"auth/group/requests/join/{group_id}", + data=form).either(__error__, __success__) + +@users.route("/login", methods=["GET", "POST"]) +def login(): + """Route to allow users to sign up.""" + from gn2.utility.tools import AUTH_SERVER_URL + next_endpoint=request.args.get("next", False) + + if request.method == "POST": + form = request.form + client = oauth2_client() + try: + token = client.fetch_token( + urljoin(AUTH_SERVER_URL, "auth/token"), + username=form.get("email_address"), + password=form.get("password"), + grant_type="password") + session.set_user_token(token) + udets = user_details() + session.set_user_details({ + "user_id": UUID(udets["user_id"]), + "name": udets["name"], + "email": udets["email"], + "token": session.user_token(), + "logged_in": True + }) + except OAuthError as _oaerr: + flash(_oaerr.args[0], "alert-danger") + return render_ui( + "oauth2/login.html", next_endpoint=next_endpoint, + email=form.get("email_address")) + + if user_logged_in(): + if next_endpoint: + return redirect(url_for(next_endpoint)) + return redirect("/") + + return render_ui("oauth2/login.html", next_endpoint=next_endpoint) + +@users.route("/logout", methods=["GET", "POST"]) +def logout(): + from gn2.utility.tools import AUTH_SERVER_URL + if user_logged_in(): + resp = oauth2_client().revoke_token( + urljoin(AUTH_SERVER_URL, "auth/revoke")) + the_session = session.session_info() + if not bool(the_session["masquerading"]): + # Normal session - clear and go back. + session.clear_session_info() + flash("Successfully logged out.", "alert-success") + return redirect("/") + # Restore masquerading session + session.unset_masquerading() + flash( + "Successfully logged out as user " + f"{the_session['user']['name']} ({the_session['user']['email']}) " + "and restored session for user " + f"{the_session['masquerading']['name']} " + f"({the_session['masquerading']['email']})", + "alert-success") + return redirect("/") + +@users.route("/register", methods=["GET", "POST"]) +def register_user(): + from gn2.utility.tools import AUTH_SERVER_URL + if user_logged_in(): + next_endpoint=request.args.get("next", "/") + flash(("You cannot register a new user while logged in. " + "Please logout to register a new user."), + "alert-danger") + return redirect(next_endpoint) + + if request.method == "GET": + return render_ui("oauth2/register_user.html") + + form = request.form + response = requests.post( + urljoin(AUTH_SERVER_URL, "auth/user/register"), + data = { + "user_name": form.get("user_name"), + "email": form.get("email_address"), + "password": form.get("password"), + "confirm_password": form.get("confirm_password")}) + results = response.json() + if "error" in results: + error_messages = tuple( + f"{results['error']}: {msg.strip()}" + for msg in results.get("error_description").split("::")) + for message in error_messages: + flash(message, "alert-danger") + return redirect(url_for("oauth2.user.register_user")) + + flash("Registration successful! Please login to continue.", "alert-success") + return redirect(url_for("oauth2.user.login")) + +@users.route("/masquerade", methods=["GET", "POST"]) +def masquerade(): + """Masquerade as a particular user.""" + if request.method == "GET": + this_user = session.session_info()["user"] + return client.get("auth/user/list").either( + lambda err: render_ui( + "oauth2/masquerade.html", users_error=process_error(err)), + lambda usrs: render_ui( + "oauth2/masquerade.html", users=tuple( + usr for usr in usrs + if UUID(usr["user_id"]) != this_user["user_id"]))) + + def __masq_success__(masq_details): + session.set_masquerading(masq_details) + flash( + f"User {masq_details['original']['user']['name']} " + f"({masq_details['original']['user']['email']}) is now " + "successfully masquerading as the user " + f"User {masq_details['masquerade_as']['user']['name']} " + f"({masq_details['masquerade_as']['user']['email']}) is now ", + "alert-success") + return redirect("/") + form = request.form + masquerade_as = form.get("masquerade_as").strip() + if not(bool(masquerade_as)): + flash("You must provide a user to masquerade as.", "alert-danger") + return redirect(url_for("oauth2.user.masquerade")) + return client.post( + "auth/user/masquerade/", + json={"masquerade_as": request.form.get("masquerade_as")}).either( + with_flash_error(redirect(url_for("oauth2.user.masquerade"))), + __masq_success__) |