1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
|
"""User authorisation endpoints."""
import traceback
from typing import Any
from functools import partial
import sqlite3
from email_validator import validate_email, EmailNotValidError
from flask import request, jsonify, Response, Blueprint, current_app
from gn3.auth import db
from gn3.auth.dictify import dictify
from gn3.auth.db_utils import with_db_connection
from .models import list_users
from .masquerade.views import masq
from .collections.views import collections
from ..groups.models import user_group as _user_group
from ..resources.models import user_resources as _user_resources
from ..roles.models import assign_default_roles, user_roles as _user_roles
from ..errors import (
NotFoundError, UsernameError, PasswordError, UserRegistrationError)
from ...authentication.oauth2.resource_server import require_oauth
from ...authentication.users import User, save_user, set_user_password
from ...authentication.oauth2.models.oauth2token import token_by_access_token
# Blueprint that groups the user-related endpoints defined in this module.
users = Blueprint("users", __name__)
# Nest the masquerade and collections blueprints under this one, so that their
# routes are served below "/masquerade" and "/collections" respectively.
users.register_blueprint(masq, url_prefix="/masquerade")
users.register_blueprint(collections, url_prefix="/collections")
@users.route("/", methods=["GET"])
@require_oauth("profile")
def user_details() -> Response:
    """Return the authenticated user's details as JSON.

    The payload carries ``user_id``, ``email``, ``name`` and ``group``.
    ``group`` is the dictified group found for the user, or ``False`` when
    the group lookup produces a falsy result.
    """
    with require_oauth.acquire("profile") as token:
        current_user = token.user
        payload = {
            "user_id": current_user.user_id,
            "email": current_user.email,
            "name": current_user.name,
        }
        with db.connection(current_app.config["AUTH_DB"]) as conn:
            maybe_group = _user_group(conn, current_user)
            group = maybe_group.maybe(# type: ignore[misc]
                False, lambda grp: grp)# type: ignore[arg-type]
            # "group" is added last so the key order matches the response
            # the endpoint has always produced.
            payload["group"] = dictify(group) if group else False
            return jsonify(payload)
@users.route("/roles", methods=["GET"])
@require_oauth("role")
def user_roles() -> Response:
    """Respond with the user's non-resource roles, dictified, as a JSON array."""
    with require_oauth.acquire("role") as token:
        with db.connection(current_app.config["AUTH_DB"]) as conn:
            assigned_roles = _user_roles(conn, token.user)
            return jsonify(tuple(map(dictify, assigned_roles)))
def validate_password(password, confirm_password) -> str:
    """Check a password/confirmation pair and return the password.

    The length is checked first, then the match with the confirmation.

    Raises:
        PasswordError: if the password is shorter than 8 characters, or if
            it differs from ``confirm_password``.
    """
    if len(password) >= 8:
        if password != confirm_password:
            raise PasswordError("Mismatched password values")
        return password
    raise PasswordError("The password must be at least 8 characters long.")
def validate_username(name: str) -> str:
    """Validate the provided name and return it unchanged.

    Raises:
        UsernameError: if ``name`` is the empty string.
    """
    if name != "":
        return name
    raise UsernameError("User's name not provided.")
def __assert_not_logged_in__(conn: db.DbConnection) -> None:
    """Raise if the current request carries a known access token.

    Reads the ``Authorization`` header of the current request, takes its
    second whitespace-separated field as the access token and looks that
    token up with ``token_by_access_token``.

    A header without a second field (for example a bare ``Bearer``) cannot
    name a token, so it is treated as "not logged in" instead of failing
    with an ``IndexError``.

    Raises:
        UserRegistrationError: if the token named by the header is found.
    """
    bearer = request.headers.get('Authorization')
    if not bearer:
        return

    fields = bearer.split()
    if len(fields) < 2:
        # Malformed header: there is no token to look up.
        return

    token = token_by_access_token(conn, fields[1]).maybe(# type: ignore[misc]
        False, lambda tok: tok)
    if token:
        raise UserRegistrationError(
            "Cannot register user while authenticated")
@users.route("/register", methods=["POST"])
def register_user() -> Response:
    """Register a new user from the submitted form data.

    Reads ``email``, ``password``, ``confirm_password`` and ``user_name``
    from the request form (each stripped of surrounding whitespace),
    validates them, saves the user, sets the password and assigns the
    default roles. Responds with the new user's ``user_id``, ``email`` and
    ``name`` as JSON.

    Raises:
        UserRegistrationError: if the request already carries a known access
            token, if the email fails validation, or if the database reports
            an integrity error while saving.
        PasswordError, UsernameError: propagated from the validators.
    """
    with db.connection(current_app.config["AUTH_DB"]) as conn:
        __assert_not_logged_in__(conn)
        try:
            fields = request.form
            validated_email = validate_email(
                fields.get("email", "").strip(), check_deliverability=True)
            raw_password = fields.get("password", "").strip()
            raw_confirmation = fields.get("confirm_password", "").strip()
            password = validate_password(raw_password, raw_confirmation)
            user_name = validate_username(fields.get("user_name", "").strip())
            with db.cursor(conn) as cursor:
                saved_user = save_user(
                    cursor, validated_email["email"], user_name)
                user, _hashed_password = set_user_password(
                    cursor, saved_user, password)
                assign_default_roles(cursor, user)
                return jsonify({
                    "user_id": user.user_id,
                    "email": user.email,
                    "name": user.name
                })
        except EmailNotValidError as enve:
            current_app.logger.debug(traceback.format_exc())
            raise UserRegistrationError(f"Email Error: {str(enve)}") from enve
        except sqlite3.IntegrityError as sq3ie:
            current_app.logger.debug(traceback.format_exc())
            raise UserRegistrationError(
                "A user with that email already exists") from sq3ie

    # Only reached if the blocks above finish without returning or raising.
    raise Exception(
        "unknown_error", "The system experienced an unexpected error.")
@users.route("/group", methods=["GET"])
@require_oauth("profile group")
def user_group() -> Response:
    """Respond with the group the authenticated user is a member of.

    Raises:
        NotFoundError: if the group lookup produces a falsy result.
    """
    with require_oauth.acquire("profile group") as token:
        with db.connection(current_app.config["AUTH_DB"]) as conn:
            maybe_group = _user_group(conn, token.user)
            group = maybe_group.maybe(# type: ignore[misc]
                False, lambda grp: grp)# type: ignore[arg-type]

        if not group:
            raise NotFoundError("User is not a member of any group.")
        return jsonify(dictify(group))
@users.route("/resources", methods=["GET"])
@require_oauth("profile resource")
def user_resources() -> Response:
    """Respond with the dictified resources found for the user, as a JSON array."""
    with require_oauth.acquire("profile resource") as token:
        with db.connection(current_app.config["AUTH_DB"]) as conn:
            accessible = _user_resources(conn, token.user)
            return jsonify(list(map(dictify, accessible)))
# The rule starts with "/" like every other rule on this blueprint. Flask
# strips the rule's leading slash when joining it to a url_prefix, so prefixed
# mounts resolve to the same URL as before.
@users.route("/group/join-request", methods=["GET"])
@require_oauth("profile group")
def user_join_request_exists() -> Response:
    """Check whether the user has a pending group join request.

    Responds with ``{"request_id": ..., "exists": True}`` when a row with
    status ``'PENDING'`` exists for the user in ``group_join_requests``, and
    with ``{"status": "Not found", "exists": False}`` otherwise.
    """
    def __request_exists__(conn: db.DbConnection, user: User) -> dict[str, Any]:
        """Look up one pending join request made by ``user``."""
        with db.cursor(conn) as cursor:
            cursor.execute(
                "SELECT * FROM group_join_requests WHERE requester_id=? AND "
                "status = 'PENDING'",
                (str(user.user_id),))
            res = cursor.fetchone()
            if res:
                return {
                    "request_id": res["request_id"],
                    "exists": True
                }
        return {
            "status": "Not found",
            "exists": False
        }

    with require_oauth.acquire("profile group") as the_token:
        # `with_db_connection` supplies the connection; the user is bound
        # up front with `partial`.
        return jsonify(with_db_connection(partial(
            __request_exists__, user=the_token.user)))
@users.route("/list", methods=["GET"])
@require_oauth("profile user")
def list_all_users() -> Response:
    """Respond with every user returned by ``list_users``, dictified, as a
    JSON array.
    """
    # Acquire the token with the same scope the decorator declares. The
    # previous "profile group" value did not match the "profile user" scope
    # this endpoint advertises.
    with require_oauth.acquire("profile user") as _the_token:
        return jsonify(tuple(
            dictify(user) for user in with_db_connection(list_users)))
|