aboutsummaryrefslogtreecommitdiff
path: root/gn3
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2023-03-07 13:53:29 +0300
committerFrederick Muriuki Muriithi2023-03-07 13:53:29 +0300
commit8621b737b01be5a6f238725c65771dea1410f0bb (patch)
tree925e050d6f7e02e550553f12e663077a84e8da97 /gn3
parenta3a87ae52200cca7586b64d4e15cb12c88b17cd7 (diff)
downloadgenenetwork3-8621b737b01be5a6f238725c65771dea1410f0bb.tar.gz
auth: group_roles: Enable addition/deletion of privileges
Diffstat (limited to 'gn3')
-rw-r--r--gn3/auth/authorisation/groups/models.py34
-rw-r--r--gn3/auth/authorisation/groups/views.py60
-rw-r--r--gn3/auth/authorisation/roles/models.py4
3 files changed, 95 insertions, 3 deletions
diff --git a/gn3/auth/authorisation/groups/models.py b/gn3/auth/authorisation/groups/models.py
index b1f307f..5a58322 100644
--- a/gn3/auth/authorisation/groups/models.py
+++ b/gn3/auth/authorisation/groups/models.py
@@ -344,3 +344,37 @@ def group_role_by_id(
return GroupRole(group_role_id, group, roles[0])
raise NotFoundError(
f"Group role with ID '{group_role_id}' does not exist.")
+
+def add_privilege_to_group_role(conn: db.DbConnection, group_role: GroupRole,
+ privilege: Privilege) -> GroupRole:
+ """Add `privilege` to `group_role`."""
+ ## TODO: do privileges check.
+ with db.cursor(conn) as cursor:
+ cursor.execute(
+ "INSERT INTO role_privileges(role_id,privilege_id) "
+ "VALUES (?, ?) ON CONFLICT (role_id, privilege_id) "
+ "DO NOTHING",
+ (str(group_role.role.role_id), str(privilege.privilege_id)))
+ return GroupRole(
+ group_role.group_role_id,
+ group_role.group,
+ Role(group_role.role.role_id,
+ group_role.role.role_name,
+ group_role.role.privileges + (privilege,)))
+
+def delete_privilege_to_group_role(conn: db.DbConnection, group_role: GroupRole,
+ privilege: Privilege) -> GroupRole:
+ """Delete `privilege` to `group_role`."""
+ ## TODO: do privileges check.
+ with db.cursor(conn) as cursor:
+ cursor.execute(
+ "DELETE FROM role_privileges WHERE "
+ "role_id=? AND privilege_id=?",
+ (str(group_role.role.role_id), str(privilege.privilege_id)))
+ return GroupRole(
+ group_role.group_role_id,
+ group_role.group,
+ Role(group_role.role.role_id,
+ group_role.role.role_name,
+ tuple(priv for priv in group_role.role.privileges
+ if priv != privilege)))
diff --git a/gn3/auth/authorisation/groups/views.py b/gn3/auth/authorisation/groups/views.py
index 8b7adef..cf99975 100644
--- a/gn3/auth/authorisation/groups/views.py
+++ b/gn3/auth/authorisation/groups/views.py
@@ -17,12 +17,13 @@ from .models import (
user_group, all_groups, DUMMY_GROUP, GroupRole, group_by_id, join_requests,
group_role_by_id, GroupCreationError, accept_reject_join_request,
group_users as _group_users, create_group as _create_group,
+ add_privilege_to_group_role, delete_privilege_to_group_role,
create_group_role as _create_group_role)
from ..roles.models import Role
from ..checks import authorised_p
from ..privileges import Privilege, privileges_by_ids
-from ..errors import InvalidData, AuthorisationError
+from ..errors import InvalidData, NotFoundError, AuthorisationError
from ...authentication.users import User
from ...authentication.oauth2.resource_server import require_oauth
@@ -225,6 +226,7 @@ def link_data() -> Response:
return jsonify(with_db_connection(__link__))
@groups.route("/roles", methods=["GET"])
+@require_oauth("profile group")
def group_roles():
"""Return a list of all available group roles."""
with require_oauth.acquire("profile group role") as the_token:
@@ -252,6 +254,7 @@ def group_roles():
dictify(role) for role in with_db_connection(__list_roles__)))
@groups.route("/privileges", methods=["GET"])
+@require_oauth("profile group")
def group_privileges():
"""Return a list of all available group roles."""
with require_oauth.acquire("profile group role") as _the_token:
@@ -269,6 +272,7 @@ def group_privileges():
@groups.route("/role/create", methods=["POST"])
+@require_oauth("profile group")
def create_group_role():
"""Create a new group role."""
with require_oauth.acquire("profile group role") as the_token:
@@ -305,6 +309,7 @@ def create_group_role():
return jsonify(with_db_connection(__create__))
@groups.route("/role/<uuid:group_role_id>", methods=["GET"])
+@require_oauth("profile group")
def view_group_role(group_role_id: uuid.UUID):
"""Return the details of the given role."""
with require_oauth.acquire("profile group role") as the_token:
@@ -318,3 +323,56 @@ def view_group_role(group_role_id: uuid.UUID):
"A user without a group cannot view group roles.")
return group_role_by_id(conn, group, group_role_id)
return jsonify(dictify(with_db_connection(__group_role__)))
+
+def __add_remove_priv_to_from_role__(conn: db.DbConnection,
+ group_role_id: uuid.UUID,
+ direction: str,
+ user: User) -> GroupRole:
+ assert direction in ("ADD", "DELETE")
+ with db.cursor(conn) as cursor:
+ group = user_group(cursor, user).maybe(# type: ignore[misc]
+ DUMMY_GROUP, lambda grp: grp)
+
+ if group == DUMMY_GROUP:
+ raise AuthorisationError(
+ "You need to be a member of a group to edit roles.")
+ try:
+ privilege_id = request.form.get("privilege_id", "")
+ assert bool(privilege_id), "Privilege to add must be provided."
+ privileges = privileges_by_ids(conn, (privilege_id,))
+ if len(privileges) == 0:
+ raise NotFoundError("Privilege not found.")
+ dir_fns = {
+ "ADD": add_privilege_to_group_role,
+ "DELETE": delete_privilege_to_group_role
+ }
+ return dir_fns[direction](
+ conn,
+ group_role_by_id(conn, group, group_role_id),
+ privileges[0])
+ except AssertionError as aerr:
+ raise InvalidData(aerr.args[0]) from aerr
+
+@groups.route("/role/<uuid:group_role_id>/privilege/add", methods=["POST"])
+@require_oauth("profile group")
+def add_priv_to_role(group_role_id: uuid.UUID) -> Response:
+ """Add privilege to group role."""
+ with require_oauth.acquire("profile group role") as the_token:
+ return jsonify({
+ **dictify(with_db_connection(partial(
+ __add_remove_priv_to_from_role__, group_role_id=group_role_id,
+ direction="ADD", user=the_token.user))),
+ "description": "Privilege added successfully"
+ })
+
+@groups.route("/role/<uuid:group_role_id>/privilege/delete", methods=["POST"])
+@require_oauth("profile group")
+def delete_priv_from_role(group_role_id: uuid.UUID) -> Response:
+ """Delete privilege from group role."""
+ with require_oauth.acquire("profile group role") as the_token:
+ return jsonify({
+ **dictify(with_db_connection(partial(
+ __add_remove_priv_to_from_role__, group_role_id=group_role_id,
+ direction="DELETE", user=the_token.user))),
+ "description": "Privilege deleted successfully"
+ })
diff --git a/gn3/auth/authorisation/roles/models.py b/gn3/auth/authorisation/roles/models.py
index bb7ea73..be04985 100644
--- a/gn3/auth/authorisation/roles/models.py
+++ b/gn3/auth/authorisation/roles/models.py
@@ -17,7 +17,7 @@ class Role(NamedTuple):
"""Class representing a role: creates immutable objects."""
role_id: UUID
role_name: str
- privileges: Iterable[Privilege]
+ privileges: tuple[Privilege, ...]
def dictify(self) -> dict[str, Any]:
"""Return a dict representation of `Role` objects."""
@@ -44,7 +44,7 @@ def create_role(
RETURNS: An immutable `gn3.auth.authorisation.roles.Role` object
"""
- role = Role(uuid4(), role_name, privileges)
+ role = Role(uuid4(), role_name, tuple(privileges))
cursor.execute(
"INSERT INTO roles(role_id, role_name) VALUES (?, ?)",