about summary refs log tree commit diff
path: root/gn_auth/auth/authorisation/resources/checks.py
diff options
context:
space:
mode:
Diffstat (limited to 'gn_auth/auth/authorisation/resources/checks.py')
-rw-r--r--gn_auth/auth/authorisation/resources/checks.py81
1 files changed, 78 insertions, 3 deletions
diff --git a/gn_auth/auth/authorisation/resources/checks.py b/gn_auth/auth/authorisation/resources/checks.py
index d8e3a9f..ce2b821 100644
--- a/gn_auth/auth/authorisation/resources/checks.py
+++ b/gn_auth/auth/authorisation/resources/checks.py
@@ -1,14 +1,21 @@
 """Handle authorisation checks for resources"""
-from uuid import UUID
+import uuid
+import warnings
 from functools import reduce
 from typing import Sequence
 
+from gn_libs.privileges import check
+
+from .base import Resource
+
 from ...db import sqlite3 as db
 from ...authentication.users import User
 
+from ..privileges.models import db_row_to_privilege
+
 def __organise_privileges_by_resource_id__(rows):
     def __organise__(privs, row):
-        resource_id = UUID(row["resource_id"])
+        resource_id = uuid.UUID(row["resource_id"])
         return {
             **privs,
             resource_id: (row["privilege_id"],) + privs.get(
@@ -16,14 +23,18 @@ def __organise_privileges_by_resource_id__(rows):
         }
     return reduce(__organise__, rows, {})
 
+
 def authorised_for(conn: db.DbConnection,
                    user: User,
                    privileges: tuple[str, ...],
-                   resource_ids: Sequence[UUID]) -> dict[UUID, bool]:
+                   resource_ids: Sequence[uuid.UUID]) -> dict[uuid.UUID, bool]:
     """
     Check whether `user` is authorised to access `resources` according to given
     `privileges`.
     """
+    warnings.warn(DeprecationWarning(
+        f"The function `{__name__}.authorised_for` is deprecated. Please use "
+        f"`{__name__}.authorised_for_spec`"))
     with db.cursor(conn) as cursor:
         cursor.execute(
             ("SELECT ur.*, rp.privilege_id FROM "
@@ -45,3 +56,67 @@ def authorised_for(conn: db.DbConnection,
             resource_id: resource_id in authorised
             for resource_id in resource_ids
         }
+
+
+def authorised_for2(
+        conn: db.DbConnection,
+        user: User,
+        resource: Resource,
+        privileges: tuple[str, ...]
+) -> bool:
+    """
+    Check that `user` has **ALL** the specified privileges for the resource.
+    """
+    warnings.warn(DeprecationWarning(
+        f"The function `{__name__}.authorised_for2` is deprecated. Please use "
+        f"`{__name__}.authorised_for_spec`"))
+    with db.cursor(conn) as cursor:
+        _query = (
+            "SELECT resources.resource_id, user_roles.user_id, roles.role_id, "
+            "privileges.* "
+            "FROM resources INNER JOIN user_roles "
+            "ON resources.resource_id=user_roles.resource_id "
+            "INNER JOIN roles ON user_roles.role_id=roles.role_id "
+            "INNER JOIN role_privileges ON roles.role_id=role_privileges.role_id "
+            "INNER JOIN privileges "
+            "ON role_privileges.privilege_id=privileges.privilege_id "
+            "WHERE resources.resource_id=? "
+            "AND user_roles.user_id=?")
+        cursor.execute(
+            _query,
+            (str(resource.resource_id), str(user.user_id)))
+        _db_privileges = tuple(
+            db_row_to_privilege(row) for row in cursor.fetchall())
+
+    str_privileges = tuple(privilege.privilege_id for privilege in _db_privileges)
+    return all((requested_privilege in str_privileges)
+               for requested_privilege in privileges)
+
+
+def authorised_for_spec(
+        conn: db.DbConnection,
+        user_id: uuid.UUID,
+        resource_id: uuid.UUID,
+        auth_spec: str
+) -> bool:
+    """
+    Check that a user, identified with `user_id`, has a set of privileges that
+    satisfy the `auth_spec` for the resource identified with `resource_id`.
+    """
+    with db.cursor(conn) as cursor:
+        _query = (
+            "SELECT resources.resource_id, user_roles.user_id, roles.role_id, "
+            "privileges.* "
+            "FROM resources INNER JOIN user_roles "
+            "ON resources.resource_id=user_roles.resource_id "
+            "INNER JOIN roles ON user_roles.role_id=roles.role_id "
+            "INNER JOIN role_privileges ON roles.role_id=role_privileges.role_id "
+            "INNER JOIN privileges "
+            "ON role_privileges.privilege_id=privileges.privilege_id "
+            "WHERE resources.resource_id=? "
+            "AND user_roles.user_id=?")
+        cursor.execute(
+            _query,
+            (str(resource_id), str(user_id)))
+        _privileges = tuple(row["privilege_id"] for row in cursor.fetchall())
+    return check(auth_spec, _privileges)