1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
|
import requests
from uuid import UUID
from urllib.parse import urljoin
from authlib.integrations.base_client.errors import OAuthError
from flask import (
flash, request, url_for, redirect, Response, Blueprint,
current_app as app)
from . import client
from . import session
from .ui import render_ui
from .checks import require_oauth2, user_logged_in
from .client import oauth2_get, oauth2_post, oauth2_client
from .request_utils import (
user_details, request_error, process_error, with_flash_error)
# Blueprint that groups the user-facing routes defined below in this module
# (profile, group-join requests, login, logout, registration, masquerading).
# NOTE(review): the handlers build URLs with the "oauth2.user.<view>" endpoint
# prefix, so this is presumably registered under a parent blueprint named
# "oauth2" -- confirm at the registration site.
users = Blueprint("user", __name__)
@users.route("/profile", methods=["GET"])
@require_oauth2
def user_profile():
    """Render the profile page for the current user.

    The system roles are requested first. If that request fails, the page is
    rendered with a ``role_error``. If it succeeds and the user's details
    contain a truthy ``group`` entry, the page is rendered with the roles.
    Otherwise the user's pending group join request is looked up as well and
    the page is rendered with either the ``group_join_request`` or a
    ``group_join_error``.

    Returns:
        Whatever ``render_ui`` produces for ``oauth2/view-user.html``.
    """
    usr_dets = user_details()

    def __render__(usr_dets, roles=None, **kwargs):
        # Use a None sentinel rather than a shared mutable default list.
        roles = [] if roles is None else roles
        return render_ui(
            "oauth2/view-user.html", user_details=usr_dets, roles=roles,
            user_privileges=tuple(
                privilege["privilege_id"] for role in roles
                for privilege in role["privileges"]),
            **kwargs)

    def __roles_success__(roles):
        if bool(usr_dets.get("group")):
            return __render__(usr_dets, roles)
        return oauth2_get("auth/user/group/join-request").either(
            # Pass the fetched details (not the `user_details` function) and
            # keep the roles: they were retrieved successfully even though
            # the join-request lookup failed.
            lambda err: __render__(
                usr_dets, roles=roles, group_join_error=process_error(err)),
            lambda gjr: __render__(
                usr_dets, roles=roles, group_join_request=gjr))

    return oauth2_get("auth/system/roles").either(
        lambda err: __render__(usr_dets, role_error=process_error(err)),
        __roles_success__)
@users.route("/request-add-to-group", methods=["POST"])
@require_oauth2
def request_add_to_group() -> Response:
    """Submit a request for the current user to be added to a group.

    The group identifier is read from the ``group`` field of the posted form
    and the complete form is forwarded to the
    ``auth/group/requests/join/<group>`` endpoint. The outcome is flashed and
    the user is redirected to the profile page in both cases.
    """
    submitted = request.form
    target_group = submitted["group"]

    def __to_profile__():
        # Both outcomes end up on the profile page.
        return redirect(url_for("oauth2.user.user_profile"))

    def __on_failure__(failure):
        details = process_error(failure)
        flash(f"{details['error']}: {details['error_message']}",
              "alert-danger")
        return __to_profile__()

    def __on_success__(outcome):
        flash(
            f"{outcome['message']} (Response ID: {outcome['request_id']})",
            "alert-success")
        return __to_profile__()

    join_request = oauth2_post(
        f"auth/group/requests/join/{target_group}", data=submitted)
    return join_request.either(__on_failure__, __on_success__)
@users.route("/login", methods=["GET", "POST"])
def login():
    """Log a user in with an e-mail address and password.

    On POST, a token is fetched from the auth server's ``auth/token``
    endpoint using the password grant, stored in the session, and the user's
    details are saved alongside it. An ``OAuthError`` is flashed and the
    login form is rendered again with the submitted e-mail address.

    For any method, an already logged-in user is redirected to the endpoint
    named by the ``next`` query argument (via ``url_for``) or to ``/``;
    otherwise the login form is rendered.
    """
    from gn2.utility.tools import AUTH_SERVER_URL
    next_endpoint = request.args.get("next", False)

    if request.method == "POST":
        credentials = request.form
        auth_client = oauth2_client()
        try:
            session.set_user_token(auth_client.fetch_token(
                urljoin(AUTH_SERVER_URL, "auth/token"),
                username=credentials.get("email_address"),
                password=credentials.get("password"),
                grant_type="password"))
            profile = user_details()
            session.set_user_details({
                "user_id": UUID(profile["user_id"]),
                "name": profile["name"],
                "email": profile["email"],
                "token": session.user_token(),
                "logged_in": True
            })
        except OAuthError as auth_failure:
            flash(auth_failure.args[0], "alert-danger")
            return render_ui(
                "oauth2/login.html", next_endpoint=next_endpoint,
                email=credentials.get("email_address"))

    if not user_logged_in():
        return render_ui("oauth2/login.html", next_endpoint=next_endpoint)

    # NOTE(review): `next` is treated as an endpoint name here; an unknown
    # name would make `url_for` fail -- confirm callers only pass endpoints.
    destination = url_for(next_endpoint) if next_endpoint else "/"
    return redirect(destination)
@users.route("/logout", methods=["GET", "POST"])
def logout():
    """Log the current user out and redirect to the home page.

    When a user is logged in, the token is revoked at the auth server's
    ``auth/revoke`` endpoint. A plain session is then cleared; a session
    with a truthy ``masquerading`` entry has the masquerade undone instead,
    and the flashed message names both users involved.
    """
    from gn2.utility.tools import AUTH_SERVER_URL
    if user_logged_in():
        oauth2_client().revoke_token(urljoin(AUTH_SERVER_URL, "auth/revoke"))
        current = session.session_info()
        if bool(current["masquerading"]):
            # Restore the session that started the masquerade.
            session.unset_masquerading()
            acting_as = current["user"]
            restored = current["masquerading"]
            flash(
                "Successfully logged out as user "
                f"{acting_as['name']} ({acting_as['email']}) "
                "and restored session for user "
                f"{restored['name']} "
                f"({restored['email']})",
                "alert-success")
        else:
            # Normal session - clear it.
            session.clear_session_info()
            flash("Successfully logged out.", "alert-success")
    return redirect("/")
@users.route("/register", methods=["GET", "POST"])
def register_user():
    """Register a new user account.

    * A logged-in user cannot register: an error is flashed and the user is
      redirected to the ``next`` query argument when it is a local path, or
      to ``/`` otherwise.
    * GET renders the registration form.
    * POST forwards the form's ``user_name``, ``email_address``, ``password``
      and ``confirm_password`` fields to the auth server's
      ``auth/user/register`` endpoint. If the JSON reply contains ``error``,
      each ``::``-separated part of ``error_description`` is flashed and the
      form is shown again; otherwise the user is sent to the login page.
    """
    from gn2.utility.tools import AUTH_SERVER_URL
    if user_logged_in():
        next_endpoint = request.args.get("next", "/")
        # `next` comes straight from the query string. Only follow it when
        # it is a path on this site; an absolute or protocol-relative URL
        # ("//host", "/\host") or one containing control characters could
        # send the user to an attacker-chosen site (open redirect).
        is_local_path = (
            next_endpoint.startswith("/")
            and not next_endpoint.startswith(("//", "/\\"))
            and not any(ord(char) < 0x20 for char in next_endpoint))
        flash(("You cannot register a new user while logged in. "
               "Please logout to register a new user."),
              "alert-danger")
        return redirect(next_endpoint if is_local_path else "/")

    if request.method == "GET":
        return render_ui("oauth2/register_user.html")

    form = request.form
    response = requests.post(
        urljoin(AUTH_SERVER_URL, "auth/user/register"),
        data={
            "user_name": form.get("user_name"),
            "email": form.get("email_address"),
            "password": form.get("password"),
            "confirm_password": form.get("confirm_password")})
    results = response.json()
    if "error" in results:
        # The description may be absent or null; fall back to an empty
        # string instead of failing on `None.split`.
        description = results.get("error_description") or ""
        for msg in description.split("::"):
            flash(f"{results['error']}: {msg.strip()}", "alert-danger")
        return redirect(url_for("oauth2.user.register_user"))

    flash("Registration successful! Please login to continue.",
          "alert-success")
    return redirect(url_for("oauth2.user.login"))
@users.route("/masquerade", methods=["GET", "POST"])
def masquerade():
    """Masquerade as a particular user.

    GET renders the masquerade page with the users returned by
    ``auth/user/list``, leaving out the current user, or with a
    ``users_error`` if that request fails.

    POST reads the ``masquerade_as`` form field. If it is missing or blank,
    an error is flashed and the user is redirected back to this page.
    Otherwise the value is posted to ``auth/user/masquerade/``; on success
    the masquerade details are stored in the session and the user is
    redirected to ``/``, on failure the user is redirected back to this page.
    """
    if request.method == "GET":
        this_user = session.session_info()["user"]
        return client.get("auth/user/list").either(
            lambda err: render_ui(
                "oauth2/masquerade.html", users_error=process_error(err)),
            lambda usrs: render_ui(
                "oauth2/masquerade.html", users=tuple(
                    usr for usr in usrs
                    if UUID(usr["user_id"]) != this_user["user_id"])))

    def __masq_success__(masq_details):
        session.set_masquerading(masq_details)
        original = masq_details["original"]["user"]
        target = masq_details["masquerade_as"]["user"]
        # The message used to repeat "User" and end with a dangling
        # " is now " left over from a copy-paste.
        flash(
            f"User {original['name']} ({original['email']}) is now "
            "successfully masquerading as the user "
            f"{target['name']} ({target['email']}).",
            "alert-success")
        return redirect("/")

    # A missing field yields None; treat it the same as a blank value
    # instead of failing on `None.strip()`.
    masquerade_as = (request.form.get("masquerade_as") or "").strip()
    if not masquerade_as:
        flash("You must provide a user to masquerade as.", "alert-danger")
        return redirect(url_for("oauth2.user.masquerade"))

    # Send the same stripped value that was just validated.
    return client.post(
        "auth/user/masquerade/",
        json={"masquerade_as": masquerade_as}).either(
            with_flash_error(redirect(url_for("oauth2.user.masquerade"))),
            __masq_success__)
|