aboutsummaryrefslogtreecommitdiff
path: root/gn3/auth/authorisation/groups/models.py
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2023-03-03 14:00:21 +0300
committerFrederick Muriuki Muriithi2023-03-03 14:00:21 +0300
commited2ff492bb782a44f0e8b3e6de33095e865f8151 (patch)
treeea140955feb44cca953955df8eb94fca86168eed /gn3/auth/authorisation/groups/models.py
parent94502a92d8ae3277b8dd07eb1117367821241913 (diff)
downloadgenenetwork3-ed2ff492bb782a44f0e8b3e6de33095e865f8151.tar.gz
auth: Enable viewing group role details.
Diffstat (limited to 'gn3/auth/authorisation/groups/models.py')
-rw-r--r--gn3/auth/authorisation/groups/models.py42
1 files changed, 42 insertions, 0 deletions
diff --git a/gn3/auth/authorisation/groups/models.py b/gn3/auth/authorisation/groups/models.py
index 2a6f840..777e2d0 100644
--- a/gn3/auth/authorisation/groups/models.py
+++ b/gn3/auth/authorisation/groups/models.py
@@ -1,6 +1,7 @@
"""Handle the management of resource/user groups."""
import json
from uuid import UUID, uuid4
+from functools import reduce
from typing import Any, Sequence, Iterable, Optional, NamedTuple
from flask import g
@@ -301,3 +302,44 @@ def accept_reject_join_request(
raise AuthorisationError(
"You cannot act on other groups join requests")
raise NotFoundError(f"Could not find request with ID '{request_id}'")
+
+def __organise_privileges__(acc, row):
+ role_id = UUID(row["role_id"])
+ role = acc.get(role_id, False)
+ if role:
+ return {
+ **acc,
+ role_id: Role(role.role_id, role.role_name, role.privileges + (
+ Privilege(row["privilege_id"], row["privilege_description"]),))
+ }
+ return {
+ **acc,
+ role_id: Role(UUID(row["role_id"]), row["role_name"], (
+ Privilege(row["privilege_id"], row["privilege_description"]),))
+ }
+
+# @authorised_p(("group:role:view",),
+# "Insufficient privileges to view role",
+# oauth2_scope="profile group role")
+def group_role_by_id(
+ conn: db.DbConnection, group: Group, group_role_id: UUID) -> GroupRole:
+ """Retrieve GroupRole from id by its `group_role_id`."""
+ ## TODO: do privileges check before running actual query
+ ## the check commented out above doesn't work correctly
+ with db.cursor(conn) as cursor:
+ cursor.execute(
+ "SELECT gr.group_role_id, r.*, p.* "
+ "FROM group_roles AS gr "
+ "INNER JOIN roles AS r ON gr.role_id=r.role_id "
+ "INNER JOIN role_privileges AS rp ON rp.role_id=r.role_id "
+ "INNER JOIN privileges AS p ON p.privilege_id=rp.privilege_id "
+ "WHERE gr.group_role_id=? AND gr.group_id=?",
+ (str(group_role_id), str(group.group_id)))
+ rows = cursor.fetchall()
+ if rows:
+ roles: tuple[Role,...] = tuple(reduce(
+ __organise_privileges__, rows, {}).values())
+ assert len(roles) == 1
+ return GroupRole(group_role_id, group, roles[0])
+ raise NotFoundError(
+ f"Group role with ID '{group_role_id}' does not exist.")