aboutsummaryrefslogtreecommitdiff
path: root/gn3/auth/authorisation/resources/models.py
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2023-02-06 14:20:24 +0300
committerFrederick Muriuki Muriithi2023-02-06 14:20:24 +0300
commit30da2f48eb35360bb339d54da2ab83d96a1cf85b (patch)
treee30f2e0c3e884df0be52c7c3ffe65d1636aaa2c1 /gn3/auth/authorisation/resources/models.py
parent6c76667857d5bbc8db962a551cece3f068074055 (diff)
downloadgenenetwork3-30da2f48eb35360bb339d54da2ab83d96a1cf85b.tar.gz
auth: resource: Enable viewing the details of a resource.
Diffstat (limited to 'gn3/auth/authorisation/resources/models.py')
-rw-r--r--gn3/auth/authorisation/resources/models.py72
1 files changed, 62 insertions, 10 deletions
diff --git a/gn3/auth/authorisation/resources/models.py b/gn3/auth/authorisation/resources/models.py
index df7fdf9..368ac1b 100644
--- a/gn3/auth/authorisation/resources/models.py
+++ b/gn3/auth/authorisation/resources/models.py
@@ -3,15 +3,15 @@ import json
from uuid import UUID, uuid4
from typing import Any, Dict, Sequence, NamedTuple
-from pymonad.maybe import Just, Maybe, Nothing
-
from gn3.auth import db
from gn3.auth.dictify import dictify
from gn3.auth.authentication.users import User
+from .checks import authorised_for
+
from ..checks import authorised_p
-from ..errors import AuthorisationError
-from ..groups.models import Group, user_group, is_group_leader
+from ..errors import NotFoundError, AuthorisationError
+from ..groups.models import Group, user_group, group_by_id, is_group_leader
class MissingGroupError(AuthorisationError):
"""Raised for any resource operation without a group."""
@@ -47,6 +47,32 @@ class Resource(NamedTuple):
"public": self.public
}
+def __assign_resource_owner_role__(cursor, resource, user):
+ """Assign `user` the 'Resource Owner' role for `resource`."""
+ cursor.execute(
+ "SELECT gr.* FROM group_roles AS gr INNER JOIN roles AS r "
+ "ON gr.role_id=r.role_id WHERE r.role_name='resource-owner'")
+ role = cursor.fetchone()
+ if not role:
+ cursor.execute("SELECT * FROM roles WHERE role_name='resource-owner'")
+ role = cursor.fetchone()
+ cursor.execute(
+ "INSERT INTO group_roles VALUES "
+ "(:group_role_id, :group_id, :role_id)",
+ {"group_role_id": str(uuid4()),
+ "group_id": str(resource.group.group_id),
+ "role_id": role["role_id"]})
+
+ cursor.execute(
+ "INSERT INTO group_user_roles_on_resources "
+ "VALUES ("
+ ":group_id, :user_id, :role_id, :resource_id"
+ ")",
+ {"group_id": str(resource.group.group_id),
+ "user_id": str(user.user_id),
+ "role_id": role["role_id"],
+ "resource_id": str(resource.resource_id)})
+
@authorised_p(("group:resource:create-resource",),
error_description="Insufficient privileges to create a resource",
oauth2_scope="profile resource")
@@ -67,12 +93,12 @@ def create_resource(
resource_name,
str(resource.resource_category.resource_category_id),
1 if resource.public else 0))
- # assign_resource_owner_role(conn, resource, user)
+ __assign_resource_owner_role__(cursor, resource, user)
return resource
def resource_category_by_id(
- conn: db.DbConnection, category_id: UUID) -> Maybe[ResourceCategory]:
+ conn: db.DbConnection, category_id: UUID) -> ResourceCategory:
"""Retrieve a resource category by its ID."""
with db.cursor(conn) as cursor:
cursor.execute(
@@ -81,12 +107,13 @@ def resource_category_by_id(
(str(category_id),))
results = cursor.fetchone()
if results:
- return Just(ResourceCategory(
+ return ResourceCategory(
UUID(results["resource_category_id"]),
results["resource_category_key"],
- results["resource_category_description"]))
+ results["resource_category_description"])
- return Nothing
+ raise NotFoundError(
+ f"Could not find a ResourceCategory with ID '{category_id}'")
def resource_categories(conn: db.DbConnection) -> Sequence[ResourceCategory]:
"""Retrieve all available resource categories"""
@@ -148,10 +175,11 @@ def user_resources(conn: db.DbConnection, user: User) -> Sequence[Resource]:
"WHERE group_user_roles_on_resources.group_id = ? "
"AND group_user_roles_on_resources.user_id = ?"),
(str(group.group_id), str(user.user_id)))
+ rows = cursor.fetchall()
private_res = tuple(
Resource(group, UUID(row[1]), row[2], categories[UUID(row[3])],
bool(row[4]))
- for row in cursor.fetchall())
+ for row in rows)
return tuple({
res.resource_id: res
for res in
@@ -161,3 +189,27 @@ def user_resources(conn: db.DbConnection, user: User) -> Sequence[Resource]:
# Fix the typing here
return user_group(cursor, user).map(__all_resources__).maybe(# type: ignore[arg-type,misc]
public_resources(conn), lambda res: res)# type: ignore[arg-type,return-value]
+
+def resource_by_id(
+ conn: db.DbConnection, user: User, resource_id: UUID) -> Resource:
+ """Retrieve a resource by its ID."""
+ if not authorised_for(
+ conn, user, ("group:resource:view-resource",),
+ (resource_id,))[resource_id]:
+ raise AuthorisationError(
+ "You are not authorised to access resource with id "
+ f"'{resource_id}'.")
+
+ with db.cursor(conn) as cursor:
+ cursor.execute("SELECT * FROM resources WHERE resource_id=:id",
+ {"id": str(resource_id)})
+ row = cursor.fetchone()
+ if row:
+ return Resource(
+ group_by_id(conn, UUID(row["group_id"])),
+ UUID(row["resource_id"]),
+ row["resource_name"],
+ resource_category_by_id(conn, row["resource_category_id"]),
+ bool(int(row["public"])))
+
+ raise NotFoundError(f"Could not find a resource with id '{resource_id}'")