diff options
Diffstat (limited to 'gn_auth/auth/authorisation/roles/models.py')
-rw-r--r-- | gn_auth/auth/authorisation/roles/models.py | 47 |
1 files changed, 41 insertions, 6 deletions
diff --git a/gn_auth/auth/authorisation/roles/models.py b/gn_auth/auth/authorisation/roles/models.py index e740bfd..6faeaca 100644 --- a/gn_auth/auth/authorisation/roles/models.py +++ b/gn_auth/auth/authorisation/roles/models.py @@ -7,6 +7,7 @@ from typing import Sequence, Iterable, Optional from pymonad.either import Left, Right, Either from gn_auth.auth.errors import NotFoundError, AuthorisationError +from gn_auth.auth.authorisation.resources.base import Resource from ...db import sqlite3 as db from ...authentication.users import User @@ -54,11 +55,14 @@ def db_rows_to_roles(rows) -> tuple[Role, ...]: if bool(rows) else []) @authorised_p( - privileges = ("group:role:create-role",), + privileges = ("resource:role:create-role",), error_description="Could not create role") def create_role( - cursor: db.DbCursor, role_name: str, - privileges: Iterable[Privilege]) -> Role: + cursor: db.DbCursor, + role_name: str, + privileges: Iterable[Privilege], + user_editable: bool=True +) -> Role: """ Create a new generic role. @@ -71,7 +75,7 @@ def create_role( RETURNS: An immutable `gn3.auth.authorisation.roles.Role` object """ - role = Role(uuid4(), role_name, True, tuple(privileges)) + role = Role(uuid4(), role_name, user_editable, tuple(privileges)) cursor.execute( "INSERT INTO roles(role_id, role_name, user_editable) VALUES (?, ?, ?)", @@ -128,6 +132,37 @@ def user_roles(conn: db.DbConnection, user: User) -> Sequence[dict]: __organise_privileges__, cursor.fetchall(), {}).values()) return tuple() + +def user_roles_on_resource( + conn: db.DbConnection, + user_id: UUID, + resource_id: UUID +) -> tuple[Role, ...]: + """Retrieve all roles assigned to a user for a particular resource.""" + with db.cursor(conn) as cursor: + cursor.execute( + "SELECT ur.resource_id, ur.user_id, r.*, p.* " + "FROM user_roles AS ur " + "INNER JOIN roles AS r ON ur.role_id=r.role_id " + "INNER JOIN role_privileges AS rp ON r.role_id=rp.role_id " + "INNER JOIN privileges AS p ON rp.privilege_id=p.privilege_id " + "WHERE ur.user_id=? AND ur.resource_id=?", + (str(user_id), str(resource_id))) + + return db_rows_to_roles(cursor.fetchall()) + return tuple() + + +def user_resource_roles( + conn: db.DbConnection, + user: User, + resource: Resource +) -> tuple[Role, ...]: + "Retrieve roles a user has on a particular resource." + # TODO: Temporary placeholder to prevent system from breaking. + return user_roles_on_resource(conn, user.user_id, resource.resource_id) + + def user_role(conn: db.DbConnection, user: User, role_id: UUID) -> Either: """Retrieve a specific non-resource role assigned to the user.""" with db.cursor(conn) as cursor: @@ -236,7 +271,7 @@ def role_by_id(conn: db.DbConnection, role_id: UUID) -> Optional[Role]: _roles = db_rows_to_roles(results) if len(_roles) > 1: - raise Exception("Data corruption: Expected a single role.") + raise Exception("Data corruption: Expected a single role.")# pylint: disable=[broad-exception-raised] return _roles[0] @@ -244,7 +279,7 @@ def role_by_id(conn: db.DbConnection, role_id: UUID) -> Optional[Role]: def delete_privilege_from_resource_role( cursor: db.DbCursor, role: Role, - privilege_id: str + privilege: Privilege ): """Delete a privilege from a resource role.""" cursor.execute( |