diff options
Diffstat (limited to 'gn_auth/auth/authorisation/roles')
-rw-r--r-- | gn_auth/auth/authorisation/roles/models.py | 24 |
1 files changed, 22 insertions, 2 deletions
diff --git a/gn_auth/auth/authorisation/roles/models.py b/gn_auth/auth/authorisation/roles/models.py index 94ad2d1..699e3b3 100644 --- a/gn_auth/auth/authorisation/roles/models.py +++ b/gn_auth/auth/authorisation/roles/models.py @@ -2,8 +2,7 @@ from uuid import UUID, uuid4 from functools import reduce from dataclasses import dataclass - -from typing import Sequence, Iterable +from typing import Sequence, Iterable, Optional from pymonad.either import Left, Right, Either @@ -219,3 +218,24 @@ def assign_user_role_by_name( "role_id": role["role_id"], "resource_id": str(resource_id) }) + + +def role_by_id(conn: db.DbConnection, role_id: UUID) -> Optional[Role]: + """Fetch a role from the database by its ID.""" + with db.cursor(conn) as cursor: + cursor.execute( + "SELECT r.*, p.* FROM roles AS r INNER JOIN role_privileges AS rp " + "ON r.role_id=rp.role_id INNER JOIN privileges AS p " + "ON rp.privilege_id=p.privilege_id " + "WHERE r.role_id=?", + (str(role_id),)) + results = cursor.fetchall() + + if not bool(results): + return None + + _roles = db_rows_to_roles(results) + if len(_roles) > 1: + raise Exception("Data corruption: Expected a single role.") + + return _roles[0] |