1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
"""Utilities common to more than one resource."""
import uuid
from gn_auth.auth.db import sqlite3 as db
def assign_resource_owner_role(
cursor: db.DbCursor,
resource_id: uuid.UUID,
user_id: uuid.UUID
) -> dict:
"""Assign `user` the 'Resource Owner' role for `resource`."""
cursor.execute("SELECT * FROM roles WHERE role_name='resource-owner'")
role = cursor.fetchone()
params = {
"user_id": str(user_id),
"role_id": role["role_id"],
"resource_id": str(resource_id)
}
cursor.execute(
"INSERT INTO user_roles "
"VALUES (:user_id, :role_id, :resource_id) "
"ON CONFLICT (user_id, role_id, resource_id) DO NOTHING",
params)
return params
def grant_access_to_sysadmins(
cursor: db.DbCursor,
resource_id: uuid.UUID,
system_resource_id: uuid.UUID
):
"""Grant sysadmins access to resource identified by `resource_id`."""
cursor.execute(
"SELECT role_id FROM roles WHERE role_name='system-administrator'")
sysadminroleid = cursor.fetchone()[0]
cursor.execute(# Fetch sysadmin IDs.
"SELECT user_roles.user_id FROM roles INNER JOIN user_roles "
"ON roles.role_id=user_roles.role_id "
"WHERE role_name='system-administrator' AND resource_id=?",
(str(system_resource_id),))
cursor.executemany(
"INSERT INTO user_roles(user_id, role_id, resource_id) "
"VALUES (?, ?, ?) "
"ON CONFLICT (user_id, role_id, resource_id) DO NOTHING",
tuple((row["user_id"], sysadminroleid, str(resource_id))
for row in cursor.fetchall()))
|