1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
"""Functions for acting on users."""
import uuid
from typing import Union
from functools import reduce
from datetime import datetime, timedelta
from ..roles.models import Role
from ..checks import authorised_p
from ..privileges import Privilege
from ...db import sqlite3 as db
from ...authentication.users import User
def __process_age_clause__(age_desc: str) -> tuple[str, int]:
"""Process the age clause and parameter for 'LIST USERS' query."""
_today = datetime.now()
_clause = "created"
_parts = age_desc.split(" ")
_multipliers = {
# Temporary hack before dateutil module can make it to our deployment.
"days": 1,
"months": 30,
"years": 365
}
assert len(_parts) in (3, 4), "Invalid age descriptor!"
_param = int((
_today - timedelta(**{"days": int(_parts[-2]) * _multipliers[_parts[-1]]})
).timestamp())
match _parts[0]:
case "older":
return "created < :created", _param
case "younger":
return "created > :created", _param
case "exactly":
return "created = :created", _param
case _:
raise Exception("Invalid age descriptor.")# pylint: disable=[broad-exception-raised]
def __list_user_clauses_and_params__(**kwargs) -> tuple[str, dict[str, Union[int, str]]]:
"""Process the WHERE clauses, and params for the 'LIST USERS' query."""
clauses = ""
params = {}
if bool(kwargs.get("email", "").strip()) and bool(kwargs.get("name", "").strip()):
clauses = "(email LIKE :email OR name LIKE :name)"
params = {
"email": f'%{kwargs["email"].strip()}%',
"name": f'%{kwargs["name"].strip()}%'
}
elif bool(kwargs.get("email", "").strip()):
clauses = "email LIKE :email"
params["email"] = f'%{kwargs["email"].strip()}%'
elif bool(kwargs.get("name", "").strip()):
clauses = "name LIKE :name"
params["name"] = f'%{kwargs["name"].strip()}%'
else:
clauses = ""
if bool(kwargs.get("verified", "").strip()):
clauses = clauses + (" AND " if len(clauses) > 0 else "") + "verified=:verified"
params["verified"] = "1" if kwargs["verified"].strip() == "yes" else "0"
if bool(kwargs.get("age", "").strip()):
_clause, _param = __process_age_clause__(kwargs["age"].strip())
clauses = clauses + (" AND " if len(clauses) > 0 else "") + _clause
params["created"] = _param
return clauses, params
@authorised_p(
("system:user:list",),
"You do not have the appropriate privileges to list users.",
oauth2_scope="profile user")
def list_users(conn: db.DbConnection, **kwargs) -> tuple[User, ...]:
"""List out all users."""
_query = "SELECT * FROM users"
_clauses, _params = __list_user_clauses_and_params__(**kwargs)
if len(_clauses) > 0:
_query = _query + " WHERE " + _clauses
with db.cursor(conn) as cursor:
cursor.execute(_query, _params)
return tuple(User.from_sqlite3_row(row) for row in cursor.fetchall())
def __build_resource_roles__(rows):
def __build_roles__(roles, row):
role_id = uuid.UUID(row["role_id"])
priv = Privilege(row["privilege_id"], row["privilege_description"])
role = roles.get(role_id, Role(
role_id, row["role_name"], bool(row["user_editable"]), tuple()))
return {
**roles,
role_id: Role(role_id, role.role_name, role.user_editable, role.privileges + (priv,))
}
def __build__(acc, row):
resource_id = uuid.UUID(row["resource_id"])
return {
**acc,
resource_id: __build_roles__(acc.get(resource_id, {}), row)
}
return {
resource_id: tuple(roles.values())
for resource_id, roles in reduce(__build__, rows, {}).items()
}
# @authorised_p(
# ("",),
# ("You do not have the appropriate privileges to view a user's roles on "
# "resources."))
def user_resource_roles(conn: db.DbConnection, user: User) -> dict[uuid.UUID, tuple[Role, ...]]:
"""Fetch all the user's roles on resources."""
with db.cursor(conn) as cursor:
cursor.execute(
"SELECT res.*, rls.*, p.*"
"FROM resources AS res INNER JOIN "
"user_roles AS ur "
"ON res.resource_id=ur.resource_id "
"LEFT JOIN roles AS rls "
"ON ur.role_id=rls.role_id "
"LEFT JOIN role_privileges AS rp "
"ON rls.role_id=rp.role_id "
"LEFT JOIN privileges AS p "
"ON rp.privilege_id=p.privilege_id "
"WHERE ur.user_id = ?",
(str(user.user_id),))
return __build_resource_roles__(
(dict(row) for row in cursor.fetchall()))
|