about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--gn_auth/auth/authorisation/resources/checks.py37
1 files changed, 37 insertions, 0 deletions
diff --git a/gn_auth/auth/authorisation/resources/checks.py b/gn_auth/auth/authorisation/resources/checks.py
index d8e3a9f..ca45a20 100644
--- a/gn_auth/auth/authorisation/resources/checks.py
+++ b/gn_auth/auth/authorisation/resources/checks.py
@@ -3,9 +3,13 @@ from uuid import UUID
 from functools import reduce
 from typing import Sequence
 
+from .base import Resource
+
 from ...db import sqlite3 as db
 from ...authentication.users import User
 
+from ..privileges.models import db_row_to_privilege
+
 def __organise_privileges_by_resource_id__(rows):
     def __organise__(privs, row):
         resource_id = UUID(row["resource_id"])
@@ -16,6 +20,7 @@ def __organise_privileges_by_resource_id__(rows):
         }
     return reduce(__organise__, rows, {})
 
+
 def authorised_for(conn: db.DbConnection,
                    user: User,
                    privileges: tuple[str, ...],
@@ -45,3 +50,35 @@ def authorised_for(conn: db.DbConnection,
             resource_id: resource_id in authorised
             for resource_id in resource_ids
         }
+
+
+def authorised_for2(
+        conn: db.DbConnection,
+        user: User,
+        resource: Resource,
+        privileges: tuple[str, ...]
+) -> bool:
+    """
+    Check that `user` has **ALL** the specified privileges for the resource.
+    """
+    with db.cursor(conn) as cursor:
+        _query = (
+            "SELECT resources.resource_id, user_roles.user_id, roles.role_id, "
+            "privileges.* "
+            "FROM resources INNER JOIN user_roles "
+            "ON resources.resource_id=user_roles.resource_id "
+            "INNER JOIN roles ON user_roles.role_id=roles.role_id "
+            "INNER JOIN role_privileges ON roles.role_id=role_privileges.role_id "
+            "INNER JOIN privileges "
+            "ON role_privileges.privilege_id=privileges.privilege_id "
+            f"WHERE resources.resource_id=? "
+            "AND user_roles.user_id=?")
+        cursor.execute(
+            _query,
+            (str(resource.resource_id), str(user.user_id)))
+        _db_privileges = tuple(
+            db_row_to_privilege(row) for row in cursor.fetchall())
+
+    str_privileges = tuple(privilege.privilege_id for privilege in _db_privileges)
+    return all((requested_privilege in str_privileges)
+               for requested_privilege in privileges)