diff options
author | Frederick Muriuki Muriithi | 2025-06-09 12:47:33 -0500 |
---|---|---|
committer | Frederick Muriuki Muriithi | 2025-06-09 12:47:33 -0500 |
commit | cd36b02e3cbaadfa086b373798fb23fb8bebe8b1 (patch) | |
tree | f62d8fb2165747fe5c62210b76bb8a24568a465e | |
parent | 22b285becb3d46fe7272870558fbde56396188e2 (diff) | |
download | gn-auth-cd36b02e3cbaadfa086b373798fb23fb8bebe8b1.tar.gz |
Check for a user's privileges on a particular resource.
Add a general function to check that a particular user has certain
privileges on a particular resource.
-rw-r--r-- | gn_auth/auth/authorisation/resources/checks.py | 37 |
1 files changed, 37 insertions, 0 deletions
diff --git a/gn_auth/auth/authorisation/resources/checks.py b/gn_auth/auth/authorisation/resources/checks.py index d8e3a9f..ca45a20 100644 --- a/gn_auth/auth/authorisation/resources/checks.py +++ b/gn_auth/auth/authorisation/resources/checks.py @@ -3,9 +3,13 @@ from uuid import UUID from functools import reduce from typing import Sequence +from .base import Resource + from ...db import sqlite3 as db from ...authentication.users import User +from ..privileges.models import db_row_to_privilege + def __organise_privileges_by_resource_id__(rows): def __organise__(privs, row): resource_id = UUID(row["resource_id"]) @@ -16,6 +20,7 @@ def __organise_privileges_by_resource_id__(rows): } return reduce(__organise__, rows, {}) + def authorised_for(conn: db.DbConnection, user: User, privileges: tuple[str, ...], @@ -45,3 +50,35 @@ def authorised_for(conn: db.DbConnection, resource_id: resource_id in authorised for resource_id in resource_ids } + + +def authorised_for2( + conn: db.DbConnection, + user: User, + resource: Resource, + privileges: tuple[str, ...] +) -> bool: + """ + Check that `user` has **ALL** the specified privileges for the resource. + """ + with db.cursor(conn) as cursor: + _query = ( + "SELECT resources.resource_id, user_roles.user_id, roles.role_id, " + "privileges.* " + "FROM resources INNER JOIN user_roles " + "ON resources.resource_id=user_roles.resource_id " + "INNER JOIN roles ON user_roles.role_id=roles.role_id " + "INNER JOIN role_privileges ON roles.role_id=role_privileges.role_id " + "INNER JOIN privileges " + "ON role_privileges.privilege_id=privileges.privilege_id " + f"WHERE resources.resource_id=? " + "AND user_roles.user_id=?") + cursor.execute( + _query, + (str(resource.resource_id), str(user.user_id))) + _db_privileges = tuple( + db_row_to_privilege(row) for row in cursor.fetchall()) + + str_privileges = tuple(privilege.privilege_id for privilege in _db_privileges) + return all((requested_privilege in str_privileges) + for requested_privilege in privileges) |