1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
|
"""Endpoints for the authorisation stuff."""
import uuid
import traceback
from typing import Tuple, Optional
import sqlite3
from flask import request, jsonify, Response, current_app
from gn3.auth import db
from gn3.auth.dictify import dictify
from gn3.auth.blueprint import oauth2
from .errors import NotFoundError, UserRegistrationError
from .resources import user_resources as _user_resources
from .roles import user_role, assign_default_roles, user_roles as _user_roles
from .groups import (
all_groups, GroupCreationError, user_group as _user_group,
group_users as _group_users, create_group as _create_group)
from ..authentication.oauth2.resource_server import require_oauth
from ..authentication.users import save_user, set_user_password
from ..authentication.oauth2.models.oauth2token import token_by_access_token
@oauth2.route("/user", methods=["GET"])
@require_oauth("profile")
def user_details():
"""Return user's details."""
with require_oauth.acquire("profile") as the_token:
user = the_token.user
user_dets = {
"user_id": user.user_id, "email": user.email, "name": user.name,
"group": False
}
with db.connection(current_app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor:
return jsonify(_user_group(cursor, user).maybe(
user_dets,
lambda group: {**user_dets, "group": dictify(group)}))
@oauth2.route("/user-roles", methods=["GET"])
@require_oauth("role")
def user_roles():
"""Return the non-resource roles assigned to the user."""
with require_oauth.acquire("role") as token:
with db.connection(current_app.config["AUTH_DB"]) as conn:
return jsonify(_user_roles(conn, token.user).maybe(
tuple(), lambda roles: tuple(dictify(role) for role in roles)))
def __email_valid__(email: str) -> Tuple[bool, Optional[str]]:
"""Validate the email address."""
if email == "":
return False, "Empty email address"
## Check that the address is a valid email address
## Review use of `email-validator` or `pyIsEmail` python packages for
## validating the emails, if it turns out this is important.
## Success
return True, None
def __password_valid__(password, confirm_password) -> Tuple[bool, Optional[str]]:
if password == "" or confirm_password == "":
return False, "Empty password value"
if password != confirm_password:
return False, "Mismatched password values"
return True, None
def __user_name_valid__(name: str) -> Tuple[bool, Optional[str]]:
if name == "":
return False, "User's name not provided."
return True, None
def __assert_not_logged_in__(conn: db.DbConnection):
bearer = request.headers.get('Authorization')
if bearer:
token = token_by_access_token(conn, bearer.split(None)[1]).maybe(# type: ignore[misc]
False, lambda tok: tok)
if token:
raise UserRegistrationError(
"Cannot register user while authenticated")
@oauth2.route("/register-user", methods=["POST"])
def register_user():
"""Register a user."""
with db.connection(current_app.config["AUTH_DB"]) as conn:
__assert_not_logged_in__(conn)
form = request.form
email = form.get("email", "").strip()
password = form.get("password", "").strip()
user_name = form.get("user_name", "").strip()
errors = tuple(
error for valid,error in
[__email_valid__(email),
__password_valid__(
password, form.get("confirm_password", "").strip()),
__user_name_valid__(user_name)]
if not valid)
if len(errors) > 0:
raise UserRegistrationError(*errors)
try:
with db.cursor(conn) as cursor:
user, _hashed_password = set_user_password(
cursor, save_user(cursor, email, user_name), password)
assign_default_roles(cursor, user)
return jsonify(
{
"user_id": user.user_id,
"email": user.email,
"name": user.name
}), 200
except sqlite3.IntegrityError as sq3ie:
current_app.logger.debug(traceback.format_exc())
raise UserRegistrationError(
"A user with that email already exists") from sq3ie
raise Exception(
"unknown_error", "The system experienced an unexpected error.")
@oauth2.route("/groups", methods=["GET"])
@require_oauth("profile group")
def groups():
"""Return the list of groups that exist."""
with db.connection(current_app.config["AUTH_DB"]) as conn:
the_groups = all_groups(conn)
return jsonify(the_groups.maybe(
[], lambda grps: [dictify(grp) for grp in grps]))
@oauth2.route("/create-group", methods=["POST"])
@require_oauth("profile group")
def create_group():
"""Create a new group."""
with require_oauth.acquire("profile group") as the_token:
group_name=request.form.get("group_name", "").strip()
if not bool(group_name):
raise GroupCreationError("Could not create the group.")
db_uri = current_app.config["AUTH_DB"]
with db.connection(db_uri) as conn:
user = the_token.user
new_group = _create_group(
conn, group_name, user, request.form.get("group_description"))
return jsonify({
**dictify(new_group), "group_leader": dictify(user)
})
@oauth2.route("/role/<uuid:role_id>", methods=["GET"])
@require_oauth("role")
def role(role_id: uuid.UUID) -> Response:
"""Retrieve a user role with id `role_id`"""
def __error__(exc: Exception):
raise exc
with require_oauth.acquire("profile role") as the_token:
db_uri = current_app.config["AUTH_DB"]
with db.connection(db_uri) as conn:
the_role = user_role(conn, the_token.user, role_id)
return the_role.either(
__error__, lambda a_role: jsonify(dictify(a_role)))
@oauth2.route("/user-group", methods=["GET"])
@require_oauth("profile group")
def user_group():
"""Retrieve the group in which the user is a member."""
with require_oauth.acquire("profile group") as the_token:
db_uri = current_app.config["AUTH_DB"]
with db.connection(db_uri) as conn, db.cursor(conn) as cursor:
group = _user_group(cursor, the_token.user).maybe(
False, lambda grp: grp)
if group:
return jsonify(dictify(group))
raise NotFoundError("User is not a member of any group.")
@oauth2.route("/user-resources")
@require_oauth("profile resource")
def user_resources():
"""Retrieve the resources a user has access to."""
with require_oauth.acquire("profile resource") as the_token:
db_uri = current_app.config["AUTH_DB"]
with db.connection(db_uri) as conn:
return jsonify([
dictify(resource) for resource in
_user_resources(conn, the_token.user)])
@oauth2.route("/group-users/<uuid:group_id>", methods=["GET"])
@require_oauth("profile group")
def group_users(group_id: uuid.UUID) -> Response:
"""Retrieve all the members of a group."""
with require_oauth.acquire("profile group") as the_token:
db_uri = current_app.config["AUTH_DB"]
with db.connection(db_uri) as conn:
return jsonify(tuple(
dictify(user) for user in _group_users(conn, group_id)))
|