aboutsummaryrefslogtreecommitdiff
path: root/gn_auth/auth/authorisation/resources/models.py
diff options
context:
space:
mode:
Diffstat (limited to 'gn_auth/auth/authorisation/resources/models.py')
-rw-r--r--gn_auth/auth/authorisation/resources/models.py47
1 files changed, 47 insertions, 0 deletions
diff --git a/gn_auth/auth/authorisation/resources/models.py b/gn_auth/auth/authorisation/resources/models.py
index 451b165..25ae0fd 100644
--- a/gn_auth/auth/authorisation/resources/models.py
+++ b/gn_auth/auth/authorisation/resources/models.py
@@ -8,6 +8,8 @@ from gn_auth.auth.dictify import dictify
from gn_auth.auth.authentication.users import User
from gn_auth.auth.db.sqlite3 import with_db_connection
+from gn_auth.auth.authorisation.roles import Role
+from gn_auth.auth.authorisation.privileges import Privilege
from gn_auth.auth.authorisation.checks import authorised_p
from gn_auth.auth.authorisation.errors import NotFoundError, AuthorisationError
@@ -371,3 +373,48 @@ def save_resource(
raise AuthorisationError(
"You do not have the appropriate privileges to edit this resource.")
+
+def user_roles_on_resources(conn: db.DbConnection,
+ user: User,
+ resource_ids: tuple[UUID] = tuple()) -> dict:
+ """Get roles on resources for a particular user."""
+ def __setup_roles__(old_roles, row):
+ roles = {role.role_id: role for role in old_roles}
+ role_id = UUID(row["role_id"])
+ role = roles.get(role_id, Role(
+ role_id, row["role_name"], bool(int(row["user_editable"])),
+ tuple()))
+ return tuple({
+ **roles, role_id: Role(
+ role.role_id, role.role_name, role.user_editable,
+ role.privileges + (Privilege(
+ row["privilege_id"], row["privilege_description"]),))
+ }.values())
+
+ def __organise__(acc, row):
+ resid = UUID(row["resource_id"])
+ roles = acc.get(resid, {}).get("roles", tuple())
+ return {
+ **acc,
+ resid: {
+ "roles": __setup_roles__(roles, row)
+ }
+ }
+
+ query = (
+ "SELECT ur.user_id, ur.resource_id, r.*, p.* "
+ "FROM user_roles AS ur "
+ "INNER JOIN roles AS r ON ur.role_id=r.role_id "
+ "INNER JOIN role_privileges AS rp ON r.role_id=rp.role_id "
+ "INNER JOIN privileges AS p ON rp.privilege_id=p.privilege_id "
+ "WHERE ur.user_id=?")
+ params = (str(user.user_id),)
+
+ if len(resource_ids) > 0:
+ pholders = ", ".join(["?"] * len(resource_ids))
+ query = f"{query} AND ur.resource_id IN ({pholders})"
+ params = params + tuple(str(resid) for resid in resource_ids)
+
+ with db.cursor(conn) as cursor:
+ cursor.execute(query, params)
+ return reduce(__organise__, cursor.fetchall(), {})