about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2025-07-29 09:09:40 -0500
committerFrederick Muriuki Muriithi2025-07-29 09:09:40 -0500
commiteda1ac9e8cb98421e3d12e6a3e51dc8384f19837 (patch)
tree36f4ce5a0c49d3d0707080e9cf9b6375ab7a3237
parent4c78f2145bc62bb3bb83da3be226addc8e36e812 (diff)
downloadgn-auth-eda1ac9e8cb98421e3d12e6a3e51dc8384f19837.tar.gz
Add spec-based authorisation checker function.
-rw-r--r--gn_auth/auth/authorisation/resources/checks.py31
1 files changed, 31 insertions, 0 deletions
diff --git a/gn_auth/auth/authorisation/resources/checks.py b/gn_auth/auth/authorisation/resources/checks.py
index 5484dbf..c751c27 100644
--- a/gn_auth/auth/authorisation/resources/checks.py
+++ b/gn_auth/auth/authorisation/resources/checks.py
@@ -3,6 +3,8 @@ from uuid import UUID
 from functools import reduce
 from typing import Sequence
 
+from gn_libs.privileges import check
+
 from .base import Resource
 
 from ...db import sqlite3 as db
@@ -82,3 +84,32 @@ def authorised_for2(
     str_privileges = tuple(privilege.privilege_id for privilege in _db_privileges)
     return all((requested_privilege in str_privileges)
                for requested_privilege in privileges)
+
+
+def authorised_for_spec(
+        conn: db.DbConnection,
+        user_id: uuid.UUID,
+        resource_id: uuid.UUID,
+        auth_spec: str
+) -> bool:
+    """
+    Check that a user, identified with `user_id`, has a set of privileges that
+    satisfy the `auth_spec` for the resource identified with `resource_id`.
+    """
+    with db.cursor(conn) as cursor:
+        _query = (
+            "SELECT resources.resource_id, user_roles.user_id, roles.role_id, "
+            "privileges.* "
+            "FROM resources INNER JOIN user_roles "
+            "ON resources.resource_id=user_roles.resource_id "
+            "INNER JOIN roles ON user_roles.role_id=roles.role_id "
+            "INNER JOIN role_privileges ON roles.role_id=role_privileges.role_id "
+            "INNER JOIN privileges "
+            "ON role_privileges.privilege_id=privileges.privilege_id "
+            "WHERE resources.resource_id=? "
+            "AND user_roles.user_id=?")
+        cursor.execute(
+            _query,
+            (str(resource.resource_id), str(user.user_id)))
+        _privileges = tuple(row["privilege_id"] for row in cursor.fetchall())
+    return check(auth_spec, _privileges)