diff options
Diffstat (limited to 'gn3/auth/authorisation/groups.py')
-rw-r--r-- | gn3/auth/authorisation/groups.py | 97 |
1 files changed, 71 insertions, 26 deletions
diff --git a/gn3/auth/authorisation/groups.py b/gn3/auth/authorisation/groups.py index 3367b57..6d1b1a3 100644 --- a/gn3/auth/authorisation/groups.py +++ b/gn3/auth/authorisation/groups.py @@ -1,23 +1,31 @@ """Handle the management of resource/user groups.""" +import json from uuid import UUID, uuid4 -from typing import Sequence, Iterable, NamedTuple +from typing import Any, Sequence, Iterable, Optional, NamedTuple from flask import g from pymonad.maybe import Just, Maybe, Nothing from gn3.auth import db from gn3.auth.authentication.users import User +from gn3.auth.dictify import register_dictifier from gn3.auth.authentication.checks import authenticated_p from .checks import authorised_p from .privileges import Privilege -from .roles import Role, create_role from .errors import AuthorisationError +from .roles import ( + Role, create_role, revoke_user_role_by_name, assign_user_role_by_name) class Group(NamedTuple): """Class representing a group.""" group_id: UUID group_name: str + group_metadata: dict[str, Any] + +register_dictifier(Group, lambda grp: { + "group_id": grp.group_id, "group_name": grp.group_name, + "group_metadata": grp.group_metadata}) class GroupRole(NamedTuple): """Class representing a role tied/belonging to a group.""" @@ -25,6 +33,9 @@ class GroupRole(NamedTuple): group: Group role: Role +class GroupCreationError(AuthorisationError): + """Raised whenever a group creation fails""" + class MembershipError(AuthorisationError): """Raised when there is an error with a user's membership to a group.""" @@ -39,35 +50,43 @@ class MembershipError(AuthorisationError): def user_membership(conn: db.DbConnection, user: User) -> Sequence[Group]: """Returns all the groups that a member belongs to""" query = ( - "SELECT groups.group_id, group_name FROM group_users INNER JOIN groups " + "SELECT groups.group_id, group_name, groups.group_metadata " + "FROM group_users INNER JOIN groups " "ON group_users.group_id=groups.group_id " "WHERE group_users.user_id=?") with db.cursor(conn) as cursor: cursor.execute(query, (str(user.user_id),)) - groups = tuple(Group(row[0], row[1]) for row in cursor.fetchall()) + groups = tuple(Group(row[0], row[1], json.loads(row[2])) + for row in cursor.fetchall()) return groups @authenticated_p -@authorised_p(("system:group:create-group",), - error_message="Failed to create group.") -def create_group(conn: db.DbConnection, group_name: str, - group_leader: User) -> Group: - """Create a group""" - group = Group(uuid4(), group_name) +def create_group( + conn: db.DbConnection, group_name: str, group_leader: User, + group_description: Optional[str] = None) -> Group: + """Create a new group.""" user_groups = user_membership(conn, group_leader) if len(user_groups) > 0: raise MembershipError(group_leader, user_groups) - with db.cursor(conn) as cursor: - cursor.execute( - "INSERT INTO groups(group_id, group_name) VALUES (?, ?)", - (str(group.group_id), group_name)) - cursor.execute( - "INSERT INTO group_users VALUES (?, ?)", - (str(group.group_id), str(group_leader.user_id))) - - return group + @authorised_p( + ("system:group:create-group",), ( + "You do not have the appropriate privileges to enable you to " + "create a new group."), + group_leader) + def __create_group__(): + with db.cursor(conn) as cursor: + new_group = __save_group__( + cursor, group_name,( + {"group_description": group_description} + if group_description else {})) + add_user_to_group(cursor, new_group, group_leader) + revoke_user_role_by_name(cursor, group_leader, "group-creator") + assign_user_role_by_name(cursor, group_leader, "group-leader") + return new_group + + return __create_group__() @authenticated_p @authorised_p(("group:role:create-role",), @@ -96,11 +115,12 @@ def authenticated_user_group(conn) -> Maybe: user = g.user with db.cursor(conn) as cursor: cursor.execute( - ("SELECT groups.group_id, groups.group_name FROM group_users " + ("SELECT groups.* FROM group_users " "INNER JOIN groups ON group_users.group_id=groups.group_id " "WHERE group_users.user_id = ?"), (str(user.user_id),)) - groups = tuple(Group(UUID(row[0]), row[1]) for row in cursor.fetchall()) + groups = tuple(Group(UUID(row[0]), row[1], json.loads(row[2] or "{}")) + for row in cursor.fetchall()) if len(groups) > 1: raise MembershipError(user, groups) @@ -110,14 +130,17 @@ def authenticated_user_group(conn) -> Maybe: return Nothing -def user_group(cursor: db.DbCursor, user: User) -> Maybe: +def user_group(cursor: db.DbCursor, user: User) -> Maybe[Group]: """Returns the given user's group""" cursor.execute( - ("SELECT groups.group_id, groups.group_name FROM group_users " + ("SELECT groups.group_id, groups.group_name, groups.group_metadata " + "FROM group_users " "INNER JOIN groups ON group_users.group_id=groups.group_id " "WHERE group_users.user_id = ?"), (str(user.user_id),)) - groups = tuple(Group(UUID(row[0]), row[1]) for row in cursor.fetchall()) + groups = tuple( + Group(UUID(row[0]), row[1], json.loads(row[2] or "{}")) + for row in cursor.fetchall()) if len(groups) > 1: raise MembershipError(user, groups) @@ -129,7 +152,7 @@ def user_group(cursor: db.DbCursor, user: User) -> Maybe: def is_group_leader(cursor: db.DbCursor, user: User, group: Group): """Check whether the given `user` is the leader of `group`.""" - ugroup = user_group(cursor, user).maybe(False, lambda val: val) # type: ignore[misc] + ugroup = user_group(cursor, user).maybe(False, lambda val: val) # type: ignore[arg-type, misc] if not group: # User cannot be a group leader if not a member of ANY group return False @@ -153,6 +176,28 @@ def all_groups(conn: db.DbConnection) -> Maybe[Sequence[Group]]: res = cursor.fetchall() if res: return Just(tuple( - Group(row["group_id"], row["group_name"]) for row in res)) + Group(row["group_id"], row["group_name"], + json.loads(row["group_metadata"])) for row in res)) return Nothing + +def __save_group__( + cursor: db.DbCursor, group_name: str, + group_metadata: dict[str, Any]) -> Group: + """Save a group to db""" + the_group = Group(uuid4(), group_name, group_metadata) + cursor.execute( + ("INSERT INTO groups " + "VALUES(:group_id, :group_name, :group_metadata) " + "ON CONFLICT (group_id) DO UPDATE SET " + "group_name=:group_name, group_metadata=:group_metadata"), + {"group_id": str(the_group.group_id), "group_name": the_group.group_name, + "group_metadata": json.dumps(the_group.group_metadata)}) + return the_group + +def add_user_to_group(cursor: db.DbCursor, the_group: Group, user: User): + """Add `user` to `the_group` as a member.""" + cursor.execute( + ("INSERT INTO group_users VALUES (:group_id, :user_id) " + "ON CONFLICT (group_id, user_id) DO NOTHING"), + {"group_id": str(the_group.group_id), "user_id": str(user.user_id)}) |