about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--gn_auth/auth/authorisation/resources/groups/models.py100
1 files changed, 67 insertions, 33 deletions
diff --git a/gn_auth/auth/authorisation/resources/groups/models.py b/gn_auth/auth/authorisation/resources/groups/models.py
index 3afa7bf..fc10972 100644
--- a/gn_auth/auth/authorisation/resources/groups/models.py
+++ b/gn_auth/auth/authorisation/resources/groups/models.py
@@ -3,7 +3,7 @@ import json
 from uuid import UUID, uuid4
 from functools import reduce
 from dataclasses import dataclass, asdict
-from typing import Any, Sequence, Iterable, Optional, NamedTuple
+from typing import Any, Sequence, Iterable, Optional
 
 from flask import g
 from pymonad.maybe import Just, Maybe, Nothing
@@ -46,9 +46,11 @@ class GroupRole:
     role: Role
 
 
+
 class GroupCreationError(AuthorisationError):
     """Raised whenever a group creation fails"""
 
+
 class MembershipError(AuthorisationError):
     """Raised when there is an error with a user's membership to a group."""
 
@@ -60,6 +62,7 @@ class MembershipError(AuthorisationError):
             f"groups ({groups_str})")
         super().__init__(f"{type(self).__name__}: {error_description}.")
 
+
 def user_membership(conn: db.DbConnection, user: User) -> Sequence[Group]:
     """Returns all the groups that a member belongs to"""
     query = (
@@ -74,18 +77,19 @@ def user_membership(conn: db.DbConnection, user: User) -> Sequence[Group]:
 
     return groups
 
+
 @authorised_p(
-    privileges = ("system:group:create-group",),
-    error_description = (
+    privileges=("system:group:create-group",),
+    error_description=(
         "You do not have the appropriate privileges to enable you to "
         "create a new group."),
-    oauth2_scope = "profile group")
+    oauth2_scope="profile group")
 def create_group(
         conn: db.DbConnection, group_name: str, group_leader: User,
         group_description: Optional[str] = None) -> Group:
     """Create a new group."""
     def resource_category_by_key(
-        cursor: db.DbCursor, category_key: str):
+            cursor: db.DbCursor, category_key: str):
         """Retrieve a resource category by its key."""
         cursor.execute(
             "SELECT * FROM resource_categories WHERE "
@@ -103,7 +107,7 @@ def create_group(
 
     with db.cursor(conn) as cursor:
         new_group = save_group(
-            cursor, group_name,(
+            cursor, group_name, (
                 {"group_description": group_description}
                 if group_description else {}))
         group_resource = {
@@ -111,7 +115,9 @@ def create_group(
             "resource_id": str(uuid4()),
             "resource_name": group_name,
             "resource_category_id": str(
-                resource_category_by_key(cursor, "group")["resource_category_id"]),
+                resource_category_by_key(
+                    cursor, "group")["resource_category_id"]
+            ),
             "public": 0
         }
         cursor.execute(
@@ -131,6 +137,7 @@ def create_group(
             "group-leader")
         return new_group
 
+
 @authorised_p(("group:role:create-role",),
               error_description="Could not create the group role")
 def create_group_role(
@@ -147,6 +154,7 @@ def create_group_role(
 
     return GroupRole(group_role_id, group, role)
 
+
 def authenticated_user_group(conn) -> Maybe:
     """
     Returns the currently authenticated user's group.
@@ -171,12 +179,13 @@ def authenticated_user_group(conn) -> Maybe:
 
     return Nothing
 
+
 def user_group(conn: db.DbConnection, user: User) -> Maybe[Group]:
     """Returns the given user's group"""
     with db.cursor(conn) as cursor:
         cursor.execute(
-            ("SELECT groups.group_id, groups.group_name, groups.group_metadata "
-             "FROM group_users "
+            ("SELECT groups.group_id, groups.group_name, "
+             "groups.group_metadata FROM group_users "
              "INNER JOIN groups ON group_users.group_id=groups.group_id "
              "WHERE group_users.user_id = ?"),
             (str(user.user_id),))
@@ -192,11 +201,12 @@ def user_group(conn: db.DbConnection, user: User) -> Maybe[Group]:
 
     return Nothing
 
+
 def is_group_leader(conn: db.DbConnection, user: User, group: Group) -> bool:
     """Check whether the given `user` is the leader of `group`."""
 
     ugroup = user_group(conn, user).maybe(
-        False, lambda val: val) # type: ignore[arg-type, misc]
+        False, lambda val: val)  # type: ignore[arg-type, misc]
     if not group:
         # User cannot be a group leader if not a member of ANY group
         return False
@@ -214,6 +224,7 @@ def is_group_leader(conn: db.DbConnection, user: User, group: Group) -> bool:
 
         return "group-leader" in role_names
 
+
 def all_groups(conn: db.DbConnection) -> Maybe[Sequence[Group]]:
     """Retrieve all existing groups"""
     with db.cursor(conn) as cursor:
@@ -226,6 +237,7 @@ def all_groups(conn: db.DbConnection) -> Maybe[Sequence[Group]]:
 
     return Nothing
 
+
 def save_group(
         cursor: db.DbCursor, group_name: str,
         group_metadata: dict[str, Any]) -> Group:
@@ -236,10 +248,12 @@ def save_group(
          "VALUES(:group_id, :group_name, :group_metadata) "
          "ON CONFLICT (group_id) DO UPDATE SET "
          "group_name=:group_name, group_metadata=:group_metadata"),
-        {"group_id": str(the_group.group_id), "group_name": the_group.group_name,
+        {"group_id": str(the_group.group_id),
+         "group_name": the_group.group_name,
          "group_metadata": json.dumps(the_group.group_metadata)})
     return the_group
 
+
 def add_user_to_group(cursor: db.DbCursor, the_group: Group, user: User):
     """Add `user` to `the_group` as a member."""
     cursor.execute(
@@ -247,11 +261,12 @@ def add_user_to_group(cursor: db.DbCursor, the_group: Group, user: User):
          "ON CONFLICT (group_id, user_id) DO NOTHING"),
         {"group_id": str(the_group.group_id), "user_id": str(user.user_id)})
 
+
 @authorised_p(
-    privileges = ("system:group:view-group",),
-    error_description = (
-        "You do not have the appropriate privileges to access the list of users"
-        " in the group."))
+    privileges=("system:group:view-group",),
+    error_description=(
+        "You do not have the appropriate privileges to "
+        "access the list of users in the group."))
 def group_users(conn: db.DbConnection, group_id: UUID) -> Iterable[User]:
     """Retrieve all users that are members of group with id `group_id`."""
     with db.cursor(conn) as cursor:
@@ -264,9 +279,10 @@ def group_users(conn: db.DbConnection, group_id: UUID) -> Iterable[User]:
     return (User(UUID(row["user_id"]), row["email"], row["name"])
             for row in results)
 
+
 @authorised_p(
-    privileges = ("system:group:view-group",),
-    error_description = (
+    privileges=("system:group:view-group",),
+    error_description=(
         "You do not have the appropriate privileges to access the group."))
 def group_by_id(conn: db.DbConnection, group_id: UUID) -> Group:
     """Retrieve a group by its ID"""
@@ -282,19 +298,24 @@ def group_by_id(conn: db.DbConnection, group_id: UUID) -> Group:
 
     raise NotFoundError(f"Could not find group with ID '{group_id}'.")
 
+
 @authorised_p(("system:group:view-group", "system:group:edit-group"),
-              error_description=("You do not have the appropriate authorisation"
-                                 " to act upon the join requests."),
+              error_description=(
+                  "You do not have the appropriate authorisation"
+                  " to act upon the join requests."
+              ),
               oauth2_scope="profile group")
 def join_requests(conn: db.DbConnection, user: User):
     """List all the join requests for the user's group."""
     with db.cursor(conn) as cursor:
-        group = user_group(conn, user).maybe(DUMMY_GROUP, lambda grp: grp)# type: ignore[misc]
+        group = user_group(conn, user).maybe(
+            DUMMY_GROUP, lambda grp: grp)  # type: ignore[misc]
         if group != DUMMY_GROUP and is_group_leader(conn, user, group):
             cursor.execute(
-                "SELECT gjr.*, u.email, u.name FROM group_join_requests AS gjr "
-                "INNER JOIN users AS u ON gjr.requester_id=u.user_id "
-                "WHERE gjr.group_id=? AND gjr.status='PENDING'",
+                "SELECT gjr.*, u.email, u.name FROM \
+group_join_requests AS gjr INNER JOIN users AS u ON \
+gjr.requester_id=u.user_id WHERE gjr.group_id=? AND \
+gjr.status='PENDING'",
                 (str(group.group_id),))
             return tuple(dict(row)for row in cursor.fetchall())
 
@@ -302,16 +323,21 @@ def join_requests(conn: db.DbConnection, user: User):
         "You do not have the appropriate authorisation to access the "
         "group's join requests.")
 
+
 @authorised_p(("system:group:view-group", "system:group:edit-group"),
-              error_description=("You do not have the appropriate authorisation"
-                                 " to act upon the join requests."),
+              error_description=(
+                  "You do not have the appropriate authorisation"
+                  " to act upon the join requests."
+              ),
               oauth2_scope="profile group")
 def accept_reject_join_request(
-        conn: db.DbConnection, request_id: UUID, user: User, status: str) -> dict:
+        conn: db.DbConnection, request_id: UUID, user: User, status: str
+) -> dict:
     """Accept/Reject a join request."""
     assert status in ("ACCEPTED", "REJECTED"), f"Invalid status '{status}'."
     with db.cursor(conn) as cursor:
-        group = user_group(conn, user).maybe(DUMMY_GROUP, lambda grp: grp) # type: ignore[misc]
+        group = user_group(conn, user).maybe(
+            DUMMY_GROUP, lambda grp: grp)  # type: ignore[misc]
         cursor.execute("SELECT * FROM group_join_requests WHERE request_id=?",
                        (str(request_id),))
         row = cursor.fetchone()
@@ -321,7 +347,8 @@ def accept_reject_join_request(
                     the_user = user_by_id(conn, UUID(row["requester_id"]))
                     if status == "ACCEPTED":
                         add_user_to_group(cursor, group, the_user)
-                        revoke_user_role_by_name(cursor, the_user, "group-creator")
+                        revoke_user_role_by_name(
+                            cursor, the_user, "group-creator")
                     cursor.execute(
                         "UPDATE group_join_requests SET status=? "
                         "WHERE request_id=?",
@@ -335,6 +362,7 @@ def accept_reject_join_request(
                 "You cannot act on other groups join requests")
         raise NotFoundError(f"Could not find request with ID '{request_id}'")
 
+
 def __organise_privileges__(acc, row):
     role_id = UUID(row["role_id"])
     role = acc.get(role_id, False)
@@ -359,11 +387,13 @@ def __organise_privileges__(acc, row):
 # @authorised_p(("group:role:view",),
 #               "Insufficient privileges to view role",
 #               oauth2_scope="profile group role")
+
+
 def group_role_by_id(
         conn: db.DbConnection, group: Group, group_role_id: UUID) -> GroupRole:
     """Retrieve GroupRole from id by its `group_role_id`."""
-    ## TODO: do privileges check before running actual query
-    ##       the check commented out above doesn't work correctly
+    # TODO: do privileges check before running actual query
+    # the check commented out above doesn't work correctly
     with db.cursor(conn) as cursor:
         cursor.execute(
             "SELECT gr.group_role_id, r.*, p.* "
@@ -375,20 +405,21 @@ def group_role_by_id(
             (str(group_role_id), str(group.group_id)))
         rows = cursor.fetchall()
         if rows:
-            roles: tuple[Role,...] = tuple(reduce(
+            roles: tuple[Role, ...] = tuple(reduce(
                 __organise_privileges__, rows, {}).values())
             assert len(roles) == 1
             return GroupRole(group_role_id, group, roles[0])
         raise NotFoundError(
             f"Group role with ID '{group_role_id}' does not exist.")
 
+
 @authorised_p(("group:role:edit-role",),
               "You do not have the privilege to edit a role.",
               oauth2_scope="profile group role")
 def add_privilege_to_group_role(conn: db.DbConnection, group_role: GroupRole,
                                 privilege: Privilege) -> GroupRole:
     """Add `privilege` to `group_role`."""
-    ## TODO: do privileges check.
+    # TODO: do privileges check.
     check_user_editable(group_role.role)
     with db.cursor(conn) as cursor:
         cursor.execute(
@@ -404,6 +435,7 @@ def add_privilege_to_group_role(conn: db.DbConnection, group_role: GroupRole,
                  group_role.role.user_editable,
                  group_role.role.privileges + (privilege,)))
 
+
 @authorised_p(("group:role:edit-role",),
               "You do not have the privilege to edit a role.",
               oauth2_scope="profile group role")
@@ -411,7 +443,7 @@ def delete_privilege_from_group_role(
         conn: db.DbConnection, group_role: GroupRole,
         privilege: Privilege) -> GroupRole:
     """Delete `privilege` to `group_role`."""
-    ## TODO: do privileges check.
+    # TODO: do privileges check.
     check_user_editable(group_role.role)
     with db.cursor(conn) as cursor:
         cursor.execute(
@@ -427,6 +459,7 @@ def delete_privilege_from_group_role(
                  tuple(priv for priv in group_role.role.privileges
                        if priv != privilege)))
 
+
 def resource_owner(conn: db.DbConnection, resource: Resource) -> Group:
     """Return the user group that owns the resource."""
     with db.cursor(conn) as cursor:
@@ -444,6 +477,7 @@ def resource_owner(conn: db.DbConnection, resource: Resource) -> Group:
 
     raise MissingGroupError("Resource has no 'owning' group.")
 
+
 def add_resources_to_group(conn: db.DbConnection,
                            resources: tuple[Resource, ...],
                            group: Group):