diff options
Diffstat (limited to 'gn_auth/auth/authorisation/resources/groups/models.py')
| -rw-r--r-- | gn_auth/auth/authorisation/resources/groups/models.py | 160 |
1 files changed, 146 insertions, 14 deletions
diff --git a/gn_auth/auth/authorisation/resources/groups/models.py b/gn_auth/auth/authorisation/resources/groups/models.py index b8a5235..07e6dbe 100644 --- a/gn_auth/auth/authorisation/resources/groups/models.py +++ b/gn_auth/auth/authorisation/resources/groups/models.py @@ -1,5 +1,6 @@ """Handle the management of resource/user groups.""" import json +import datetime from uuid import UUID, uuid4 from functools import reduce from dataclasses import dataclass @@ -17,6 +18,9 @@ from gn_auth.auth.authentication.users import User, user_by_id from gn_auth.auth.authorisation.checks import authorised_p from gn_auth.auth.authorisation.privileges import Privilege from gn_auth.auth.authorisation.resources.errors import MissingGroupError +from gn_auth.auth.authorisation.resources.system.models import system_resource +from gn_auth.auth.authorisation.resources.common import ( + grant_access_to_sysadmins) from gn_auth.auth.authorisation.resources.base import ( Resource, resource_from_dbrow) @@ -97,8 +101,12 @@ def user_membership(conn: db.DbConnection, user: User) -> Sequence[Group]: "create a new group."), oauth2_scope="profile group") def create_group( - conn: db.DbConnection, group_name: str, group_leader: User, - group_description: Optional[str] = None) -> Group: + conn: db.DbConnection, + group_name: str, + group_leader: User, + group_description: Optional[str] = None, + creator: Optional[User] = None +) -> Group: """Create a new group.""" def resource_category_by_key( cursor: db.DbCursor, category_key: str): @@ -122,31 +130,38 @@ def create_group( cursor, group_name, ( {"group_description": group_description} if group_description else {})) + _group_resource_id = uuid4() _group_resource = { "group_id": str(new_group.group_id), - "resource_id": str(uuid4()), + "resource_id": str(_group_resource_id), "resource_name": group_name, "resource_category_id": str( resource_category_by_key( cursor, "group")["resource_category_id"] ), - "public": 0 + "public": 0, + "created_by": str( + creator.user_id if creator else group_leader.user_id), + "created_at": datetime.datetime.now().timestamp() } cursor.execute( "INSERT INTO resources VALUES " - "(:resource_id, :resource_name, :resource_category_id, :public)", + "(:resource_id, :resource_name, :resource_category_id, :public, " + ":created_by, :created_at)", _group_resource) cursor.execute( "INSERT INTO group_resources(resource_id, group_id) " "VALUES(:resource_id, :group_id)", _group_resource) + grant_access_to_sysadmins(cursor, + _group_resource_id, + system_resource(conn).resource_id) add_user_to_group(cursor, new_group, group_leader) revoke_user_role_by_name(cursor, group_leader, "group-creator") - assign_user_role_by_name( - cursor, - group_leader, - UUID(str(_group_resource["resource_id"])), - "group-leader") + assign_user_role_by_name(cursor, + group_leader, + _group_resource_id, + "group-leader") return new_group @@ -237,9 +252,12 @@ def is_group_leader(conn: db.DbConnection, user: User, group: Group) -> bool: return "group-leader" in role_names -def __build_groups_list_query__(base: str, search: Optional[str] = None) -> tuple[str, tuple[Optional[str], ...]]: +def __build_groups_list_query__( + base: str, + search: Optional[str] = None +) -> tuple[str, tuple[Optional[str], ...]]: """Build up the query from given search terms.""" - if bool(search): + if search is not None and search.strip() != "": _search = search.strip() return ((f"{base} WHERE groups.group_name LIKE ? " "OR groups.group_metadata LIKE ?"), @@ -310,6 +328,56 @@ def add_user_to_group(cursor: db.DbCursor, the_group: Group, user: User): ("INSERT INTO group_users VALUES (:group_id, :user_id) " "ON CONFLICT (group_id, user_id) DO NOTHING"), {"group_id": str(the_group.group_id), "user_id": str(user.user_id)}) + revoke_user_role_by_name(cursor, user, "group-creator") + + +def resource_from_group(conn: db.DbConnection, the_group: Group) -> Resource: + """Get the resource object that wraps the group for auth purposes.""" + with db.cursor(conn) as cursor: + cursor.execute( + "SELECT " + "resources.resource_id, resources.resource_name, " + "resources.public, resource_categories.* " + "FROM group_resources " + "INNER JOIN resources " + "ON group_resources.resource_id=resources.resource_id " + "INNER JOIN resource_categories " + "ON resources.resource_category_id=resource_categories.resource_category_id " + "WHERE group_resources.group_id=?", + (str(the_group.group_id),)) + results = tuple(resource_from_dbrow(row) for row in cursor.fetchall()) + match len(results): + case 0: + raise InconsistencyError("The group lacks a wrapper resource.") + case 1: + return results[0] + case _: + raise InconsistencyError( + "The group has more than one wrapper resource.") + + +def remove_user_from_group( + conn: db.DbConnection, + group: Group, + user: User, + grp_resource: Resource +): + """Add `user` to `group` as a member.""" + with db.cursor(conn) as cursor: + cursor.execute( + "DELETE FROM group_users " + "WHERE group_id=:group_id AND user_id=:user_id", + {"group_id": str(group.group_id), "user_id": str(user.user_id)}) + cursor.execute( + "DELETE FROM user_roles WHERE user_id=? AND resource_id=?", + (str(user.user_id), str(grp_resource.resource_id))) + assign_user_role_by_name(cursor, + user, + grp_resource.resource_id, + "group-creator") + grant_access_to_sysadmins(cursor, + grp_resource.resource_id, + system_resource(conn).resource_id) @authorised_p( @@ -369,8 +437,8 @@ gjr.status='PENDING'", return tuple(dict(row)for row in cursor.fetchall()) raise AuthorisationError( - "You do not have the appropriate authorisation to access the " - "group's join requests.") + "You need to be the group's leader in order to access the group's join " + "requests.") @authorised_p(("system:group:view-group", "system:group:edit-group"), @@ -580,3 +648,67 @@ def group_resource(conn: db.DbConnection, group_id: UUID) -> Resource: raise NotFoundError("Could not find a resource for group with ID " f"{group_id}") + + +def data_resources( + conn: db.DbConnection, group_id: UUID) -> Iterable[Resource]: + """Fetch a group's data resources.""" + with db.cursor(conn) as cursor: + cursor.execute( + "SELECT resource_ownership.group_id, resources.resource_id, " + "resources.resource_name, resources.public, resource_categories.* " + "FROM resource_ownership INNER JOIN resources " + "ON resource_ownership.resource_id=resources.resource_id " + "INNER JOIN resource_categories " + "ON resources.resource_category_id=resource_categories.resource_category_id " + "WHERE group_id=?", + (str(group_id),)) + yield from (resource_from_dbrow(row) for row in cursor.fetchall()) + + +def group_leaders(conn: db.DbConnection, group_id: UUID) -> Iterable[User]: + """Fetch all of a group's group leaders.""" + with db.cursor(conn) as cursor: + cursor.execute( + "SELECT users.* FROM group_users INNER JOIN group_resources " + "ON group_users.group_id=group_resources.group_id " + "INNER JOIN user_roles " + "ON group_resources.resource_id=user_roles.resource_id " + "INNER JOIN roles " + "ON user_roles.role_id=roles.role_id " + "INNER JOIN users " + "ON user_roles.user_id=users.user_id " + "WHERE group_users.group_id=? " + "AND roles.role_name='group-leader'", + (str(group_id),)) + yield from (User.from_sqlite3_row(row) for row in cursor.fetchall()) + + +def delete_group(conn: db.DbConnection, group_id: UUID): + """ + Delete the group with the given ID + + Parameters: + conn (db.DbConnection): an open connection to an SQLite3 database. + group_id (uuid.UUID): The identifier for the group to delete. + + Returns: + None: It does not return a value. + + Raises: + sqlite3.IntegrityError: if the group has members or linked resources, or + both. + """ + rsc = group_resource(conn, group_id) + with db.cursor(conn) as cursor: + cursor.execute("DELETE FROM group_join_requests WHERE group_id=?", + (str(group_id),)) + cursor.execute("DELETE FROM user_roles WHERE resource_id=?", + (str(rsc.resource_id),)) + cursor.execute( + "DELETE FROM group_resources WHERE group_id=? AND resource_id=?", + (str(group_id), str(rsc.resource_id))) + cursor.execute("DELETE FROM resources WHERE resource_id=?", + (str(rsc.resource_id),)) + cursor.execute("DELETE FROM groups WHERE group_id=?", + (str(group_id),)) |
