aboutsummaryrefslogtreecommitdiff
path: root/gn_auth/auth/authorisation/roles
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2023-09-15 08:38:47 +0300
committerFrederick Muriuki Muriithi2023-09-26 03:44:31 +0300
commitf6566c76d97cb44d47cc491f13e1342f0c2555cf (patch)
treed46f8d05cfd78ddda1f0549e65dc07ae233d2f7e /gn_auth/auth/authorisation/roles
parente19b01571ce61e01f482a1dadeeb2fd835fda939 (diff)
downloadgn-auth-f6566c76d97cb44d47cc491f13e1342f0c2555cf.tar.gz
Update `user_roles`: Return roles for user by resource.
Diffstat (limited to 'gn_auth/auth/authorisation/roles')
-rw-r--r--gn_auth/auth/authorisation/roles/models.py68
1 files changed, 38 insertions, 30 deletions
diff --git a/gn_auth/auth/authorisation/roles/models.py b/gn_auth/auth/authorisation/roles/models.py
index 579c9dc..206b05e 100644
--- a/gn_auth/auth/authorisation/roles/models.py
+++ b/gn_auth/auth/authorisation/roles/models.py
@@ -64,43 +64,51 @@ def create_role(
return role
-def __organise_privileges__(roles_dict, privilege_row):
- """Organise the privileges into their roles."""
- role_id_str = privilege_row["role_id"]
- if role_id_str in roles_dict:
- return {
- **roles_dict,
- role_id_str: Role(
- UUID(role_id_str),
- privilege_row["role_name"],
- bool(int(privilege_row["user_editable"])),
- roles_dict[role_id_str].privileges + (
- Privilege(privilege_row["privilege_id"],
- privilege_row["privilege_description"]),))
- }
-
+def __organise_privileges__(resources, row) -> dict:
+ resource_id = UUID(row["resource_id"])
+ role_id = UUID(row["role_id"])
+ roles = resources.get(resource_id, {}).get("roles", {})
+ role = roles.get(role_id, Role(
+ role_id,
+ row["role_name"],
+ bool(int(row["user_editable"])),
+ tuple()))
return {
- **roles_dict,
- role_id_str: Role(
- UUID(role_id_str),
- privilege_row["role_name"],
- bool(int(privilege_row["user_editable"])),
- (Privilege(privilege_row["privilege_id"],
- privilege_row["privilege_description"]),))
+ **resources,
+ resource_id: {
+ "resource_id": resource_id,
+ "user_id": UUID(row["user_id"]),
+ "roles": {
+ **roles,
+ role_id: Role(
+ role.role_id,
+ role.role_name,
+ role.user_editable,
+ role.privileges + (Privilege(
+ row["privilege_id"],
+ row["privilege_description"]),)
+ )
+ }
+ }
}
-def user_roles(conn: db.DbConnection, user: User) -> Sequence[Role]:
- """Retrieve non-resource roles assigned to the user."""
+def user_roles(conn: db.DbConnection, user: User) -> Sequence[dict]:
+ """Retrieve all roles (organised by resource) assigned to the user."""
with db.cursor(conn) as cursor:
+ cursor.execute("SELECT * FROM user_roles")
cursor.execute(
- "SELECT r.*, p.* FROM user_roles AS ur INNER JOIN roles AS r "
- "ON ur.role_id=r.role_id INNER JOIN role_privileges AS rp "
- "ON r.role_id=rp.role_id INNER JOIN privileges AS p "
- "ON rp.privilege_id=p.privilege_id WHERE ur.user_id=?",
+ "SELECT ur.resource_id, ur.user_id, r.*, p.* "
+ "FROM user_roles AS ur "
+ "INNER JOIN roles AS r ON ur.role_id=r.role_id "
+ "INNER JOIN role_privileges AS rp ON r.role_id=rp.role_id "
+ "INNER JOIN privileges AS p ON rp.privilege_id=p.privilege_id "
+ "WHERE ur.user_id=?",
(str(user.user_id),))
- return tuple(
- reduce(__organise_privileges__, cursor.fetchall(), {}).values())
+ return tuple({
+ **row, "roles": tuple(row["roles"].values())
+ } for row in reduce(
+ __organise_privileges__, cursor.fetchall(), {}).values())
return tuple()
def user_role(conn: db.DbConnection, user: User, role_id: UUID) -> Either: