diff options
Diffstat (limited to 'gn3/auth/authorisation/roles.py')
-rw-r--r-- | gn3/auth/authorisation/roles.py | 47 |
1 files changed, 47 insertions, 0 deletions
diff --git a/gn3/auth/authorisation/roles.py b/gn3/auth/authorisation/roles.py index 397ad80..e71d427 100644 --- a/gn3/auth/authorisation/roles.py +++ b/gn3/auth/authorisation/roles.py @@ -1,8 +1,10 @@ """Handle management of roles""" from uuid import UUID, uuid4 +from functools import reduce from typing import Iterable, NamedTuple from gn3.auth import db +from gn3.auth.authentication.users import User from gn3.auth.authentication.checks import authenticated_p from .checks import authorised_p @@ -42,3 +44,48 @@ def create_role( for priv in privileges)) return role + +def __organise_privileges__(roles_dict, privilege_row): + """Organise the privileges into their roles.""" + role_id_str = privilege_row["role_id"] + if role_id_str in roles_dict: + return { + **roles_dict, + role_id_str: Role( + UUID(role_id_str), + privilege_row["role_name"], + roles_dict[role_id_str].privileges + ( + Privilege(UUID(privilege_row["privilege_id"]), + privilege_row["privilege_name"]),)) + } + + return { + **roles_dict, + role_id_str: Role( + UUID(role_id_str), + privilege_row["role_name"], + (Privilege(UUID(privilege_row["privilege_id"]), + privilege_row["privilege_name"]),)) + } + +def user_roles(conn: db.DbConnection, user: User): + """Retrieve ALL roles assigned to the user.""" + with db.cursor(conn) as cursor: + cursor.execute( + "SELECT r.*, p.* FROM user_roles AS ur INNER JOIN roles AS r " + "ON ur.role_id=r.role_id INNER JOIN role_privileges AS rp " + "ON r.role_id=rp.role_id INNER JOIN privileges AS p " + "ON rp.privilege_id=p.privilege_id WHERE ur.user_id=? " + "UNION " + "SELECT r.*, p.* FROM group_user_roles_on_resources AS guror " + "INNER JOIN roles AS r ON guror.role_id=r.role_id " + "INNER JOIN role_privileges AS rp ON r.role_id=rp.role_id " + "INNER JOIN privileges AS p ON rp.privilege_id=p.privilege_id " + "WHERE guror.user_id=?", + ((str(user.user_id),)*2)) + + results = cursor.fetchall() + if results: + return tuple( + reduce(__organise_privileges__, results, {}).values()) + return tuple() |