From a93e8cc48815bd7e9b64c634ad808ddbfa85cd4b Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Mon, 21 Nov 2022 02:44:38 +0300 Subject: auth: Prevent group leader from being a member of multiple groups * gn3/auth/authorisation/groups.py: Assign the group leader at group creation time. * tests/unit/auth/test_groups.py: Ensure the group leader is only ever a member of a single group. --- gn3/auth/authorisation/groups.py | 41 ++++++++++++++++++++++++++++++++++------ 1 file changed, 35 insertions(+), 6 deletions(-) (limited to 'gn3/auth/authorisation') diff --git a/gn3/auth/authorisation/groups.py b/gn3/auth/authorisation/groups.py index 6d7b885..7597a04 100644 --- a/gn3/auth/authorisation/groups.py +++ b/gn3/auth/authorisation/groups.py @@ -1,8 +1,10 @@ """Handle the management of resource/user groups.""" from uuid import UUID, uuid4 -from typing import Iterable, NamedTuple +from typing import Sequence, Iterable, NamedTuple from gn3.auth import db +from gn3.auth.authentication.users import User + from .privileges import Privilege from .roles import Role, create_role from .checks import authorised_p @@ -17,18 +19,45 @@ class GroupRole(NamedTuple): group_role_id: UUID role: Role +class MembershipError(Exception): + """Raised when there is an error with a user's membership to a group.""" + + def __init__(self, user: User, groups: Sequence[Group]): + """Initialise the `MembershipError` exception object.""" + groups_str = ", ".join(group.group_name for group in groups) + error_message = ( + f"User '{user.name} ({user.email})' is a member of {len(groups)} " + f"groups ({groups_str})") + super().__init__(f"{type(self).__name__}: {error_message}.") + +def user_membership(conn: db.DbConnection, user: User) -> Sequence[Group]: + """Returns all the groups that a member belongs to""" + query = ( + "SELECT groups.group_id, group_name FROM group_users INNER JOIN groups " + "ON group_users.group_id=groups.group_id " + "WHERE group_users.user_id=?") + with db.cursor(conn) as cursor: + cursor.execute(query, (str(user.user_id),)) + groups = tuple(Group(row[0], row[1]) for row in cursor.fetchall()) + + return groups + @authorised_p(("create-group",), error_message="Failed to create group.") -def create_group(conn: db.DbConnection, group_name: str) -> Group: +def create_group(conn: db.DbConnection, group_name: str, + group_leader: User) -> Group: """Create a group""" group = Group(uuid4(), group_name) + user_groups = user_membership(conn, group_leader) + if len(user_groups) > 0: + raise MembershipError(group_leader, user_groups) + with db.cursor(conn) as cursor: - ## Maybe check whether the user is already a member of a group - ## if they are not a member of any group, proceed to create the group - ## if they are a member of a group, then fail with an exception cursor.execute( "INSERT INTO groups(group_id, group_name) VALUES (?, ?)", (str(group.group_id), group_name)) - ## Maybe assign `group-leader` role to user creating the group + cursor.execute( + "INSERT INTO group_users VALUES (?, ?)", + (str(group.group_id), str(group_leader.user_id))) return group -- cgit v1.2.3