aboutsummaryrefslogtreecommitdiff
path: root/gn_auth/auth/authorisation/resources/checks.py
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2023-08-04 10:10:28 +0300
committerFrederick Muriuki Muriithi2023-08-04 10:20:09 +0300
commit8b7c598407a5fea9a3d78473e72df87606998cd4 (patch)
tree8526433a17eca6b511feb082a0574f9b15cb9469 /gn_auth/auth/authorisation/resources/checks.py
parentf7fcbbcc014686ac597b783a8dcb38b43024b9d6 (diff)
downloadgn-auth-8b7c598407a5fea9a3d78473e72df87606998cd4.tar.gz
Copy over files from GN3 repository.
Diffstat (limited to 'gn_auth/auth/authorisation/resources/checks.py')
-rw-r--r--gn_auth/auth/authorisation/resources/checks.py47
1 files changed, 47 insertions, 0 deletions
diff --git a/gn_auth/auth/authorisation/resources/checks.py b/gn_auth/auth/authorisation/resources/checks.py
new file mode 100644
index 0000000..fafde76
--- /dev/null
+++ b/gn_auth/auth/authorisation/resources/checks.py
@@ -0,0 +1,47 @@
+"""Handle authorisation checks for resources"""
+from uuid import UUID
+from functools import reduce
+from typing import Sequence
+
+from gn3.auth import db
+from gn3.auth.authentication.users import User
+
+def __organise_privileges_by_resource_id__(rows):
+ def __organise__(privs, row):
+ resource_id = UUID(row["resource_id"])
+ return {
+ **privs,
+ resource_id: (row["privilege_id"],) + privs.get(
+ resource_id, tuple())
+ }
+ return reduce(__organise__, rows, {})
+
+def authorised_for(conn: db.DbConnection, user: User, privileges: tuple[str],
+ resource_ids: Sequence[UUID]) -> dict[UUID, bool]:
+ """
+ Check whether `user` is authorised to access `resources` according to given
+ `privileges`.
+ """
+ with db.cursor(conn) as cursor:
+ cursor.execute(
+ ("SELECT guror.*, rp.privilege_id FROM "
+ "group_user_roles_on_resources AS guror "
+ "INNER JOIN group_roles AS gr ON "
+ "(guror.group_id=gr.group_id AND guror.role_id=gr.role_id) "
+ "INNER JOIN roles AS r ON gr.role_id=r.role_id "
+ "INNER JOIN role_privileges AS rp ON r.role_id=rp.role_id "
+ "WHERE guror.user_id=? "
+ f"AND guror.resource_id IN ({', '.join(['?']*len(resource_ids))})"
+ f"AND rp.privilege_id IN ({', '.join(['?']*len(privileges))})"),
+ ((str(user.user_id),) + tuple(
+ str(r_id) for r_id in resource_ids) + tuple(privileges)))
+ resource_privileges = __organise_privileges_by_resource_id__(
+ cursor.fetchall())
+ authorised = tuple(resource_id for resource_id, res_privileges
+ in resource_privileges.items()
+ if all(priv in res_privileges
+ for priv in privileges))
+ return {
+ resource_id: resource_id in authorised
+ for resource_id in resource_ids
+ }