about summary refs log tree commit diff
path: root/gn3/auth/authorisation/groups.py
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2023-01-23 14:30:20 +0300
committerFrederick Muriuki Muriithi2023-01-23 14:30:20 +0300
commitb9139c2356f75103bc5fd17f074f4ee0e74b64aa (patch)
tree06803f97ccea91ce5137d42f42e1abe33c38365c /gn3/auth/authorisation/groups.py
parente92ceacccb4c8d32f28ed7d2530ddc6912a730d4 (diff)
downloadgenenetwork3-b9139c2356f75103bc5fd17f074f4ee0e74b64aa.tar.gz
auth: create group: Fix group creation.
* gn3/auth/authorisation/checks.py: Enable passing user to authorisation
  checking function. Raise error on authorisation failure for consistent error
  handling.
* gn3/auth/authorisation/groups.py: Add user to group, updating the privileges
  as appropriate.
* gn3/auth/authorisation/resources.py: Fix resources querying
* gn3/auth/authorisation/roles.py: Assign/revoke roles by name
* gn3/auth/authorisation/views.py: Create group
* migrations/auth/20221108_01_CoxYh-create-the-groups-table.py: Add
  group_metadata field
* tests/unit/auth/fixtures/group_fixtures.py: fix tests
* tests/unit/auth/test_groups.py: fix tests
* tests/unit/auth/test_resources.py: fix tests
* tests/unit/auth/test_roles.py: fix tests
Diffstat (limited to 'gn3/auth/authorisation/groups.py')
-rw-r--r--gn3/auth/authorisation/groups.py97
1 files changed, 71 insertions, 26 deletions
diff --git a/gn3/auth/authorisation/groups.py b/gn3/auth/authorisation/groups.py
index 3367b57..6d1b1a3 100644
--- a/gn3/auth/authorisation/groups.py
+++ b/gn3/auth/authorisation/groups.py
@@ -1,23 +1,31 @@
 """Handle the management of resource/user groups."""
+import json
 from uuid import UUID, uuid4
-from typing import Sequence, Iterable, NamedTuple
+from typing import Any, Sequence, Iterable, Optional, NamedTuple
 
 from flask import g
 from pymonad.maybe import Just, Maybe, Nothing
 
 from gn3.auth import db
 from gn3.auth.authentication.users import User
+from gn3.auth.dictify import register_dictifier
 from gn3.auth.authentication.checks import authenticated_p
 
 from .checks import authorised_p
 from .privileges import Privilege
-from .roles import Role, create_role
 from .errors import AuthorisationError
+from .roles import (
+    Role, create_role, revoke_user_role_by_name, assign_user_role_by_name)
 
 class Group(NamedTuple):
     """Class representing a group."""
     group_id: UUID
     group_name: str
+    group_metadata: dict[str, Any]
+
+register_dictifier(Group, lambda grp: {
+    "group_id": grp.group_id, "group_name": grp.group_name,
+    "group_metadata": grp.group_metadata})
 
 class GroupRole(NamedTuple):
     """Class representing a role tied/belonging to a group."""
@@ -25,6 +33,9 @@ class GroupRole(NamedTuple):
     group: Group
     role: Role
 
+class GroupCreationError(AuthorisationError):
+    """Raised whenever a group creation fails"""
+
 class MembershipError(AuthorisationError):
     """Raised when there is an error with a user's membership to a group."""
 
@@ -39,35 +50,43 @@ class MembershipError(AuthorisationError):
 def user_membership(conn: db.DbConnection, user: User) -> Sequence[Group]:
     """Returns all the groups that a member belongs to"""
     query = (
-        "SELECT groups.group_id, group_name FROM group_users INNER JOIN groups "
+        "SELECT groups.group_id, group_name, groups.group_metadata "
+        "FROM group_users INNER JOIN groups "
         "ON group_users.group_id=groups.group_id "
         "WHERE group_users.user_id=?")
     with db.cursor(conn) as cursor:
         cursor.execute(query, (str(user.user_id),))
-        groups = tuple(Group(row[0], row[1]) for row in cursor.fetchall())
+        groups = tuple(Group(row[0], row[1], json.loads(row[2]))
+                       for row in cursor.fetchall())
 
     return groups
 
 @authenticated_p
-@authorised_p(("system:group:create-group",),
-              error_message="Failed to create group.")
-def create_group(conn: db.DbConnection, group_name: str,
-                 group_leader: User) -> Group:
-    """Create a group"""
-    group = Group(uuid4(), group_name)
+def create_group(
+        conn: db.DbConnection, group_name: str, group_leader: User,
+        group_description: Optional[str] = None) -> Group:
+    """Create a new group."""
     user_groups = user_membership(conn, group_leader)
     if len(user_groups) > 0:
         raise MembershipError(group_leader, user_groups)
 
-    with db.cursor(conn) as cursor:
-        cursor.execute(
-            "INSERT INTO groups(group_id, group_name) VALUES (?, ?)",
-            (str(group.group_id), group_name))
-        cursor.execute(
-            "INSERT INTO group_users VALUES (?, ?)",
-            (str(group.group_id), str(group_leader.user_id)))
-
-    return group
+    @authorised_p(
+        ("system:group:create-group",), (
+            "You do not have the appropriate privileges to enable you to "
+            "create a new group."),
+        group_leader)
+    def __create_group__():
+        with db.cursor(conn) as cursor:
+            new_group = __save_group__(
+                cursor, group_name,(
+                    {"group_description": group_description}
+                    if group_description else {}))
+            add_user_to_group(cursor, new_group, group_leader)
+            revoke_user_role_by_name(cursor, group_leader, "group-creator")
+            assign_user_role_by_name(cursor, group_leader, "group-leader")
+            return new_group
+
+    return __create_group__()
 
 @authenticated_p
 @authorised_p(("group:role:create-role",),
@@ -96,11 +115,12 @@ def authenticated_user_group(conn) -> Maybe:
     user = g.user
     with db.cursor(conn) as cursor:
         cursor.execute(
-            ("SELECT groups.group_id, groups.group_name FROM group_users "
+            ("SELECT groups.* FROM group_users "
              "INNER JOIN groups ON group_users.group_id=groups.group_id "
              "WHERE group_users.user_id = ?"),
             (str(user.user_id),))
-        groups = tuple(Group(UUID(row[0]), row[1]) for row in cursor.fetchall())
+        groups = tuple(Group(UUID(row[0]), row[1], json.loads(row[2] or "{}"))
+                       for row in cursor.fetchall())
 
     if len(groups) > 1:
         raise MembershipError(user, groups)
@@ -110,14 +130,17 @@ def authenticated_user_group(conn) -> Maybe:
 
     return Nothing
 
-def user_group(cursor: db.DbCursor, user: User) -> Maybe:
+def user_group(cursor: db.DbCursor, user: User) -> Maybe[Group]:
     """Returns the given user's group"""
     cursor.execute(
-        ("SELECT groups.group_id, groups.group_name FROM group_users "
+        ("SELECT groups.group_id, groups.group_name, groups.group_metadata "
+         "FROM group_users "
          "INNER JOIN groups ON group_users.group_id=groups.group_id "
          "WHERE group_users.user_id = ?"),
         (str(user.user_id),))
-    groups = tuple(Group(UUID(row[0]), row[1]) for row in cursor.fetchall())
+    groups = tuple(
+        Group(UUID(row[0]), row[1], json.loads(row[2] or "{}"))
+        for row in cursor.fetchall())
 
     if len(groups) > 1:
         raise MembershipError(user, groups)
@@ -129,7 +152,7 @@ def user_group(cursor: db.DbCursor, user: User) -> Maybe:
 
 def is_group_leader(cursor: db.DbCursor, user: User, group: Group):
     """Check whether the given `user` is the leader of `group`."""
-    ugroup = user_group(cursor, user).maybe(False, lambda val: val) # type: ignore[misc]
+    ugroup = user_group(cursor, user).maybe(False, lambda val: val) # type: ignore[arg-type, misc]
     if not group:
         # User cannot be a group leader if not a member of ANY group
         return False
@@ -153,6 +176,28 @@ def all_groups(conn: db.DbConnection) -> Maybe[Sequence[Group]]:
         res = cursor.fetchall()
         if res:
             return Just(tuple(
-                Group(row["group_id"], row["group_name"]) for row in res))
+                Group(row["group_id"], row["group_name"],
+                      json.loads(row["group_metadata"])) for row in res))
 
     return Nothing
+
+def __save_group__(
+        cursor: db.DbCursor, group_name: str,
+        group_metadata: dict[str, Any]) -> Group:
+    """Save a group to db"""
+    the_group = Group(uuid4(), group_name, group_metadata)
+    cursor.execute(
+        ("INSERT INTO groups "
+         "VALUES(:group_id, :group_name, :group_metadata) "
+         "ON CONFLICT (group_id) DO UPDATE SET "
+         "group_name=:group_name, group_metadata=:group_metadata"),
+    {"group_id": str(the_group.group_id), "group_name": the_group.group_name,
+     "group_metadata": json.dumps(the_group.group_metadata)})
+    return the_group
+
+def add_user_to_group(cursor: db.DbCursor, the_group: Group, user: User):
+    """Add `user` to `the_group` as a member."""
+    cursor.execute(
+        ("INSERT INTO group_users VALUES (:group_id, :user_id) "
+         "ON CONFLICT (group_id, user_id) DO NOTHING"),
+        {"group_id": str(the_group.group_id), "user_id": str(user.user_id)})