aboutsummaryrefslogtreecommitdiff
path: root/gn_auth/auth/authorisation/resources
diff options
context:
space:
mode:
Diffstat (limited to 'gn_auth/auth/authorisation/resources')
-rw-r--r--gn_auth/auth/authorisation/resources/groups/models.py100
1 files changed, 67 insertions, 33 deletions
diff --git a/gn_auth/auth/authorisation/resources/groups/models.py b/gn_auth/auth/authorisation/resources/groups/models.py
index 3afa7bf..fc10972 100644
--- a/gn_auth/auth/authorisation/resources/groups/models.py
+++ b/gn_auth/auth/authorisation/resources/groups/models.py
@@ -3,7 +3,7 @@ import json
from uuid import UUID, uuid4
from functools import reduce
from dataclasses import dataclass, asdict
-from typing import Any, Sequence, Iterable, Optional, NamedTuple
+from typing import Any, Sequence, Iterable, Optional
from flask import g
from pymonad.maybe import Just, Maybe, Nothing
@@ -46,9 +46,11 @@ class GroupRole:
role: Role
+
class GroupCreationError(AuthorisationError):
"""Raised whenever a group creation fails"""
+
class MembershipError(AuthorisationError):
"""Raised when there is an error with a user's membership to a group."""
@@ -60,6 +62,7 @@ class MembershipError(AuthorisationError):
f"groups ({groups_str})")
super().__init__(f"{type(self).__name__}: {error_description}.")
+
def user_membership(conn: db.DbConnection, user: User) -> Sequence[Group]:
"""Returns all the groups that a member belongs to"""
query = (
@@ -74,18 +77,19 @@ def user_membership(conn: db.DbConnection, user: User) -> Sequence[Group]:
return groups
+
@authorised_p(
- privileges = ("system:group:create-group",),
- error_description = (
+ privileges=("system:group:create-group",),
+ error_description=(
"You do not have the appropriate privileges to enable you to "
"create a new group."),
- oauth2_scope = "profile group")
+ oauth2_scope="profile group")
def create_group(
conn: db.DbConnection, group_name: str, group_leader: User,
group_description: Optional[str] = None) -> Group:
"""Create a new group."""
def resource_category_by_key(
- cursor: db.DbCursor, category_key: str):
+ cursor: db.DbCursor, category_key: str):
"""Retrieve a resource category by its key."""
cursor.execute(
"SELECT * FROM resource_categories WHERE "
@@ -103,7 +107,7 @@ def create_group(
with db.cursor(conn) as cursor:
new_group = save_group(
- cursor, group_name,(
+ cursor, group_name, (
{"group_description": group_description}
if group_description else {}))
group_resource = {
@@ -111,7 +115,9 @@ def create_group(
"resource_id": str(uuid4()),
"resource_name": group_name,
"resource_category_id": str(
- resource_category_by_key(cursor, "group")["resource_category_id"]),
+ resource_category_by_key(
+ cursor, "group")["resource_category_id"]
+ ),
"public": 0
}
cursor.execute(
@@ -131,6 +137,7 @@ def create_group(
"group-leader")
return new_group
+
@authorised_p(("group:role:create-role",),
error_description="Could not create the group role")
def create_group_role(
@@ -147,6 +154,7 @@ def create_group_role(
return GroupRole(group_role_id, group, role)
+
def authenticated_user_group(conn) -> Maybe:
"""
Returns the currently authenticated user's group.
@@ -171,12 +179,13 @@ def authenticated_user_group(conn) -> Maybe:
return Nothing
+
def user_group(conn: db.DbConnection, user: User) -> Maybe[Group]:
"""Returns the given user's group"""
with db.cursor(conn) as cursor:
cursor.execute(
- ("SELECT groups.group_id, groups.group_name, groups.group_metadata "
- "FROM group_users "
+ ("SELECT groups.group_id, groups.group_name, "
+ "groups.group_metadata FROM group_users "
"INNER JOIN groups ON group_users.group_id=groups.group_id "
"WHERE group_users.user_id = ?"),
(str(user.user_id),))
@@ -192,11 +201,12 @@ def user_group(conn: db.DbConnection, user: User) -> Maybe[Group]:
return Nothing
+
def is_group_leader(conn: db.DbConnection, user: User, group: Group) -> bool:
"""Check whether the given `user` is the leader of `group`."""
ugroup = user_group(conn, user).maybe(
- False, lambda val: val) # type: ignore[arg-type, misc]
+ False, lambda val: val) # type: ignore[arg-type, misc]
if not group:
# User cannot be a group leader if not a member of ANY group
return False
@@ -214,6 +224,7 @@ def is_group_leader(conn: db.DbConnection, user: User, group: Group) -> bool:
return "group-leader" in role_names
+
def all_groups(conn: db.DbConnection) -> Maybe[Sequence[Group]]:
"""Retrieve all existing groups"""
with db.cursor(conn) as cursor:
@@ -226,6 +237,7 @@ def all_groups(conn: db.DbConnection) -> Maybe[Sequence[Group]]:
return Nothing
+
def save_group(
cursor: db.DbCursor, group_name: str,
group_metadata: dict[str, Any]) -> Group:
@@ -236,10 +248,12 @@ def save_group(
"VALUES(:group_id, :group_name, :group_metadata) "
"ON CONFLICT (group_id) DO UPDATE SET "
"group_name=:group_name, group_metadata=:group_metadata"),
- {"group_id": str(the_group.group_id), "group_name": the_group.group_name,
+ {"group_id": str(the_group.group_id),
+ "group_name": the_group.group_name,
"group_metadata": json.dumps(the_group.group_metadata)})
return the_group
+
def add_user_to_group(cursor: db.DbCursor, the_group: Group, user: User):
"""Add `user` to `the_group` as a member."""
cursor.execute(
@@ -247,11 +261,12 @@ def add_user_to_group(cursor: db.DbCursor, the_group: Group, user: User):
"ON CONFLICT (group_id, user_id) DO NOTHING"),
{"group_id": str(the_group.group_id), "user_id": str(user.user_id)})
+
@authorised_p(
- privileges = ("system:group:view-group",),
- error_description = (
- "You do not have the appropriate privileges to access the list of users"
- " in the group."))
+ privileges=("system:group:view-group",),
+ error_description=(
+ "You do not have the appropriate privileges to "
+ "access the list of users in the group."))
def group_users(conn: db.DbConnection, group_id: UUID) -> Iterable[User]:
"""Retrieve all users that are members of group with id `group_id`."""
with db.cursor(conn) as cursor:
@@ -264,9 +279,10 @@ def group_users(conn: db.DbConnection, group_id: UUID) -> Iterable[User]:
return (User(UUID(row["user_id"]), row["email"], row["name"])
for row in results)
+
@authorised_p(
- privileges = ("system:group:view-group",),
- error_description = (
+ privileges=("system:group:view-group",),
+ error_description=(
"You do not have the appropriate privileges to access the group."))
def group_by_id(conn: db.DbConnection, group_id: UUID) -> Group:
"""Retrieve a group by its ID"""
@@ -282,19 +298,24 @@ def group_by_id(conn: db.DbConnection, group_id: UUID) -> Group:
raise NotFoundError(f"Could not find group with ID '{group_id}'.")
+
@authorised_p(("system:group:view-group", "system:group:edit-group"),
- error_description=("You do not have the appropriate authorisation"
- " to act upon the join requests."),
+ error_description=(
+ "You do not have the appropriate authorisation"
+ " to act upon the join requests."
+ ),
oauth2_scope="profile group")
def join_requests(conn: db.DbConnection, user: User):
"""List all the join requests for the user's group."""
with db.cursor(conn) as cursor:
- group = user_group(conn, user).maybe(DUMMY_GROUP, lambda grp: grp)# type: ignore[misc]
+ group = user_group(conn, user).maybe(
+ DUMMY_GROUP, lambda grp: grp) # type: ignore[misc]
if group != DUMMY_GROUP and is_group_leader(conn, user, group):
cursor.execute(
- "SELECT gjr.*, u.email, u.name FROM group_join_requests AS gjr "
- "INNER JOIN users AS u ON gjr.requester_id=u.user_id "
- "WHERE gjr.group_id=? AND gjr.status='PENDING'",
+ "SELECT gjr.*, u.email, u.name FROM \
+group_join_requests AS gjr INNER JOIN users AS u ON \
+gjr.requester_id=u.user_id WHERE gjr.group_id=? AND \
+gjr.status='PENDING'",
(str(group.group_id),))
return tuple(dict(row)for row in cursor.fetchall())
@@ -302,16 +323,21 @@ def join_requests(conn: db.DbConnection, user: User):
"You do not have the appropriate authorisation to access the "
"group's join requests.")
+
@authorised_p(("system:group:view-group", "system:group:edit-group"),
- error_description=("You do not have the appropriate authorisation"
- " to act upon the join requests."),
+ error_description=(
+ "You do not have the appropriate authorisation"
+ " to act upon the join requests."
+ ),
oauth2_scope="profile group")
def accept_reject_join_request(
- conn: db.DbConnection, request_id: UUID, user: User, status: str) -> dict:
+ conn: db.DbConnection, request_id: UUID, user: User, status: str
+) -> dict:
"""Accept/Reject a join request."""
assert status in ("ACCEPTED", "REJECTED"), f"Invalid status '{status}'."
with db.cursor(conn) as cursor:
- group = user_group(conn, user).maybe(DUMMY_GROUP, lambda grp: grp) # type: ignore[misc]
+ group = user_group(conn, user).maybe(
+ DUMMY_GROUP, lambda grp: grp) # type: ignore[misc]
cursor.execute("SELECT * FROM group_join_requests WHERE request_id=?",
(str(request_id),))
row = cursor.fetchone()
@@ -321,7 +347,8 @@ def accept_reject_join_request(
the_user = user_by_id(conn, UUID(row["requester_id"]))
if status == "ACCEPTED":
add_user_to_group(cursor, group, the_user)
- revoke_user_role_by_name(cursor, the_user, "group-creator")
+ revoke_user_role_by_name(
+ cursor, the_user, "group-creator")
cursor.execute(
"UPDATE group_join_requests SET status=? "
"WHERE request_id=?",
@@ -335,6 +362,7 @@ def accept_reject_join_request(
"You cannot act on other groups join requests")
raise NotFoundError(f"Could not find request with ID '{request_id}'")
+
def __organise_privileges__(acc, row):
role_id = UUID(row["role_id"])
role = acc.get(role_id, False)
@@ -359,11 +387,13 @@ def __organise_privileges__(acc, row):
# @authorised_p(("group:role:view",),
# "Insufficient privileges to view role",
# oauth2_scope="profile group role")
+
+
def group_role_by_id(
conn: db.DbConnection, group: Group, group_role_id: UUID) -> GroupRole:
"""Retrieve GroupRole from id by its `group_role_id`."""
- ## TODO: do privileges check before running actual query
- ## the check commented out above doesn't work correctly
+ # TODO: do privileges check before running actual query
+ # the check commented out above doesn't work correctly
with db.cursor(conn) as cursor:
cursor.execute(
"SELECT gr.group_role_id, r.*, p.* "
@@ -375,20 +405,21 @@ def group_role_by_id(
(str(group_role_id), str(group.group_id)))
rows = cursor.fetchall()
if rows:
- roles: tuple[Role,...] = tuple(reduce(
+ roles: tuple[Role, ...] = tuple(reduce(
__organise_privileges__, rows, {}).values())
assert len(roles) == 1
return GroupRole(group_role_id, group, roles[0])
raise NotFoundError(
f"Group role with ID '{group_role_id}' does not exist.")
+
@authorised_p(("group:role:edit-role",),
"You do not have the privilege to edit a role.",
oauth2_scope="profile group role")
def add_privilege_to_group_role(conn: db.DbConnection, group_role: GroupRole,
privilege: Privilege) -> GroupRole:
"""Add `privilege` to `group_role`."""
- ## TODO: do privileges check.
+ # TODO: do privileges check.
check_user_editable(group_role.role)
with db.cursor(conn) as cursor:
cursor.execute(
@@ -404,6 +435,7 @@ def add_privilege_to_group_role(conn: db.DbConnection, group_role: GroupRole,
group_role.role.user_editable,
group_role.role.privileges + (privilege,)))
+
@authorised_p(("group:role:edit-role",),
"You do not have the privilege to edit a role.",
oauth2_scope="profile group role")
@@ -411,7 +443,7 @@ def delete_privilege_from_group_role(
conn: db.DbConnection, group_role: GroupRole,
privilege: Privilege) -> GroupRole:
"""Delete `privilege` to `group_role`."""
- ## TODO: do privileges check.
+ # TODO: do privileges check.
check_user_editable(group_role.role)
with db.cursor(conn) as cursor:
cursor.execute(
@@ -427,6 +459,7 @@ def delete_privilege_from_group_role(
tuple(priv for priv in group_role.role.privileges
if priv != privilege)))
+
def resource_owner(conn: db.DbConnection, resource: Resource) -> Group:
"""Return the user group that owns the resource."""
with db.cursor(conn) as cursor:
@@ -444,6 +477,7 @@ def resource_owner(conn: db.DbConnection, resource: Resource) -> Group:
raise MissingGroupError("Resource has no 'owning' group.")
+
def add_resources_to_group(conn: db.DbConnection,
resources: tuple[Resource, ...],
group: Group):