diff options
author | Frederick Muriuki Muriithi | 2024-06-07 11:48:29 -0500 |
---|---|---|
committer | Frederick Muriuki Muriithi | 2024-06-07 11:53:26 -0500 |
commit | c3b179f46e75e92de942a2d45d49c74cb887efc8 (patch) | |
tree | 03e2764e9fee497ca8c0b8673d3d4382aad87b60 | |
parent | 50e458b8951f036c487d7854ebe438e4dfbd6c4f (diff) | |
download | gn-auth-c3b179f46e75e92de942a2d45d49c74cb887efc8.tar.gz |
Replace `…/group/roles` endpoint with `…/resource/…/roles` endpoint.
The `…/group/roles` endpoint relied on the now deleted `group_roles`
table that caused the implementation to be prone to privilege
escalation attacks.
This commit provides the `…/resource/…/roles` endpoint that provides
the required functionality without the exposure.
-rw-r--r-- | gn_auth/auth/authorisation/resources/groups/views.py | 29 | ||||
-rw-r--r-- | gn_auth/auth/authorisation/resources/views.py | 41 |
2 files changed, 41 insertions, 29 deletions
diff --git a/gn_auth/auth/authorisation/resources/groups/views.py b/gn_auth/auth/authorisation/resources/groups/views.py index fb1a831..ef6bb0d 100644 --- a/gn_auth/auth/authorisation/resources/groups/views.py +++ b/gn_auth/auth/authorisation/resources/groups/views.py @@ -282,35 +282,6 @@ def link_data() -> Response: return jsonify(with_db_connection(__link__)) -@groups.route("/roles", methods=["GET"]) -@require_oauth("profile group") -def group_roles(): - """Return a list of all available group roles.""" - with require_oauth.acquire("profile group role") as the_token: - def __list_roles__(conn: db.DbConnection): - ## TODO: Check that user has appropriate privileges - with db.cursor(conn) as cursor: - group = user_group(conn, the_token.user).maybe(# type: ignore[misc] - DUMMY_GROUP, lambda grp: grp) - if group == DUMMY_GROUP: - return tuple() - cursor.execute( - "SELECT gr.group_role_id, r.* " - "FROM group_roles AS gr INNER JOIN roles AS r " - "ON gr.role_id=r.role_id " - "WHERE group_id=?", - (str(group.group_id),)) - return tuple( - GroupRole(uuid.UUID(row["group_role_id"]), - group, - Role(uuid.UUID(row["role_id"]), - row["role_name"], - bool(int(row["user_editable"])), - tuple())) - for row in cursor.fetchall()) - return jsonify(tuple( - asdict(role) for role in with_db_connection(__list_roles__))) - @groups.route("/privileges", methods=["GET"]) @require_oauth("profile group") def group_privileges(): diff --git a/gn_auth/auth/authorisation/resources/views.py b/gn_auth/auth/authorisation/resources/views.py index c481ef9..3a61704 100644 --- a/gn_auth/auth/authorisation/resources/views.py +++ b/gn_auth/auth/authorisation/resources/views.py @@ -17,6 +17,7 @@ from gn_auth.auth.db import sqlite3 as db from gn_auth.auth.db.sqlite3 import with_db_connection from gn_auth.auth.authorisation.roles import Role +from gn_auth.auth.authorisation.privileges import Privilege from gn_auth.auth.errors import InvalidData, InconsistencyError, AuthorisationError from gn_auth.auth.authentication.oauth2.resource_server import require_oauth @@ -339,6 +340,46 @@ def toggle_public(resource_id: uuid.UUID) -> Response: "Made resource public" if resource.public else "Made resource private")}) + +@resources.route("<uuid:resource_id>/roles", methods=["GET"]) +@require_oauth("profile group resource role") +def resource_roles(resource_id: uuid.UUID) -> Response: + """Return the roles the user has to act on a given resource.""" + with require_oauth.acquire("profile group resource role") as _token: + def __resultset_to_roles__(roles, row): + _role = roles.get(row["role_id"]) + return { + **roles, + row["role_id"]: Role( + role_id=uuid.UUID(row["role_id"]), + role_name=row["role_name"], + user_editable=bool(row["user_editable"]), + privileges=( + (_role.privileges if bool(_role) else tuple()) + + (Privilege( + privilege_id=row["privilege_id"], + privilege_description=row[ + "privilege_description"]),))) + } + + + def __roles__(conn: db.DbConnection) -> tuple[Role, ...]: + with db.cursor(conn) as cursor: + cursor.execute( + "SELECT r.*, p.* FROM resource_roles AS rr " + "INNER JOIN roles AS r ON rr.role_id=r.role_id " + "INNER JOIN role_privileges AS rp ON r.role_id=rp.role_id " + "INNER JOIN privileges AS p " + "ON rp.privilege_id=p.privilege_id " + "WHERE rr.resource_id=? AND rr.role_created_by=?", + (str(resource_id), str(_token.user.user_id))) + return tuple(reduce( + __resultset_to_roles__, cursor.fetchall(), {}).values()) + + + return jsonify(with_db_connection(__roles__)) + + @resources.route("/authorisation", methods=["POST"]) def resources_authorisation(): """Get user authorisations for given resource(s):""" |