aboutsummaryrefslogtreecommitdiff
path: root/gn_auth/auth/authorisation/roles
diff options
context:
space:
mode:
Diffstat (limited to 'gn_auth/auth/authorisation/roles')
-rw-r--r--gn_auth/auth/authorisation/roles/models.py24
1 files changed, 22 insertions, 2 deletions
diff --git a/gn_auth/auth/authorisation/roles/models.py b/gn_auth/auth/authorisation/roles/models.py
index 94ad2d1..699e3b3 100644
--- a/gn_auth/auth/authorisation/roles/models.py
+++ b/gn_auth/auth/authorisation/roles/models.py
@@ -2,8 +2,7 @@
from uuid import UUID, uuid4
from functools import reduce
from dataclasses import dataclass
-
-from typing import Sequence, Iterable
+from typing import Sequence, Iterable, Optional
from pymonad.either import Left, Right, Either
@@ -219,3 +218,24 @@ def assign_user_role_by_name(
"role_id": role["role_id"],
"resource_id": str(resource_id)
})
+
+
+def role_by_id(conn: db.DbConnection, role_id: UUID) -> Optional[Role]:
+ """Fetch a role from the database by its ID."""
+ with db.cursor(conn) as cursor:
+ cursor.execute(
+ "SELECT r.*, p.* FROM roles AS r INNER JOIN role_privileges AS rp "
+ "ON r.role_id=rp.role_id INNER JOIN privileges AS p "
+ "ON rp.privilege_id=p.privilege_id "
+ "WHERE r.role_id=?",
+ (str(role_id),))
+ results = cursor.fetchall()
+
+ if not bool(results):
+ return None
+
+ _roles = db_rows_to_roles(results)
+ if len(_roles) > 1:
+ raise Exception("Data corruption: Expected a single role.")
+
+ return _roles[0]