diff options
author | Frederick Muriuki Muriithi | 2025-07-29 09:09:40 -0500 |
---|---|---|
committer | Frederick Muriuki Muriithi | 2025-07-29 09:09:40 -0500 |
commit | eda1ac9e8cb98421e3d12e6a3e51dc8384f19837 (patch) | |
tree | 36f4ce5a0c49d3d0707080e9cf9b6375ab7a3237 | |
parent | 4c78f2145bc62bb3bb83da3be226addc8e36e812 (diff) | |
download | gn-auth-eda1ac9e8cb98421e3d12e6a3e51dc8384f19837.tar.gz |
Add spec-based authorisation checker function.
-rw-r--r-- | gn_auth/auth/authorisation/resources/checks.py | 31 |
1 files changed, 31 insertions, 0 deletions
diff --git a/gn_auth/auth/authorisation/resources/checks.py b/gn_auth/auth/authorisation/resources/checks.py index 5484dbf..c751c27 100644 --- a/gn_auth/auth/authorisation/resources/checks.py +++ b/gn_auth/auth/authorisation/resources/checks.py @@ -3,6 +3,8 @@ from uuid import UUID from functools import reduce from typing import Sequence +from gn_libs.privileges import check + from .base import Resource from ...db import sqlite3 as db @@ -82,3 +84,32 @@ def authorised_for2( str_privileges = tuple(privilege.privilege_id for privilege in _db_privileges) return all((requested_privilege in str_privileges) for requested_privilege in privileges) + + +def authorised_for_spec( + conn: db.DbConnection, + user_id: uuid.UUID, + resource_id: uuid.UUID, + auth_spec: str +) -> bool: + """ + Check that a user, identified with `user_id`, has a set of privileges that + satisfy the `auth_spec` for the resource identified with `resource_id`. + """ + with db.cursor(conn) as cursor: + _query = ( + "SELECT resources.resource_id, user_roles.user_id, roles.role_id, " + "privileges.* " + "FROM resources INNER JOIN user_roles " + "ON resources.resource_id=user_roles.resource_id " + "INNER JOIN roles ON user_roles.role_id=roles.role_id " + "INNER JOIN role_privileges ON roles.role_id=role_privileges.role_id " + "INNER JOIN privileges " + "ON role_privileges.privilege_id=privileges.privilege_id " + "WHERE resources.resource_id=? " + "AND user_roles.user_id=?") + cursor.execute( + _query, + (str(resource.resource_id), str(user.user_id))) + _privileges = tuple(row["privilege_id"] for row in cursor.fetchall()) + return check(auth_spec, _privileges) |