From 94502a92d8ae3277b8dd07eb1117367821241913 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Fri, 3 Mar 2023 10:48:30 +0300 Subject: auth: group roles: enable creation and listing of group roles. --- gn3/auth/authorisation/groups/views.py | 88 +++++++++++++++++++++++++++++++++- gn3/auth/authorisation/privileges.py | 16 +++++++ 2 files changed, 102 insertions(+), 2 deletions(-) (limited to 'gn3') diff --git a/gn3/auth/authorisation/groups/views.py b/gn3/auth/authorisation/groups/views.py index 0e779eb..4da6781 100644 --- a/gn3/auth/authorisation/groups/views.py +++ b/gn3/auth/authorisation/groups/views.py @@ -1,6 +1,7 @@ """The views/routes for the `gn3.auth.authorisation.groups` package.""" import uuid import datetime +from typing import Iterable from functools import partial from flask import request, jsonify, Response, Blueprint, current_app @@ -13,10 +14,13 @@ from gn3.auth.db_utils import with_db_connection from .data import link_data_to_group, retrieve_ungrouped_data from .models import ( - user_group, all_groups, DUMMY_GROUP, group_by_id, join_requests, + user_group, all_groups, DUMMY_GROUP, GroupRole, group_by_id, join_requests, GroupCreationError, accept_reject_join_request, group_users as _group_users, - create_group as _create_group) + create_group as _create_group, create_group_role as _create_group_role) +from ..roles.models import Role +from ..checks import authorised_p +from ..privileges import Privilege, privileges_by_ids from ..errors import InvalidData, AuthorisationError from ...authentication.users import User @@ -218,3 +222,83 @@ def link_data() -> Response: conn, gn3conn, dataset_type, dataset_id, group) return jsonify(with_db_connection(__link__)) + +@groups.route("/roles", methods=["GET"]) +def group_roles(): + """Return a list of all available group roles.""" + with require_oauth.acquire("profile group role") as the_token: + def __list_roles__(conn: db.DbConnection): + ## TODO: Check that user has appropriate privileges + with db.cursor(conn) as cursor: + group = user_group(cursor, the_token.user).maybe(# type: ignore[misc] + DUMMY_GROUP, lambda grp: grp) + if group == DUMMY_GROUP: + return tuple() + cursor.execute( + "SELECT gr.group_role_id, r.* " + "FROM group_roles AS gr INNER JOIN roles AS r " + "ON gr.role_id=r.role_id " + "WHERE group_id=?", + (str(group.group_id),)) + return tuple( + GroupRole(uuid.UUID(row["group_role_id"]), + group, + Role(uuid.UUID(row["role_id"]), + row["role_name"], + tuple())) + for row in cursor.fetchall()) + return jsonify(tuple( + dictify(role) for role in with_db_connection(__list_roles__))) + +@groups.route("/privileges", methods=["GET"]) +def group_privileges(): + """Return a list of all available group roles.""" + with require_oauth.acquire("profile group role") as _the_token: + def __list_privileges__(conn: db.DbConnection) -> Iterable[Privilege]: + ## TODO: Check that user has appropriate privileges + with db.cursor(conn) as cursor: + cursor.execute("SELECT * FROM privileges " + "WHERE privilege_id LIKE 'group:%'") + return ( + Privilege(row["privilege_id"], row["privilege_description"]) + for row in cursor.fetchall()) + return jsonify(tuple( + dictify(priv) for priv in with_db_connection(__list_privileges__))) + + + +@groups.route("/role/create", methods=["POST"]) +def create_group_role(): + """Create a new group role.""" + with require_oauth.acquire("profile group role") as the_token: + ## TODO: Check that user has appropriate privileges + @authorised_p(("group:role:create-role",), + "You do not have the privilege to create new roles", + oauth2_scope="profile group role") + def __create__(conn: db.DbConnection) -> GroupRole: + ## TODO: Check user cannot assign any privilege they don't have. + form = request.form + role_name = form.get("role_name", "").strip() + privileges_ids = form.getlist("privileges[]") + if len(role_name) == 0: + raise InvalidData("Role name not provided!") + if len(privileges_ids) == 0: + raise InvalidData( + "At least one privilege needs to be provided.") + + with db.cursor(conn) as cursor: + group = user_group(cursor, the_token.user).maybe(# type: ignore[misc] + DUMMY_GROUP, lambda grp: grp) + + if group == DUMMY_GROUP: + raise AuthorisationError( + "A user without a group cannot create a new role.") + privileges = privileges_by_ids(conn, tuple(privileges_ids)) + if len(privileges_ids) != len(privileges): + raise InvalidData( + f"{len(privileges_ids) - len(privileges)} of the selected " + "privileges were not found in the database.") + + return _create_group_role(conn, group, role_name, privileges) + + return jsonify(with_db_connection(__create__)) diff --git a/gn3/auth/authorisation/privileges.py b/gn3/auth/authorisation/privileges.py index ae4ed88..dbb4129 100644 --- a/gn3/auth/authorisation/privileges.py +++ b/gn3/auth/authorisation/privileges.py @@ -29,3 +29,19 @@ def user_privileges(conn: db.DbConnection, user: User) -> Iterable[Privilege]: results = cursor.fetchall() return (Privilege(row[0], row[1]) for row in results) + +def privileges_by_ids( + conn: db.DbConnection, privileges_ids: tuple[str, ...]) -> tuple[ + Privilege, ...]: + """Fetch privileges by their ids.""" + if len(privileges_ids) == 0: + return tuple() + + with db.cursor(conn) as cursor: + clause = ", ".join(["?"] * len(privileges_ids)) + cursor.execute( + f"SELECT * FROM privileges WHERE privilege_id IN ({clause})", + privileges_ids) + return tuple( + Privilege(row["privilege_id"], row["privilege_description"]) + for row in cursor.fetchall()) -- cgit v1.2.3