aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2025-06-09 12:47:33 -0500
committerFrederick Muriuki Muriithi2025-06-09 12:47:33 -0500
commitcd36b02e3cbaadfa086b373798fb23fb8bebe8b1 (patch)
treef62d8fb2165747fe5c62210b76bb8a24568a465e
parent22b285becb3d46fe7272870558fbde56396188e2 (diff)
downloadgn-auth-cd36b02e3cbaadfa086b373798fb23fb8bebe8b1.tar.gz
Check for a user's privileges on a particular resource.
Add a general function to check that a particular user has certain privileges on a particular resource.
-rw-r--r--gn_auth/auth/authorisation/resources/checks.py37
1 files changed, 37 insertions, 0 deletions
diff --git a/gn_auth/auth/authorisation/resources/checks.py b/gn_auth/auth/authorisation/resources/checks.py
index d8e3a9f..ca45a20 100644
--- a/gn_auth/auth/authorisation/resources/checks.py
+++ b/gn_auth/auth/authorisation/resources/checks.py
@@ -3,9 +3,13 @@ from uuid import UUID
from functools import reduce
from typing import Sequence
+from .base import Resource
+
from ...db import sqlite3 as db
from ...authentication.users import User
+from ..privileges.models import db_row_to_privilege
+
def __organise_privileges_by_resource_id__(rows):
def __organise__(privs, row):
resource_id = UUID(row["resource_id"])
@@ -16,6 +20,7 @@ def __organise_privileges_by_resource_id__(rows):
}
return reduce(__organise__, rows, {})
+
def authorised_for(conn: db.DbConnection,
user: User,
privileges: tuple[str, ...],
@@ -45,3 +50,35 @@ def authorised_for(conn: db.DbConnection,
resource_id: resource_id in authorised
for resource_id in resource_ids
}
+
+
+def authorised_for2(
+ conn: db.DbConnection,
+ user: User,
+ resource: Resource,
+ privileges: tuple[str, ...]
+) -> bool:
+ """
+ Check that `user` has **ALL** the specified privileges for the resource.
+ """
+ with db.cursor(conn) as cursor:
+ _query = (
+ "SELECT resources.resource_id, user_roles.user_id, roles.role_id, "
+ "privileges.* "
+ "FROM resources INNER JOIN user_roles "
+ "ON resources.resource_id=user_roles.resource_id "
+ "INNER JOIN roles ON user_roles.role_id=roles.role_id "
+ "INNER JOIN role_privileges ON roles.role_id=role_privileges.role_id "
+ "INNER JOIN privileges "
+ "ON role_privileges.privilege_id=privileges.privilege_id "
+ f"WHERE resources.resource_id=? "
+ "AND user_roles.user_id=?")
+ cursor.execute(
+ _query,
+ (str(resource.resource_id), str(user.user_id)))
+ _db_privileges = tuple(
+ db_row_to_privilege(row) for row in cursor.fetchall())
+
+ str_privileges = tuple(privilege.privilege_id for privilege in _db_privileges)
+ return all((requested_privilege in str_privileges)
+ for requested_privilege in privileges)