about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--gn3/auth/authorisation/groups/views.py88
-rw-r--r--gn3/auth/authorisation/privileges.py16
2 files changed, 102 insertions, 2 deletions
diff --git a/gn3/auth/authorisation/groups/views.py b/gn3/auth/authorisation/groups/views.py
index 0e779eb..4da6781 100644
--- a/gn3/auth/authorisation/groups/views.py
+++ b/gn3/auth/authorisation/groups/views.py
@@ -1,6 +1,7 @@
 """The views/routes for the `gn3.auth.authorisation.groups` package."""
 import uuid
 import datetime
+from typing import Iterable
 from functools import partial
 
 from flask import request, jsonify, Response, Blueprint, current_app
@@ -13,10 +14,13 @@ from gn3.auth.db_utils import with_db_connection
 
 from .data import link_data_to_group, retrieve_ungrouped_data
 from .models import (
-    user_group, all_groups, DUMMY_GROUP, group_by_id, join_requests,
+    user_group, all_groups, DUMMY_GROUP, GroupRole, group_by_id, join_requests,
     GroupCreationError, accept_reject_join_request, group_users as _group_users,
-    create_group as _create_group)
+    create_group as _create_group, create_group_role as _create_group_role)
 
+from ..roles.models import Role
+from ..checks import authorised_p
+from ..privileges import Privilege, privileges_by_ids
 from ..errors import InvalidData, AuthorisationError
 
 from ...authentication.users import User
@@ -218,3 +222,83 @@ def link_data() -> Response:
                     conn, gn3conn, dataset_type, dataset_id, group)
 
         return jsonify(with_db_connection(__link__))
+
+@groups.route("/roles", methods=["GET"])
+def group_roles():
+    """Return a list of all available group roles."""
+    with require_oauth.acquire("profile group role") as the_token:
+        def __list_roles__(conn: db.DbConnection):
+            ## TODO: Check that user has appropriate privileges
+            with db.cursor(conn) as cursor:
+                group = user_group(cursor, the_token.user).maybe(# type: ignore[misc]
+                    DUMMY_GROUP, lambda grp: grp)
+                if group == DUMMY_GROUP:
+                    return tuple()
+                cursor.execute(
+                    "SELECT gr.group_role_id, r.* "
+                    "FROM group_roles AS gr INNER JOIN roles AS r "
+                    "ON gr.role_id=r.role_id "
+                    "WHERE group_id=?",
+                    (str(group.group_id),))
+                return tuple(
+                    GroupRole(uuid.UUID(row["group_role_id"]),
+                              group,
+                              Role(uuid.UUID(row["role_id"]),
+                                   row["role_name"],
+                                   tuple()))
+                    for row in cursor.fetchall())
+        return jsonify(tuple(
+            dictify(role) for role in with_db_connection(__list_roles__)))
+
+@groups.route("/privileges", methods=["GET"])
+def group_privileges():
+    """Return a list of all available group roles."""
+    with require_oauth.acquire("profile group role") as _the_token:
+        def __list_privileges__(conn: db.DbConnection) -> Iterable[Privilege]:
+            ## TODO: Check that user has appropriate privileges
+            with db.cursor(conn) as cursor:
+                cursor.execute("SELECT * FROM privileges "
+                               "WHERE privilege_id LIKE 'group:%'")
+                return (
+                    Privilege(row["privilege_id"], row["privilege_description"])
+                    for row in cursor.fetchall())
+        return jsonify(tuple(
+            dictify(priv) for priv in with_db_connection(__list_privileges__)))
+
+
+
+@groups.route("/role/create", methods=["POST"])
+def create_group_role():
+    """Create a new group role."""
+    with require_oauth.acquire("profile group role") as the_token:
+        ## TODO: Check that user has appropriate privileges
+        @authorised_p(("group:role:create-role",),
+                      "You do not have the privilege to create new roles",
+                      oauth2_scope="profile group role")
+        def __create__(conn: db.DbConnection) -> GroupRole:
+            ## TODO: Check user cannot assign any privilege they don't have.
+            form = request.form
+            role_name = form.get("role_name", "").strip()
+            privileges_ids = form.getlist("privileges[]")
+            if len(role_name) == 0:
+                raise InvalidData("Role name not provided!")
+            if len(privileges_ids) == 0:
+                raise InvalidData(
+                    "At least one privilege needs to be provided.")
+
+            with db.cursor(conn) as cursor:
+                group = user_group(cursor, the_token.user).maybe(# type: ignore[misc]
+                    DUMMY_GROUP, lambda grp: grp)
+
+            if group == DUMMY_GROUP:
+                raise AuthorisationError(
+                    "A user without a group cannot create a new role.")
+            privileges = privileges_by_ids(conn, tuple(privileges_ids))
+            if len(privileges_ids) != len(privileges):
+                raise InvalidData(
+                    f"{len(privileges_ids) - len(privileges)} of the selected "
+                    "privileges were not found in the database.")
+
+            return _create_group_role(conn, group, role_name, privileges)
+
+        return jsonify(with_db_connection(__create__))
diff --git a/gn3/auth/authorisation/privileges.py b/gn3/auth/authorisation/privileges.py
index ae4ed88..dbb4129 100644
--- a/gn3/auth/authorisation/privileges.py
+++ b/gn3/auth/authorisation/privileges.py
@@ -29,3 +29,19 @@ def user_privileges(conn: db.DbConnection, user: User) -> Iterable[Privilege]:
         results = cursor.fetchall()
 
     return (Privilege(row[0], row[1]) for row in results)
+
+def privileges_by_ids(
+        conn: db.DbConnection, privileges_ids: tuple[str, ...]) -> tuple[
+            Privilege, ...]:
+    """Fetch privileges by their ids."""
+    if len(privileges_ids) == 0:
+        return tuple()
+
+    with db.cursor(conn) as cursor:
+        clause = ", ".join(["?"] * len(privileges_ids))
+        cursor.execute(
+            f"SELECT * FROM privileges WHERE privilege_id IN ({clause})",
+            privileges_ids)
+        return tuple(
+            Privilege(row["privilege_id"], row["privilege_description"])
+            for row in cursor.fetchall())