about summary refs log tree commit diff
path: root/gn3/auth
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2022-11-21 02:44:38 +0300
committerFrederick Muriuki Muriithi2022-11-21 02:44:38 +0300
commita93e8cc48815bd7e9b64c634ad808ddbfa85cd4b (patch)
treea81d56b751dc5d39d72e78de98dd55d16487ccf2 /gn3/auth
parent8e0ed6fdb03d1a2c284a68a387105623c8947abd (diff)
downloadgenenetwork3-a93e8cc48815bd7e9b64c634ad808ddbfa85cd4b.tar.gz
auth: Prevent group leader from being a member of multiple groups
* gn3/auth/authorisation/groups.py: Assign the group leader at group creation
  time.
* tests/unit/auth/test_groups.py: Ensure the group leader is only ever a
  member of a single group.
Diffstat (limited to 'gn3/auth')
-rw-r--r--gn3/auth/authorisation/groups.py41
1 files changed, 35 insertions, 6 deletions
diff --git a/gn3/auth/authorisation/groups.py b/gn3/auth/authorisation/groups.py
index 6d7b885..7597a04 100644
--- a/gn3/auth/authorisation/groups.py
+++ b/gn3/auth/authorisation/groups.py
@@ -1,8 +1,10 @@
 """Handle the management of resource/user groups."""
 from uuid import UUID, uuid4
-from typing import Iterable, NamedTuple
+from typing import Sequence, Iterable, NamedTuple
 
 from gn3.auth import db
+from gn3.auth.authentication.users import User
+
 from .privileges import Privilege
 from .roles import Role, create_role
 from .checks import authorised_p
@@ -17,18 +19,45 @@ class GroupRole(NamedTuple):
     group_role_id: UUID
     role: Role
 
+class MembershipError(Exception):
+    """Raised when there is an error with a user's membership to a group."""
+
+    def __init__(self, user: User, groups: Sequence[Group]):
+        """Initialise the `MembershipError` exception object."""
+        groups_str = ", ".join(group.group_name for group in groups)
+        error_message = (
+            f"User '{user.name} ({user.email})' is a member of {len(groups)} "
+            f"groups ({groups_str})")
+        super().__init__(f"{type(self).__name__}: {error_message}.")
+
+def user_membership(conn: db.DbConnection, user: User) -> Sequence[Group]:
+    """Returns all the groups that a member belongs to"""
+    query = (
+        "SELECT groups.group_id, group_name FROM group_users INNER JOIN groups "
+        "ON group_users.group_id=groups.group_id "
+        "WHERE group_users.user_id=?")
+    with db.cursor(conn) as cursor:
+        cursor.execute(query, (str(user.user_id),))
+        groups = tuple(Group(row[0], row[1]) for row in cursor.fetchall())
+
+    return groups
+
 @authorised_p(("create-group",), error_message="Failed to create group.")
-def create_group(conn: db.DbConnection, group_name: str) -> Group:
+def create_group(conn: db.DbConnection, group_name: str,
+                 group_leader: User) -> Group:
     """Create a group"""
     group = Group(uuid4(), group_name)
+    user_groups = user_membership(conn, group_leader)
+    if len(user_groups) > 0:
+        raise MembershipError(group_leader, user_groups)
+
     with db.cursor(conn) as cursor:
-        ## Maybe check whether the user is already a member of a group
-        ## if they are not a member of any group, proceed to create the group
-        ## if they are a member of a group, then fail with an exception
         cursor.execute(
             "INSERT INTO groups(group_id, group_name) VALUES (?, ?)",
             (str(group.group_id), group_name))
-        ## Maybe assign `group-leader` role to user creating the group
+        cursor.execute(
+            "INSERT INTO group_users VALUES (?, ?)",
+            (str(group.group_id), str(group_leader.user_id)))
 
     return group