aboutsummaryrefslogtreecommitdiff
path: root/gn_auth/auth/authorisation/resources/groups/models.py
diff options
context:
space:
mode:
Diffstat (limited to 'gn_auth/auth/authorisation/resources/groups/models.py')
-rw-r--r--gn_auth/auth/authorisation/resources/groups/models.py106
1 files changed, 96 insertions, 10 deletions
diff --git a/gn_auth/auth/authorisation/resources/groups/models.py b/gn_auth/auth/authorisation/resources/groups/models.py
index 3263e37..1d44ca4 100644
--- a/gn_auth/auth/authorisation/resources/groups/models.py
+++ b/gn_auth/auth/authorisation/resources/groups/models.py
@@ -8,14 +8,18 @@ from typing import Any, Sequence, Iterable, Optional
import sqlite3
from flask import g
from pymonad.maybe import Just, Maybe, Nothing
+from pymonad.either import Left, Right, Either
+from pymonad.tools import monad_from_none_or_value
from gn_auth.auth.db import sqlite3 as db
from gn_auth.auth.authentication.users import User, user_by_id
from gn_auth.auth.authorisation.checks import authorised_p
from gn_auth.auth.authorisation.privileges import Privilege
-from gn_auth.auth.authorisation.resources.base import Resource
from gn_auth.auth.authorisation.resources.errors import MissingGroupError
+from gn_auth.auth.authorisation.resources.base import (
+ Resource,
+ resource_from_dbrow)
from gn_auth.auth.errors import (
NotFoundError, AuthorisationError, InconsistencyError)
from gn_auth.auth.authorisation.roles.models import (
@@ -118,7 +122,7 @@ def create_group(
cursor, group_name, (
{"group_description": group_description}
if group_description else {}))
- group_resource = {
+ _group_resource = {
"group_id": str(new_group.group_id),
"resource_id": str(uuid4()),
"resource_name": group_name,
@@ -131,17 +135,17 @@ def create_group(
cursor.execute(
"INSERT INTO resources VALUES "
"(:resource_id, :resource_name, :resource_category_id, :public)",
- group_resource)
+ _group_resource)
cursor.execute(
"INSERT INTO group_resources(resource_id, group_id) "
"VALUES(:resource_id, :group_id)",
- group_resource)
+ _group_resource)
add_user_to_group(cursor, new_group, group_leader)
revoke_user_role_by_name(cursor, group_leader, "group-creator")
assign_user_role_by_name(
cursor,
group_leader,
- UUID(str(group_resource["resource_id"])),
+ UUID(str(_group_resource["resource_id"])),
"group-leader")
return new_group
@@ -233,15 +237,56 @@ def is_group_leader(conn: db.DbConnection, user: User, group: Group) -> bool:
return "group-leader" in role_names
-def all_groups(conn: db.DbConnection) -> Maybe[Sequence[Group]]:
+def __build_groups_list_query__(
+ base: str,
+ search: Optional[str] = None
+) -> tuple[str, tuple[Optional[str], ...]]:
+ """Build up the query from given search terms."""
+ if search is not None and search.strip() != "":
+ _search = search.strip()
+ return ((f"{base} WHERE groups.group_name LIKE ? "
+ "OR groups.group_metadata LIKE ?"),
+ (f"%{search}%", f"%{search}%"))
+ return base, tuple()
+
+
+def __limit_results_length__(base: str, start: int = 0, length: int = 0) -> str:
+ """Add the `LIMIT … OFFSET …` clause to query `base`."""
+ if length > 0:
+ return f"{base} LIMIT {length} OFFSET {start}"
+ return base
+
+
+def all_groups(
+ conn: db.DbConnection,
+ search: Optional[str] = None,
+ start: int = 0,
+ length: int = 0
+) -> Maybe[tuple[tuple[Group, ...], int, int]]:
"""Retrieve all existing groups"""
with db.cursor(conn) as cursor:
- cursor.execute("SELECT * FROM groups")
+ cursor.execute("SELECT COUNT(*) FROM groups")
+ _groups_total_count = int(cursor.fetchone()["COUNT(*)"])
+
+ _qdets = __build_groups_list_query__(
+ "SELECT COUNT(*) FROM groups", search)
+ cursor.execute(*__build_groups_list_query__(
+ "SELECT COUNT(*) FROM groups", search))
+ _filtered_total_count = int(cursor.fetchone()["COUNT(*)"])
+
+ _query, _params = __build_groups_list_query__(
+ "SELECT * FROM groups", search)
+
+ cursor.execute(__limit_results_length__(_query, start, length),
+ _params)
res = cursor.fetchall()
if res:
- return Just(tuple(
- Group(row["group_id"], row["group_name"],
- json.loads(row["group_metadata"])) for row in res))
+ return Just((
+ tuple(
+ Group(row["group_id"], row["group_name"],
+ json.loads(row["group_metadata"])) for row in res),
+ _groups_total_count,
+ _filtered_total_count))
return Nothing
@@ -497,3 +542,44 @@ def add_resources_to_group(conn: db.DbConnection,
"group_id": str(group.group_id),
"resource_id": str(rsc.resource_id)
} for rsc in resources))
+
+
+def admin_group(conn: db.DbConnection) -> Either:
+ """Return a group where at least one system admin is a member."""
+ query = (
+ "SELECT DISTINCT g.group_id, g.group_name, g.group_metadata "
+ "FROM roles AS r INNER JOIN user_roles AS ur ON r.role_id=ur.role_id "
+ "INNER JOIN group_users AS gu ON ur.user_id=gu.user_id "
+ "INNER JOIN groups AS g ON gu.group_id=g.group_id "
+ "WHERE role_name='system-administrator'")
+ with db.cursor(conn) as cursor:
+ cursor.execute(query)
+ return monad_from_none_or_value(
+ Left("There is no group of which the system admininstrator is a "
+ "member."),
+ lambda row: Right(Group(
+ UUID(row["group_id"]),
+ row["group_name"],
+ json.loads(row["group_metadata"]))),
+ cursor.fetchone())
+
+
+def group_resource(conn: db.DbConnection, group_id: UUID) -> Resource:
+ """Retrieve the system resource."""
+ with db.cursor(conn) as cursor:
+ cursor.execute(
+ "SELECT group_resources.group_id, resource_categories.*, "
+ "resources.resource_id, resources.resource_name, resources.public "
+ "FROM group_resources INNER JOIN resources "
+ "ON group_resources.resource_id=resources.resource_id "
+ "INNER JOIN resource_categories "
+ "ON resources.resource_category_id=resource_categories.resource_category_id "
+ "WHERE group_resources.group_id=? "
+ "AND resource_categories.resource_category_key='group'",
+ (str(group_id),))
+ row = cursor.fetchone()
+ if row:
+ return resource_from_dbrow(row)
+
+ raise NotFoundError("Could not find a resource for group with ID "
+ f"{group_id}")