From 8b7c598407a5fea9a3d78473e72df87606998cd4 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Fri, 4 Aug 2023 10:10:28 +0300 Subject: Copy over files from GN3 repository. --- gn_auth/auth/authorisation/users/models.py | 66 ++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 gn_auth/auth/authorisation/users/models.py (limited to 'gn_auth/auth/authorisation/users/models.py') diff --git a/gn_auth/auth/authorisation/users/models.py b/gn_auth/auth/authorisation/users/models.py new file mode 100644 index 0000000..89c1d22 --- /dev/null +++ b/gn_auth/auth/authorisation/users/models.py @@ -0,0 +1,66 @@ +"""Functions for acting on users.""" +import uuid +from functools import reduce + +from gn3.auth import db +from gn3.auth.authorisation.roles.models import Role +from gn3.auth.authorisation.checks import authorised_p +from gn3.auth.authorisation.privileges import Privilege + +from gn3.auth.authentication.users import User + +@authorised_p( + ("system:user:list",), + "You do not have the appropriate privileges to list users.", + oauth2_scope="profile user") +def list_users(conn: db.DbConnection) -> tuple[User, ...]: + """List out all users.""" + with db.cursor(conn) as cursor: + cursor.execute("SELECT * FROM users") + return tuple( + User(uuid.UUID(row["user_id"]), row["email"], row["name"]) + for row in cursor.fetchall()) + +def __build_resource_roles__(rows): + def __build_roles__(roles, row): + role_id = uuid.UUID(row["role_id"]) + priv = Privilege(row["privilege_id"], row["privilege_description"]) + role = roles.get(role_id, Role( + role_id, row["role_name"], bool(row["user_editable"]), tuple())) + return { + **roles, + role_id: Role(role_id, role.role_name, role.user_editable, role.privileges + (priv,)) + } + def __build__(acc, row): + resource_id = uuid.UUID(row["resource_id"]) + return { + **acc, + resource_id: __build_roles__(acc.get(resource_id, {}), row) + } + return { + resource_id: tuple(roles.values()) + for resource_id, roles in reduce(__build__, rows, {}).items() + } + +# @authorised_p( +# ("",), +# ("You do not have the appropriate privileges to view a user's roles on " +# "resources.")) +def user_resource_roles(conn: db.DbConnection, user: User) -> dict[uuid.UUID, tuple[Role, ...]]: + """Fetch all the user's roles on resources.""" + with db.cursor(conn) as cursor: + cursor.execute( + "SELECT res.*, rls.*, p.*" + "FROM resources AS res INNER JOIN " + "group_user_roles_on_resources AS guror " + "ON res.resource_id=guror.resource_id " + "LEFT JOIN roles AS rls " + "ON guror.role_id=rls.role_id " + "LEFT JOIN role_privileges AS rp " + "ON rls.role_id=rp.role_id " + "LEFT JOIN privileges AS p " + "ON rp.privilege_id=p.privilege_id " + "WHERE guror.user_id = ?", + (str(user.user_id),)) + return __build_resource_roles__( + (dict(row) for row in cursor.fetchall())) -- cgit v1.2.3