diff options
-rw-r--r-- | gn_auth/auth/authorisation/resources/groups/models.py | 100 |
1 files changed, 67 insertions, 33 deletions
diff --git a/gn_auth/auth/authorisation/resources/groups/models.py b/gn_auth/auth/authorisation/resources/groups/models.py index 3afa7bf..fc10972 100644 --- a/gn_auth/auth/authorisation/resources/groups/models.py +++ b/gn_auth/auth/authorisation/resources/groups/models.py @@ -3,7 +3,7 @@ import json from uuid import UUID, uuid4 from functools import reduce from dataclasses import dataclass, asdict -from typing import Any, Sequence, Iterable, Optional, NamedTuple +from typing import Any, Sequence, Iterable, Optional from flask import g from pymonad.maybe import Just, Maybe, Nothing @@ -46,9 +46,11 @@ class GroupRole: role: Role + class GroupCreationError(AuthorisationError): """Raised whenever a group creation fails""" + class MembershipError(AuthorisationError): """Raised when there is an error with a user's membership to a group.""" @@ -60,6 +62,7 @@ class MembershipError(AuthorisationError): f"groups ({groups_str})") super().__init__(f"{type(self).__name__}: {error_description}.") + def user_membership(conn: db.DbConnection, user: User) -> Sequence[Group]: """Returns all the groups that a member belongs to""" query = ( @@ -74,18 +77,19 @@ def user_membership(conn: db.DbConnection, user: User) -> Sequence[Group]: return groups + @authorised_p( - privileges = ("system:group:create-group",), - error_description = ( + privileges=("system:group:create-group",), + error_description=( "You do not have the appropriate privileges to enable you to " "create a new group."), - oauth2_scope = "profile group") + oauth2_scope="profile group") def create_group( conn: db.DbConnection, group_name: str, group_leader: User, group_description: Optional[str] = None) -> Group: """Create a new group.""" def resource_category_by_key( - cursor: db.DbCursor, category_key: str): + cursor: db.DbCursor, category_key: str): """Retrieve a resource category by its key.""" cursor.execute( "SELECT * FROM resource_categories WHERE " @@ -103,7 +107,7 @@ def create_group( with db.cursor(conn) as cursor: new_group = save_group( - cursor, group_name,( + cursor, group_name, ( {"group_description": group_description} if group_description else {})) group_resource = { @@ -111,7 +115,9 @@ def create_group( "resource_id": str(uuid4()), "resource_name": group_name, "resource_category_id": str( - resource_category_by_key(cursor, "group")["resource_category_id"]), + resource_category_by_key( + cursor, "group")["resource_category_id"] + ), "public": 0 } cursor.execute( @@ -131,6 +137,7 @@ def create_group( "group-leader") return new_group + @authorised_p(("group:role:create-role",), error_description="Could not create the group role") def create_group_role( @@ -147,6 +154,7 @@ def create_group_role( return GroupRole(group_role_id, group, role) + def authenticated_user_group(conn) -> Maybe: """ Returns the currently authenticated user's group. @@ -171,12 +179,13 @@ def authenticated_user_group(conn) -> Maybe: return Nothing + def user_group(conn: db.DbConnection, user: User) -> Maybe[Group]: """Returns the given user's group""" with db.cursor(conn) as cursor: cursor.execute( - ("SELECT groups.group_id, groups.group_name, groups.group_metadata " - "FROM group_users " + ("SELECT groups.group_id, groups.group_name, " + "groups.group_metadata FROM group_users " "INNER JOIN groups ON group_users.group_id=groups.group_id " "WHERE group_users.user_id = ?"), (str(user.user_id),)) @@ -192,11 +201,12 @@ def user_group(conn: db.DbConnection, user: User) -> Maybe[Group]: return Nothing + def is_group_leader(conn: db.DbConnection, user: User, group: Group) -> bool: """Check whether the given `user` is the leader of `group`.""" ugroup = user_group(conn, user).maybe( - False, lambda val: val) # type: ignore[arg-type, misc] + False, lambda val: val) # type: ignore[arg-type, misc] if not group: # User cannot be a group leader if not a member of ANY group return False @@ -214,6 +224,7 @@ def is_group_leader(conn: db.DbConnection, user: User, group: Group) -> bool: return "group-leader" in role_names + def all_groups(conn: db.DbConnection) -> Maybe[Sequence[Group]]: """Retrieve all existing groups""" with db.cursor(conn) as cursor: @@ -226,6 +237,7 @@ def all_groups(conn: db.DbConnection) -> Maybe[Sequence[Group]]: return Nothing + def save_group( cursor: db.DbCursor, group_name: str, group_metadata: dict[str, Any]) -> Group: @@ -236,10 +248,12 @@ def save_group( "VALUES(:group_id, :group_name, :group_metadata) " "ON CONFLICT (group_id) DO UPDATE SET " "group_name=:group_name, group_metadata=:group_metadata"), - {"group_id": str(the_group.group_id), "group_name": the_group.group_name, + {"group_id": str(the_group.group_id), + "group_name": the_group.group_name, "group_metadata": json.dumps(the_group.group_metadata)}) return the_group + def add_user_to_group(cursor: db.DbCursor, the_group: Group, user: User): """Add `user` to `the_group` as a member.""" cursor.execute( @@ -247,11 +261,12 @@ def add_user_to_group(cursor: db.DbCursor, the_group: Group, user: User): "ON CONFLICT (group_id, user_id) DO NOTHING"), {"group_id": str(the_group.group_id), "user_id": str(user.user_id)}) + @authorised_p( - privileges = ("system:group:view-group",), - error_description = ( - "You do not have the appropriate privileges to access the list of users" - " in the group.")) + privileges=("system:group:view-group",), + error_description=( + "You do not have the appropriate privileges to " + "access the list of users in the group.")) def group_users(conn: db.DbConnection, group_id: UUID) -> Iterable[User]: """Retrieve all users that are members of group with id `group_id`.""" with db.cursor(conn) as cursor: @@ -264,9 +279,10 @@ def group_users(conn: db.DbConnection, group_id: UUID) -> Iterable[User]: return (User(UUID(row["user_id"]), row["email"], row["name"]) for row in results) + @authorised_p( - privileges = ("system:group:view-group",), - error_description = ( + privileges=("system:group:view-group",), + error_description=( "You do not have the appropriate privileges to access the group.")) def group_by_id(conn: db.DbConnection, group_id: UUID) -> Group: """Retrieve a group by its ID""" @@ -282,19 +298,24 @@ def group_by_id(conn: db.DbConnection, group_id: UUID) -> Group: raise NotFoundError(f"Could not find group with ID '{group_id}'.") + @authorised_p(("system:group:view-group", "system:group:edit-group"), - error_description=("You do not have the appropriate authorisation" - " to act upon the join requests."), + error_description=( + "You do not have the appropriate authorisation" + " to act upon the join requests." + ), oauth2_scope="profile group") def join_requests(conn: db.DbConnection, user: User): """List all the join requests for the user's group.""" with db.cursor(conn) as cursor: - group = user_group(conn, user).maybe(DUMMY_GROUP, lambda grp: grp)# type: ignore[misc] + group = user_group(conn, user).maybe( + DUMMY_GROUP, lambda grp: grp) # type: ignore[misc] if group != DUMMY_GROUP and is_group_leader(conn, user, group): cursor.execute( - "SELECT gjr.*, u.email, u.name FROM group_join_requests AS gjr " - "INNER JOIN users AS u ON gjr.requester_id=u.user_id " - "WHERE gjr.group_id=? AND gjr.status='PENDING'", + "SELECT gjr.*, u.email, u.name FROM \ +group_join_requests AS gjr INNER JOIN users AS u ON \ +gjr.requester_id=u.user_id WHERE gjr.group_id=? AND \ +gjr.status='PENDING'", (str(group.group_id),)) return tuple(dict(row)for row in cursor.fetchall()) @@ -302,16 +323,21 @@ def join_requests(conn: db.DbConnection, user: User): "You do not have the appropriate authorisation to access the " "group's join requests.") + @authorised_p(("system:group:view-group", "system:group:edit-group"), - error_description=("You do not have the appropriate authorisation" - " to act upon the join requests."), + error_description=( + "You do not have the appropriate authorisation" + " to act upon the join requests." + ), oauth2_scope="profile group") def accept_reject_join_request( - conn: db.DbConnection, request_id: UUID, user: User, status: str) -> dict: + conn: db.DbConnection, request_id: UUID, user: User, status: str +) -> dict: """Accept/Reject a join request.""" assert status in ("ACCEPTED", "REJECTED"), f"Invalid status '{status}'." with db.cursor(conn) as cursor: - group = user_group(conn, user).maybe(DUMMY_GROUP, lambda grp: grp) # type: ignore[misc] + group = user_group(conn, user).maybe( + DUMMY_GROUP, lambda grp: grp) # type: ignore[misc] cursor.execute("SELECT * FROM group_join_requests WHERE request_id=?", (str(request_id),)) row = cursor.fetchone() @@ -321,7 +347,8 @@ def accept_reject_join_request( the_user = user_by_id(conn, UUID(row["requester_id"])) if status == "ACCEPTED": add_user_to_group(cursor, group, the_user) - revoke_user_role_by_name(cursor, the_user, "group-creator") + revoke_user_role_by_name( + cursor, the_user, "group-creator") cursor.execute( "UPDATE group_join_requests SET status=? " "WHERE request_id=?", @@ -335,6 +362,7 @@ def accept_reject_join_request( "You cannot act on other groups join requests") raise NotFoundError(f"Could not find request with ID '{request_id}'") + def __organise_privileges__(acc, row): role_id = UUID(row["role_id"]) role = acc.get(role_id, False) @@ -359,11 +387,13 @@ def __organise_privileges__(acc, row): # @authorised_p(("group:role:view",), # "Insufficient privileges to view role", # oauth2_scope="profile group role") + + def group_role_by_id( conn: db.DbConnection, group: Group, group_role_id: UUID) -> GroupRole: """Retrieve GroupRole from id by its `group_role_id`.""" - ## TODO: do privileges check before running actual query - ## the check commented out above doesn't work correctly + # TODO: do privileges check before running actual query + # the check commented out above doesn't work correctly with db.cursor(conn) as cursor: cursor.execute( "SELECT gr.group_role_id, r.*, p.* " @@ -375,20 +405,21 @@ def group_role_by_id( (str(group_role_id), str(group.group_id))) rows = cursor.fetchall() if rows: - roles: tuple[Role,...] = tuple(reduce( + roles: tuple[Role, ...] = tuple(reduce( __organise_privileges__, rows, {}).values()) assert len(roles) == 1 return GroupRole(group_role_id, group, roles[0]) raise NotFoundError( f"Group role with ID '{group_role_id}' does not exist.") + @authorised_p(("group:role:edit-role",), "You do not have the privilege to edit a role.", oauth2_scope="profile group role") def add_privilege_to_group_role(conn: db.DbConnection, group_role: GroupRole, privilege: Privilege) -> GroupRole: """Add `privilege` to `group_role`.""" - ## TODO: do privileges check. + # TODO: do privileges check. check_user_editable(group_role.role) with db.cursor(conn) as cursor: cursor.execute( @@ -404,6 +435,7 @@ def add_privilege_to_group_role(conn: db.DbConnection, group_role: GroupRole, group_role.role.user_editable, group_role.role.privileges + (privilege,))) + @authorised_p(("group:role:edit-role",), "You do not have the privilege to edit a role.", oauth2_scope="profile group role") @@ -411,7 +443,7 @@ def delete_privilege_from_group_role( conn: db.DbConnection, group_role: GroupRole, privilege: Privilege) -> GroupRole: """Delete `privilege` to `group_role`.""" - ## TODO: do privileges check. + # TODO: do privileges check. check_user_editable(group_role.role) with db.cursor(conn) as cursor: cursor.execute( @@ -427,6 +459,7 @@ def delete_privilege_from_group_role( tuple(priv for priv in group_role.role.privileges if priv != privilege))) + def resource_owner(conn: db.DbConnection, resource: Resource) -> Group: """Return the user group that owns the resource.""" with db.cursor(conn) as cursor: @@ -444,6 +477,7 @@ def resource_owner(conn: db.DbConnection, resource: Resource) -> Group: raise MissingGroupError("Resource has no 'owning' group.") + def add_resources_to_group(conn: db.DbConnection, resources: tuple[Resource, ...], group: Group): |