From 6369eceb15755026df518c94cc57a331275fa3d3 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Mon, 12 Dec 2022 13:36:15 +0300 Subject: auth: functions to get user resources * gn3/auth/authorisation/resources.py: add function to get the resources that the user has access to. --- gn3/auth/authorisation/resources.py | 43 +++++++++++++++++++++++++++++++++---- 1 file changed, 39 insertions(+), 4 deletions(-) (limited to 'gn3') diff --git a/gn3/auth/authorisation/resources.py b/gn3/auth/authorisation/resources.py index 2c0b72b..0a91930 100644 --- a/gn3/auth/authorisation/resources.py +++ b/gn3/auth/authorisation/resources.py @@ -1,13 +1,13 @@ """Handle the management of resources.""" from uuid import UUID, uuid4 -from typing import Sequence, NamedTuple +from typing import Dict, Sequence, NamedTuple from gn3.auth import db from gn3.auth.authentication.users import User from .checks import authorised_p from .exceptions import AuthorisationError -from .groups import Group, authenticated_user_group +from .groups import Group, user_group, is_group_leader, authenticated_user_group class MissingGroupError(AuthorisationError): """Raised for any resource operation without a group.""" @@ -73,6 +73,41 @@ def public_resources(conn: db.DbConnection) -> Sequence[Resource]: bool(row[4])) for row in results) -def user_resources(conn: db.DbConnection, user: User) -> Sequence[Resource]:# pylint: disable=[unused-argument] - """List the resources available to the user""" +def group_leader_resources( + cursor: db.DbCursor, user: User, group: Group, + res_categories: Dict[UUID, ResourceCategory]) -> Sequence[Resource]: + """Return all the resources available to the group leader""" + if is_group_leader(cursor, user, group): + cursor.execute("SELECT * FROM resources WHERE group_id=?", + (str(group.group_id),)) + return tuple( + Resource(group, UUID(row[1]), row[2], res_categories[UUID(row[3])], + bool(row[4])) + for row in cursor.fetchall()) return tuple() + +def user_resources(conn: db.DbConnection, user: User) -> Sequence[Resource]: + """List the resources available to the user""" + categories = { # Repeated in `public_resources` function + cat.resource_category_id: cat for cat in resource_categories(conn) + } + with db.cursor(conn) as cursor: + group = user_group(cursor, user).maybe(False, lambda val: val) # type: ignore[misc] + if not group: + return public_resources(conn) + + gl_resources = group_leader_resources(cursor, user, group, categories) + + cursor.execute( + ("SELECT resources.* FROM group_user_roles_on_resources " + "LEFT JOIN resources " + "ON group_user_roles_on_resources.resource_id=resources.resource_id " + "WHERE group_user_roles_on_resources.group_id = ? " + "AND group_user_roles_on_resources.user_id = ?"), + (str(group.group_id), str(user.user_id))) + private_res = tuple( + Resource(group, UUID(row[1]), row[2], categories[UUID(row[3])], + bool(row[4])) + for row in cursor.fetchall()) + + return tuple(set(private_res).union(gl_resources).union(public_resources(conn))) -- cgit v1.2.3