about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2024-06-07 11:48:29 -0500
committerFrederick Muriuki Muriithi2024-06-07 11:53:26 -0500
commitc3b179f46e75e92de942a2d45d49c74cb887efc8 (patch)
tree03e2764e9fee497ca8c0b8673d3d4382aad87b60
parent50e458b8951f036c487d7854ebe438e4dfbd6c4f (diff)
downloadgn-auth-c3b179f46e75e92de942a2d45d49c74cb887efc8.tar.gz
Replace `…/group/roles` endpoint with `…/resource/…/roles` endpoint.
The `…/group/roles` endpoint relied on the now deleted `group_roles`
table that caused the implementation to be prone to privilege
escalation attacks.

This commit provides the `…/resource/…/roles` endpoint that provides
the required functionality without the exposure.
-rw-r--r--gn_auth/auth/authorisation/resources/groups/views.py29
-rw-r--r--gn_auth/auth/authorisation/resources/views.py41
2 files changed, 41 insertions, 29 deletions
diff --git a/gn_auth/auth/authorisation/resources/groups/views.py b/gn_auth/auth/authorisation/resources/groups/views.py
index fb1a831..ef6bb0d 100644
--- a/gn_auth/auth/authorisation/resources/groups/views.py
+++ b/gn_auth/auth/authorisation/resources/groups/views.py
@@ -282,35 +282,6 @@ def link_data() -> Response:
 
         return jsonify(with_db_connection(__link__))
 
-@groups.route("/roles", methods=["GET"])
-@require_oauth("profile group")
-def group_roles():
-    """Return a list of all available group roles."""
-    with require_oauth.acquire("profile group role") as the_token:
-        def __list_roles__(conn: db.DbConnection):
-            ## TODO: Check that user has appropriate privileges
-            with db.cursor(conn) as cursor:
-                group = user_group(conn, the_token.user).maybe(# type: ignore[misc]
-                    DUMMY_GROUP, lambda grp: grp)
-                if group == DUMMY_GROUP:
-                    return tuple()
-                cursor.execute(
-                    "SELECT gr.group_role_id, r.* "
-                    "FROM group_roles AS gr INNER JOIN roles AS r "
-                    "ON gr.role_id=r.role_id "
-                    "WHERE group_id=?",
-                    (str(group.group_id),))
-                return tuple(
-                    GroupRole(uuid.UUID(row["group_role_id"]),
-                              group,
-                              Role(uuid.UUID(row["role_id"]),
-                                   row["role_name"],
-                                   bool(int(row["user_editable"])),
-                                   tuple()))
-                    for row in cursor.fetchall())
-        return jsonify(tuple(
-            asdict(role) for role in with_db_connection(__list_roles__)))
-
 @groups.route("/privileges", methods=["GET"])
 @require_oauth("profile group")
 def group_privileges():
diff --git a/gn_auth/auth/authorisation/resources/views.py b/gn_auth/auth/authorisation/resources/views.py
index c481ef9..3a61704 100644
--- a/gn_auth/auth/authorisation/resources/views.py
+++ b/gn_auth/auth/authorisation/resources/views.py
@@ -17,6 +17,7 @@ from gn_auth.auth.db import sqlite3 as db
 from gn_auth.auth.db.sqlite3 import with_db_connection
 
 from gn_auth.auth.authorisation.roles import Role
+from gn_auth.auth.authorisation.privileges import Privilege
 from gn_auth.auth.errors import InvalidData, InconsistencyError, AuthorisationError
 
 from gn_auth.auth.authentication.oauth2.resource_server import require_oauth
@@ -339,6 +340,46 @@ def toggle_public(resource_id: uuid.UUID) -> Response:
                 "Made resource public" if resource.public
                 else "Made resource private")})
 
+
+@resources.route("<uuid:resource_id>/roles", methods=["GET"])
+@require_oauth("profile group resource role")
+def resource_roles(resource_id: uuid.UUID) -> Response:
+    """Return the roles the user has to act on a given resource."""
+    with require_oauth.acquire("profile group resource role") as _token:
+        def __resultset_to_roles__(roles, row):
+            _role = roles.get(row["role_id"])
+            return {
+                **roles,
+                row["role_id"]: Role(
+                    role_id=uuid.UUID(row["role_id"]),
+                    role_name=row["role_name"],
+                    user_editable=bool(row["user_editable"]),
+                    privileges=(
+                        (_role.privileges if bool(_role) else tuple()) +
+                        (Privilege(
+                            privilege_id=row["privilege_id"],
+                            privilege_description=row[
+                                "privilege_description"]),)))
+                }
+
+
+        def __roles__(conn: db.DbConnection) -> tuple[Role, ...]:
+            with db.cursor(conn) as cursor:
+                cursor.execute(
+                    "SELECT r.*, p.* FROM resource_roles AS rr "
+                    "INNER JOIN roles AS r  ON rr.role_id=r.role_id "
+                    "INNER JOIN role_privileges AS rp ON r.role_id=rp.role_id "
+                    "INNER JOIN privileges AS p "
+                    "ON rp.privilege_id=p.privilege_id "
+                    "WHERE rr.resource_id=? AND rr.role_created_by=?",
+                    (str(resource_id), str(_token.user.user_id)))
+                return tuple(reduce(
+                    __resultset_to_roles__, cursor.fetchall(), {}).values())
+
+
+        return jsonify(with_db_connection(__roles__))
+
+
 @resources.route("/authorisation", methods=["POST"])
 def resources_authorisation():
     """Get user authorisations for given resource(s):"""