aboutsummaryrefslogtreecommitdiff
path: root/gn_auth/auth/authorisation/roles/models.py
diff options
context:
space:
mode:
Diffstat (limited to 'gn_auth/auth/authorisation/roles/models.py')
-rw-r--r--gn_auth/auth/authorisation/roles/models.py68
1 files changed, 38 insertions, 30 deletions
diff --git a/gn_auth/auth/authorisation/roles/models.py b/gn_auth/auth/authorisation/roles/models.py
index 579c9dc..206b05e 100644
--- a/gn_auth/auth/authorisation/roles/models.py
+++ b/gn_auth/auth/authorisation/roles/models.py
@@ -64,43 +64,51 @@ def create_role(
return role
-def __organise_privileges__(roles_dict, privilege_row):
- """Organise the privileges into their roles."""
- role_id_str = privilege_row["role_id"]
- if role_id_str in roles_dict:
- return {
- **roles_dict,
- role_id_str: Role(
- UUID(role_id_str),
- privilege_row["role_name"],
- bool(int(privilege_row["user_editable"])),
- roles_dict[role_id_str].privileges + (
- Privilege(privilege_row["privilege_id"],
- privilege_row["privilege_description"]),))
- }
-
+def __organise_privileges__(resources, row) -> dict:
+ resource_id = UUID(row["resource_id"])
+ role_id = UUID(row["role_id"])
+ roles = resources.get(resource_id, {}).get("roles", {})
+ role = roles.get(role_id, Role(
+ role_id,
+ row["role_name"],
+ bool(int(row["user_editable"])),
+ tuple()))
return {
- **roles_dict,
- role_id_str: Role(
- UUID(role_id_str),
- privilege_row["role_name"],
- bool(int(privilege_row["user_editable"])),
- (Privilege(privilege_row["privilege_id"],
- privilege_row["privilege_description"]),))
+ **resources,
+ resource_id: {
+ "resource_id": resource_id,
+ "user_id": UUID(row["user_id"]),
+ "roles": {
+ **roles,
+ role_id: Role(
+ role.role_id,
+ role.role_name,
+ role.user_editable,
+ role.privileges + (Privilege(
+ row["privilege_id"],
+ row["privilege_description"]),)
+ )
+ }
+ }
}
-def user_roles(conn: db.DbConnection, user: User) -> Sequence[Role]:
- """Retrieve non-resource roles assigned to the user."""
+def user_roles(conn: db.DbConnection, user: User) -> Sequence[dict]:
+ """Retrieve all roles (organised by resource) assigned to the user."""
with db.cursor(conn) as cursor:
+ cursor.execute("SELECT * FROM user_roles")
cursor.execute(
- "SELECT r.*, p.* FROM user_roles AS ur INNER JOIN roles AS r "
- "ON ur.role_id=r.role_id INNER JOIN role_privileges AS rp "
- "ON r.role_id=rp.role_id INNER JOIN privileges AS p "
- "ON rp.privilege_id=p.privilege_id WHERE ur.user_id=?",
+ "SELECT ur.resource_id, ur.user_id, r.*, p.* "
+ "FROM user_roles AS ur "
+ "INNER JOIN roles AS r ON ur.role_id=r.role_id "
+ "INNER JOIN role_privileges AS rp ON r.role_id=rp.role_id "
+ "INNER JOIN privileges AS p ON rp.privilege_id=p.privilege_id "
+ "WHERE ur.user_id=?",
(str(user.user_id),))
- return tuple(
- reduce(__organise_privileges__, cursor.fetchall(), {}).values())
+ return tuple({
+ **row, "roles": tuple(row["roles"].values())
+ } for row in reduce(
+ __organise_privileges__, cursor.fetchall(), {}).values())
return tuple()
def user_role(conn: db.DbConnection, user: User, role_id: UUID) -> Either: