From f6566c76d97cb44d47cc491f13e1342f0c2555cf Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Fri, 15 Sep 2023 08:38:47 +0300 Subject: Update `user_roles`: Return roles for user by resource. --- gn_auth/auth/authorisation/roles/models.py | 68 +++++++++++++++++------------- 1 file changed, 38 insertions(+), 30 deletions(-) (limited to 'gn_auth/auth/authorisation/roles') diff --git a/gn_auth/auth/authorisation/roles/models.py b/gn_auth/auth/authorisation/roles/models.py index 579c9dc..206b05e 100644 --- a/gn_auth/auth/authorisation/roles/models.py +++ b/gn_auth/auth/authorisation/roles/models.py @@ -64,43 +64,51 @@ def create_role( return role -def __organise_privileges__(roles_dict, privilege_row): - """Organise the privileges into their roles.""" - role_id_str = privilege_row["role_id"] - if role_id_str in roles_dict: - return { - **roles_dict, - role_id_str: Role( - UUID(role_id_str), - privilege_row["role_name"], - bool(int(privilege_row["user_editable"])), - roles_dict[role_id_str].privileges + ( - Privilege(privilege_row["privilege_id"], - privilege_row["privilege_description"]),)) - } - +def __organise_privileges__(resources, row) -> dict: + resource_id = UUID(row["resource_id"]) + role_id = UUID(row["role_id"]) + roles = resources.get(resource_id, {}).get("roles", {}) + role = roles.get(role_id, Role( + role_id, + row["role_name"], + bool(int(row["user_editable"])), + tuple())) return { - **roles_dict, - role_id_str: Role( - UUID(role_id_str), - privilege_row["role_name"], - bool(int(privilege_row["user_editable"])), - (Privilege(privilege_row["privilege_id"], - privilege_row["privilege_description"]),)) + **resources, + resource_id: { + "resource_id": resource_id, + "user_id": UUID(row["user_id"]), + "roles": { + **roles, + role_id: Role( + role.role_id, + role.role_name, + role.user_editable, + role.privileges + (Privilege( + row["privilege_id"], + row["privilege_description"]),) + ) + } + } } -def user_roles(conn: db.DbConnection, user: User) -> Sequence[Role]: - """Retrieve non-resource roles assigned to the user.""" +def user_roles(conn: db.DbConnection, user: User) -> Sequence[dict]: + """Retrieve all roles (organised by resource) assigned to the user.""" with db.cursor(conn) as cursor: + cursor.execute("SELECT * FROM user_roles") cursor.execute( - "SELECT r.*, p.* FROM user_roles AS ur INNER JOIN roles AS r " - "ON ur.role_id=r.role_id INNER JOIN role_privileges AS rp " - "ON r.role_id=rp.role_id INNER JOIN privileges AS p " - "ON rp.privilege_id=p.privilege_id WHERE ur.user_id=?", + "SELECT ur.resource_id, ur.user_id, r.*, p.* " + "FROM user_roles AS ur " + "INNER JOIN roles AS r ON ur.role_id=r.role_id " + "INNER JOIN role_privileges AS rp ON r.role_id=rp.role_id " + "INNER JOIN privileges AS p ON rp.privilege_id=p.privilege_id " + "WHERE ur.user_id=?", (str(user.user_id),)) - return tuple( - reduce(__organise_privileges__, cursor.fetchall(), {}).values()) + return tuple({ + **row, "roles": tuple(row["roles"].values()) + } for row in reduce( + __organise_privileges__, cursor.fetchall(), {}).values()) return tuple() def user_role(conn: db.DbConnection, user: User, role_id: UUID) -> Either: -- cgit v1.2.3