aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2023-03-03 10:48:30 +0300
committerFrederick Muriuki Muriithi2023-03-03 10:48:30 +0300
commit94502a92d8ae3277b8dd07eb1117367821241913 (patch)
treec6857578610ea4504a0fdfa088f8abf320e57013
parent8f5f0c692293b96c2ec48a810196556764101ccc (diff)
downloadgenenetwork3-94502a92d8ae3277b8dd07eb1117367821241913.tar.gz
auth: group roles: enable creation and listing of group roles.
-rw-r--r--gn3/auth/authorisation/groups/views.py88
-rw-r--r--gn3/auth/authorisation/privileges.py16
2 files changed, 102 insertions, 2 deletions
diff --git a/gn3/auth/authorisation/groups/views.py b/gn3/auth/authorisation/groups/views.py
index 0e779eb..4da6781 100644
--- a/gn3/auth/authorisation/groups/views.py
+++ b/gn3/auth/authorisation/groups/views.py
@@ -1,6 +1,7 @@
"""The views/routes for the `gn3.auth.authorisation.groups` package."""
import uuid
import datetime
+from typing import Iterable
from functools import partial
from flask import request, jsonify, Response, Blueprint, current_app
@@ -13,10 +14,13 @@ from gn3.auth.db_utils import with_db_connection
from .data import link_data_to_group, retrieve_ungrouped_data
from .models import (
- user_group, all_groups, DUMMY_GROUP, group_by_id, join_requests,
+ user_group, all_groups, DUMMY_GROUP, GroupRole, group_by_id, join_requests,
GroupCreationError, accept_reject_join_request, group_users as _group_users,
- create_group as _create_group)
+ create_group as _create_group, create_group_role as _create_group_role)
+from ..roles.models import Role
+from ..checks import authorised_p
+from ..privileges import Privilege, privileges_by_ids
from ..errors import InvalidData, AuthorisationError
from ...authentication.users import User
@@ -218,3 +222,83 @@ def link_data() -> Response:
conn, gn3conn, dataset_type, dataset_id, group)
return jsonify(with_db_connection(__link__))
+
+@groups.route("/roles", methods=["GET"])
+def group_roles():
+ """Return a list of all available group roles."""
+ with require_oauth.acquire("profile group role") as the_token:
+ def __list_roles__(conn: db.DbConnection):
+ ## TODO: Check that user has appropriate privileges
+ with db.cursor(conn) as cursor:
+ group = user_group(cursor, the_token.user).maybe(# type: ignore[misc]
+ DUMMY_GROUP, lambda grp: grp)
+ if group == DUMMY_GROUP:
+ return tuple()
+ cursor.execute(
+ "SELECT gr.group_role_id, r.* "
+ "FROM group_roles AS gr INNER JOIN roles AS r "
+ "ON gr.role_id=r.role_id "
+ "WHERE group_id=?",
+ (str(group.group_id),))
+ return tuple(
+ GroupRole(uuid.UUID(row["group_role_id"]),
+ group,
+ Role(uuid.UUID(row["role_id"]),
+ row["role_name"],
+ tuple()))
+ for row in cursor.fetchall())
+ return jsonify(tuple(
+ dictify(role) for role in with_db_connection(__list_roles__)))
+
+@groups.route("/privileges", methods=["GET"])
+def group_privileges():
+ """Return a list of all available group roles."""
+ with require_oauth.acquire("profile group role") as _the_token:
+ def __list_privileges__(conn: db.DbConnection) -> Iterable[Privilege]:
+ ## TODO: Check that user has appropriate privileges
+ with db.cursor(conn) as cursor:
+ cursor.execute("SELECT * FROM privileges "
+ "WHERE privilege_id LIKE 'group:%'")
+ return (
+ Privilege(row["privilege_id"], row["privilege_description"])
+ for row in cursor.fetchall())
+ return jsonify(tuple(
+ dictify(priv) for priv in with_db_connection(__list_privileges__)))
+
+
+
+@groups.route("/role/create", methods=["POST"])
+def create_group_role():
+ """Create a new group role."""
+ with require_oauth.acquire("profile group role") as the_token:
+ ## TODO: Check that user has appropriate privileges
+ @authorised_p(("group:role:create-role",),
+ "You do not have the privilege to create new roles",
+ oauth2_scope="profile group role")
+ def __create__(conn: db.DbConnection) -> GroupRole:
+ ## TODO: Check user cannot assign any privilege they don't have.
+ form = request.form
+ role_name = form.get("role_name", "").strip()
+ privileges_ids = form.getlist("privileges[]")
+ if len(role_name) == 0:
+ raise InvalidData("Role name not provided!")
+ if len(privileges_ids) == 0:
+ raise InvalidData(
+ "At least one privilege needs to be provided.")
+
+ with db.cursor(conn) as cursor:
+ group = user_group(cursor, the_token.user).maybe(# type: ignore[misc]
+ DUMMY_GROUP, lambda grp: grp)
+
+ if group == DUMMY_GROUP:
+ raise AuthorisationError(
+ "A user without a group cannot create a new role.")
+ privileges = privileges_by_ids(conn, tuple(privileges_ids))
+ if len(privileges_ids) != len(privileges):
+ raise InvalidData(
+ f"{len(privileges_ids) - len(privileges)} of the selected "
+ "privileges were not found in the database.")
+
+ return _create_group_role(conn, group, role_name, privileges)
+
+ return jsonify(with_db_connection(__create__))
diff --git a/gn3/auth/authorisation/privileges.py b/gn3/auth/authorisation/privileges.py
index ae4ed88..dbb4129 100644
--- a/gn3/auth/authorisation/privileges.py
+++ b/gn3/auth/authorisation/privileges.py
@@ -29,3 +29,19 @@ def user_privileges(conn: db.DbConnection, user: User) -> Iterable[Privilege]:
results = cursor.fetchall()
return (Privilege(row[0], row[1]) for row in results)
+
+def privileges_by_ids(
+ conn: db.DbConnection, privileges_ids: tuple[str, ...]) -> tuple[
+ Privilege, ...]:
+ """Fetch privileges by their ids."""
+ if len(privileges_ids) == 0:
+ return tuple()
+
+ with db.cursor(conn) as cursor:
+ clause = ", ".join(["?"] * len(privileges_ids))
+ cursor.execute(
+ f"SELECT * FROM privileges WHERE privilege_id IN ({clause})",
+ privileges_ids)
+ return tuple(
+ Privilege(row["privilege_id"], row["privilege_description"])
+ for row in cursor.fetchall())