From 3f5e71500e845130f992e24d288d827c76d795b3 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Tue, 23 May 2023 11:44:18 +0300 Subject: oauth2: Enable admins to masquerade as other users --- wqflask/wqflask/oauth2/client.py | 3 +- wqflask/wqflask/oauth2/session.py | 34 +++++++++++-- wqflask/wqflask/oauth2/toplevel.py | 1 + wqflask/wqflask/oauth2/users.py | 59 +++++++++++++++++++++-- wqflask/wqflask/templates/oauth2/masquerade.html | 39 +++++++++++++++ wqflask/wqflask/templates/oauth2/profile_nav.html | 12 +++++ 6 files changed, 139 insertions(+), 9 deletions(-) create mode 100644 wqflask/wqflask/templates/oauth2/masquerade.html diff --git a/wqflask/wqflask/oauth2/client.py b/wqflask/wqflask/oauth2/client.py index 44463456..b992f0ad 100644 --- a/wqflask/wqflask/oauth2/client.py +++ b/wqflask/wqflask/oauth2/client.py @@ -12,7 +12,8 @@ from authlib.integrations.requests_client import OAuth2Session from wqflask.oauth2 import session from wqflask.oauth2.checks import user_logged_in -SCOPE = "profile group role resource register-client user introspect migrate-data" +SCOPE = ("profile group role resource register-client user masquerade " + "introspect migrate-data") def oauth2_client(): config = app.config diff --git a/wqflask/wqflask/oauth2/session.py b/wqflask/wqflask/oauth2/session.py index 011d95f3..ec878f50 100644 --- a/wqflask/wqflask/oauth2/session.py +++ b/wqflask/wqflask/oauth2/session.py @@ -1,6 +1,6 @@ """Deal with user sessions""" from uuid import UUID, uuid4 -from typing import Any, TypedDict +from typing import Any, Optional, TypedDict from flask import request, session from pymonad.either import Left, Right, Either @@ -9,6 +9,7 @@ class UserDetails(TypedDict): """Session information relating specifically to the user.""" user_id: UUID name: str + email: str token: Either logged_in: bool @@ -19,6 +20,7 @@ class SessionInfo(TypedDict): anon_id: UUID user_agent: str ip_addr: str + masquerade: Optional[UserDetails] __SESSION_KEY__ = "session_info" # Do not use this outside this module!! @@ -49,14 +51,16 @@ def session_info() -> SessionInfo: "session_id": uuid4(), "user": { "user_id": anon_id, - "name": "Anonymous User", + "name": "Anonymous User", + "email": "anon@ymous.user", "token": Left("INVALID-TOKEN"), "logged_in": False }, "anon_id": anon_id, "user_agent": request.headers.get("User-Agent"), "ip_addr": request.environ.get("HTTP_X_FORWARDED_FOR", - request.remote_addr) + request.remote_addr), + "masquerading": None })) def set_user_token(token: str) -> SessionInfo: @@ -72,3 +76,27 @@ def set_user_details(userdets: UserDetails) -> SessionInfo: def user_token() -> Either: """Retrieve the user token.""" return session_info()["user"]["token"] + +def set_masquerading(masq_info): + """Save the masquerading user information.""" + orig_user = session_info()["user"] + return save_session_info({ + **session_info(), + "user": { + "user_id": UUID(masq_info["masquerade_as"]["user"]["user_id"]), + "name": masq_info["masquerade_as"]["user"]["name"], + "email": masq_info["masquerade_as"]["user"]["email"], + "token": Right(masq_info["masquerade_as"]["token"]), + "logged_in": True + }, + "masquerading": orig_user + }) + +def unset_masquerading(): + """Restore the original session.""" + the_session = session_info() + return save_session_info({ + **the_session, + "user": the_session["masquerading"], + "masquerading": None + }) diff --git a/wqflask/wqflask/oauth2/toplevel.py b/wqflask/wqflask/oauth2/toplevel.py index 04a08870..61d53dfe 100644 --- a/wqflask/wqflask/oauth2/toplevel.py +++ b/wqflask/wqflask/oauth2/toplevel.py @@ -32,6 +32,7 @@ def authorisation_code(): session.set_user_details({ "user_id": UUID(udets["user_id"]), "name": udets["name"], + "email": udets["email"], "token": session.user_token(), "logged_in": True }) diff --git a/wqflask/wqflask/oauth2/users.py b/wqflask/wqflask/oauth2/users.py index 597dfb33..51a4ca4c 100644 --- a/wqflask/wqflask/oauth2/users.py +++ b/wqflask/wqflask/oauth2/users.py @@ -7,11 +7,13 @@ from flask import ( flash, request, url_for, redirect, Response, Blueprint, current_app as app) +from . import client from . import session from .ui import render_ui from .checks import require_oauth2, user_logged_in from .client import oauth2_get, oauth2_post, oauth2_client -from .request_utils import user_details, request_error, process_error +from .request_utils import ( + user_details, request_error, process_error, with_flash_error) users = Blueprint("user", __name__) @@ -80,6 +82,7 @@ def login(): session.set_user_details({ "user_id": UUID(udets["user_id"]), "name": udets["name"], + "email": udets["email"], "token": session.user_token(), "logged_in": True }) @@ -102,10 +105,22 @@ def logout(): config = app.config resp = oauth2_client().revoke_token( urljoin(config["GN_SERVER_URL"], "oauth2/revoke")) - session.clear_session_info() - flash("Successfully logged out.", "alert-success") - - return redirect("/") + the_session = session.session_info() + if not bool(the_session["masquerading"]): + # Normal session - clear and go back. + session.clear_session_info() + flash("Successfully logged out.", "alert-success") + return redirect("/") + # Restore masquerading session + session.unset_masquerading() + flash( + "Successfully logged out as user " + f"{the_session['user']['name']} ({the_session['user']['email']}) " + "and restored session for user " + f"{the_session['masquerading']['name']} " + f"({the_session['masquerading']['email']})", + "alert-success") + return redirect("/") @users.route("/register", methods=["GET", "POST"]) def register_user(): @@ -139,3 +154,37 @@ def register_user(): flash("Registration successful! Please login to continue.", "alert-success") return redirect(url_for("oauth2.user.login")) + +@users.route("/masquerade", methods=["GET", "POST"]) +def masquerade(): + """Masquerade as a particular user.""" + if request.method == "GET": + this_user = session.session_info()["user"] + return client.get("oauth2/user/list").either( + lambda err: render_ui( + "oauth2/masquerade.html", users_error=process_error(err)), + lambda usrs: render_ui( + "oauth2/masquerade.html", users=tuple( + usr for usr in usrs + if UUID(usr["user_id"]) != this_user["user_id"]))) + + def __masq_success__(masq_details): + session.set_masquerading(masq_details) + flash( + f"User {masq_details['original']['user']['name']} " + f"({masq_details['original']['user']['email']}) is now " + "successfully masquerading as the user " + f"User {masq_details['masquerade_as']['user']['name']} " + f"({masq_details['masquerade_as']['user']['email']}) is now ", + "alert-success") + return redirect("/") + form = request.form + masquerade_as = form.get("masquerade_as").strip() + if not(bool(masquerade_as)): + flash("You must provide a user to masquerade as.", "alert-danger") + return redirect(url_for("oauth2.user.masquerade")) + return client.post( + "oauth2/user/masquerade/", + json={"masquerade_as": request.form.get("masquerade_as")}).either( + with_flash_error(redirect(url_for("oauth2.user.masquerade"))), + __masq_success__) diff --git a/wqflask/wqflask/templates/oauth2/masquerade.html b/wqflask/wqflask/templates/oauth2/masquerade.html new file mode 100644 index 00000000..48ec6cee --- /dev/null +++ b/wqflask/wqflask/templates/oauth2/masquerade.html @@ -0,0 +1,39 @@ +{%extends "base.html"%} +{%from "oauth2/profile_nav.html" import profile_nav%} +{%from "oauth2/display_error.html" import display_error%} +{%block title%}Masquerade As{%endblock%} +{%block content%} +