1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
"""Handle authorisation checks for resources"""
import uuid
import warnings
from functools import reduce
from typing import Sequence
from gn_libs.privileges import check
from .base import Resource
from ...db import sqlite3 as db
from ...authentication.users import User
from ..privileges.models import db_row_to_privilege
def __organise_privileges_by_resource_id__(rows):
def __organise__(privs, row):
resource_id = uuid.UUID(row["resource_id"])
return {
**privs,
resource_id: (row["privilege_id"],) + privs.get(
resource_id, tuple())
}
return reduce(__organise__, rows, {})
def authorised_for(conn: db.DbConnection,
user: User,
privileges: tuple[str, ...],
resource_ids: Sequence[uuid.UUID]) -> dict[uuid.UUID, bool]:
"""
Check whether `user` is authorised to access `resources` according to given
`privileges`.
"""
warnings.warn(DeprecationWarning(
f"The function `{__name__}.authorised_for` is deprecated. Please use "
f"`{__name__}.authorised_for_spec`"))
with db.cursor(conn) as cursor:
cursor.execute(
("SELECT ur.*, rp.privilege_id FROM "
"user_roles AS ur "
"INNER JOIN roles AS r ON ur.role_id=r.role_id "
"INNER JOIN role_privileges AS rp ON r.role_id=rp.role_id "
"WHERE ur.user_id=? "
f"AND ur.resource_id IN ({', '.join(['?']*len(resource_ids))})"
f"AND rp.privilege_id IN ({', '.join(['?']*len(privileges))})"),
((str(user.user_id),) + tuple(
str(r_id) for r_id in resource_ids) + tuple(privileges)))
resource_privileges = __organise_privileges_by_resource_id__(
cursor.fetchall())
authorised = tuple(resource_id for resource_id, res_privileges
in resource_privileges.items()
if all(priv in res_privileges
for priv in privileges))
return {
resource_id: resource_id in authorised
for resource_id in resource_ids
}
def authorised_for2(
conn: db.DbConnection,
user: User,
resource: Resource,
privileges: tuple[str, ...]
) -> bool:
"""
Check that `user` has **ALL** the specified privileges for the resource.
"""
warnings.warn(DeprecationWarning(
f"The function `{__name__}.authorised_for2` is deprecated. Please use "
f"`{__name__}.authorised_for_spec`"))
with db.cursor(conn) as cursor:
_query = (
"SELECT resources.resource_id, user_roles.user_id, roles.role_id, "
"privileges.* "
"FROM resources INNER JOIN user_roles "
"ON resources.resource_id=user_roles.resource_id "
"INNER JOIN roles ON user_roles.role_id=roles.role_id "
"INNER JOIN role_privileges ON roles.role_id=role_privileges.role_id "
"INNER JOIN privileges "
"ON role_privileges.privilege_id=privileges.privilege_id "
"WHERE resources.resource_id=? "
"AND user_roles.user_id=?")
cursor.execute(
_query,
(str(resource.resource_id), str(user.user_id)))
_db_privileges = tuple(
db_row_to_privilege(row) for row in cursor.fetchall())
str_privileges = tuple(privilege.privilege_id for privilege in _db_privileges)
return all((requested_privilege in str_privileges)
for requested_privilege in privileges)
def authorised_for_spec(
conn: db.DbConnection,
user_id: uuid.UUID,
resource_id: uuid.UUID,
auth_spec: str
) -> bool:
"""
Check that a user, identified with `user_id`, has a set of privileges that
satisfy the `auth_spec` for the resource identified with `resource_id`.
"""
with db.cursor(conn) as cursor:
_query = (
"SELECT resources.resource_id, user_roles.user_id, roles.role_id, "
"privileges.* "
"FROM resources INNER JOIN user_roles "
"ON resources.resource_id=user_roles.resource_id "
"INNER JOIN roles ON user_roles.role_id=roles.role_id "
"INNER JOIN role_privileges ON roles.role_id=role_privileges.role_id "
"INNER JOIN privileges "
"ON role_privileges.privilege_id=privileges.privilege_id "
"WHERE resources.resource_id=? "
"AND user_roles.user_id=?")
cursor.execute(
_query,
(str(resource_id), str(user_id)))
_privileges = tuple(row["privilege_id"] for row in cursor.fetchall())
return check(auth_spec, _privileges)
|