aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gn3/auth/authorisation/resources.py43
1 files changed, 39 insertions, 4 deletions
diff --git a/gn3/auth/authorisation/resources.py b/gn3/auth/authorisation/resources.py
index 2c0b72b..0a91930 100644
--- a/gn3/auth/authorisation/resources.py
+++ b/gn3/auth/authorisation/resources.py
@@ -1,13 +1,13 @@
"""Handle the management of resources."""
from uuid import UUID, uuid4
-from typing import Sequence, NamedTuple
+from typing import Dict, Sequence, NamedTuple
from gn3.auth import db
from gn3.auth.authentication.users import User
from .checks import authorised_p
from .exceptions import AuthorisationError
-from .groups import Group, authenticated_user_group
+from .groups import Group, user_group, is_group_leader, authenticated_user_group
class MissingGroupError(AuthorisationError):
"""Raised for any resource operation without a group."""
@@ -73,6 +73,41 @@ def public_resources(conn: db.DbConnection) -> Sequence[Resource]:
bool(row[4]))
for row in results)
-def user_resources(conn: db.DbConnection, user: User) -> Sequence[Resource]:# pylint: disable=[unused-argument]
- """List the resources available to the user"""
+def group_leader_resources(
+ cursor: db.DbCursor, user: User, group: Group,
+ res_categories: Dict[UUID, ResourceCategory]) -> Sequence[Resource]:
+ """Return all the resources available to the group leader"""
+ if is_group_leader(cursor, user, group):
+ cursor.execute("SELECT * FROM resources WHERE group_id=?",
+ (str(group.group_id),))
+ return tuple(
+ Resource(group, UUID(row[1]), row[2], res_categories[UUID(row[3])],
+ bool(row[4]))
+ for row in cursor.fetchall())
return tuple()
+
+def user_resources(conn: db.DbConnection, user: User) -> Sequence[Resource]:
+ """List the resources available to the user"""
+ categories = { # Repeated in `public_resources` function
+ cat.resource_category_id: cat for cat in resource_categories(conn)
+ }
+ with db.cursor(conn) as cursor:
+ group = user_group(cursor, user).maybe(False, lambda val: val) # type: ignore[misc]
+ if not group:
+ return public_resources(conn)
+
+ gl_resources = group_leader_resources(cursor, user, group, categories)
+
+ cursor.execute(
+ ("SELECT resources.* FROM group_user_roles_on_resources "
+ "LEFT JOIN resources "
+ "ON group_user_roles_on_resources.resource_id=resources.resource_id "
+ "WHERE group_user_roles_on_resources.group_id = ? "
+ "AND group_user_roles_on_resources.user_id = ?"),
+ (str(group.group_id), str(user.user_id)))
+ private_res = tuple(
+ Resource(group, UUID(row[1]), row[2], categories[UUID(row[3])],
+ bool(row[4]))
+ for row in cursor.fetchall())
+
+ return tuple(set(private_res).union(gl_resources).union(public_resources(conn)))