From 8b7c598407a5fea9a3d78473e72df87606998cd4 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Fri, 4 Aug 2023 10:10:28 +0300 Subject: Copy over files from GN3 repository. --- gn_auth/auth/authorisation/resources/checks.py | 47 ++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 gn_auth/auth/authorisation/resources/checks.py (limited to 'gn_auth/auth/authorisation/resources/checks.py') diff --git a/gn_auth/auth/authorisation/resources/checks.py b/gn_auth/auth/authorisation/resources/checks.py new file mode 100644 index 0000000..fafde76 --- /dev/null +++ b/gn_auth/auth/authorisation/resources/checks.py @@ -0,0 +1,47 @@ +"""Handle authorisation checks for resources""" +from uuid import UUID +from functools import reduce +from typing import Sequence + +from gn3.auth import db +from gn3.auth.authentication.users import User + +def __organise_privileges_by_resource_id__(rows): + def __organise__(privs, row): + resource_id = UUID(row["resource_id"]) + return { + **privs, + resource_id: (row["privilege_id"],) + privs.get( + resource_id, tuple()) + } + return reduce(__organise__, rows, {}) + +def authorised_for(conn: db.DbConnection, user: User, privileges: tuple[str], + resource_ids: Sequence[UUID]) -> dict[UUID, bool]: + """ + Check whether `user` is authorised to access `resources` according to given + `privileges`. + """ + with db.cursor(conn) as cursor: + cursor.execute( + ("SELECT guror.*, rp.privilege_id FROM " + "group_user_roles_on_resources AS guror " + "INNER JOIN group_roles AS gr ON " + "(guror.group_id=gr.group_id AND guror.role_id=gr.role_id) " + "INNER JOIN roles AS r ON gr.role_id=r.role_id " + "INNER JOIN role_privileges AS rp ON r.role_id=rp.role_id " + "WHERE guror.user_id=? " + f"AND guror.resource_id IN ({', '.join(['?']*len(resource_ids))})" + f"AND rp.privilege_id IN ({', '.join(['?']*len(privileges))})"), + ((str(user.user_id),) + tuple( + str(r_id) for r_id in resource_ids) + tuple(privileges))) + resource_privileges = __organise_privileges_by_resource_id__( + cursor.fetchall()) + authorised = tuple(resource_id for resource_id, res_privileges + in resource_privileges.items() + if all(priv in res_privileges + for priv in privileges)) + return { + resource_id: resource_id in authorised + for resource_id in resource_ids + } -- cgit v1.2.3