aboutsummaryrefslogtreecommitdiff
path: root/gn_auth/auth/authorisation/roles/models.py
diff options
context:
space:
mode:
Diffstat (limited to 'gn_auth/auth/authorisation/roles/models.py')
-rw-r--r--gn_auth/auth/authorisation/roles/models.py47
1 files changed, 41 insertions, 6 deletions
diff --git a/gn_auth/auth/authorisation/roles/models.py b/gn_auth/auth/authorisation/roles/models.py
index e740bfd..6faeaca 100644
--- a/gn_auth/auth/authorisation/roles/models.py
+++ b/gn_auth/auth/authorisation/roles/models.py
@@ -7,6 +7,7 @@ from typing import Sequence, Iterable, Optional
from pymonad.either import Left, Right, Either
from gn_auth.auth.errors import NotFoundError, AuthorisationError
+from gn_auth.auth.authorisation.resources.base import Resource
from ...db import sqlite3 as db
from ...authentication.users import User
@@ -54,11 +55,14 @@ def db_rows_to_roles(rows) -> tuple[Role, ...]:
if bool(rows) else [])
@authorised_p(
- privileges = ("group:role:create-role",),
+ privileges = ("resource:role:create-role",),
error_description="Could not create role")
def create_role(
- cursor: db.DbCursor, role_name: str,
- privileges: Iterable[Privilege]) -> Role:
+ cursor: db.DbCursor,
+ role_name: str,
+ privileges: Iterable[Privilege],
+ user_editable: bool=True
+) -> Role:
"""
Create a new generic role.
@@ -71,7 +75,7 @@ def create_role(
RETURNS: An immutable `gn3.auth.authorisation.roles.Role` object
"""
- role = Role(uuid4(), role_name, True, tuple(privileges))
+ role = Role(uuid4(), role_name, user_editable, tuple(privileges))
cursor.execute(
"INSERT INTO roles(role_id, role_name, user_editable) VALUES (?, ?, ?)",
@@ -128,6 +132,37 @@ def user_roles(conn: db.DbConnection, user: User) -> Sequence[dict]:
__organise_privileges__, cursor.fetchall(), {}).values())
return tuple()
+
+def user_roles_on_resource(
+ conn: db.DbConnection,
+ user_id: UUID,
+ resource_id: UUID
+) -> tuple[Role, ...]:
+ """Retrieve all roles assigned to a user for a particular resource."""
+ with db.cursor(conn) as cursor:
+ cursor.execute(
+ "SELECT ur.resource_id, ur.user_id, r.*, p.* "
+ "FROM user_roles AS ur "
+ "INNER JOIN roles AS r ON ur.role_id=r.role_id "
+ "INNER JOIN role_privileges AS rp ON r.role_id=rp.role_id "
+ "INNER JOIN privileges AS p ON rp.privilege_id=p.privilege_id "
+ "WHERE ur.user_id=? AND ur.resource_id=?",
+ (str(user_id), str(resource_id)))
+
+ return db_rows_to_roles(cursor.fetchall())
+ return tuple()
+
+
+def user_resource_roles(
+ conn: db.DbConnection,
+ user: User,
+ resource: Resource
+) -> tuple[Role, ...]:
+ "Retrieve roles a user has on a particular resource."
+ # TODO: Temporary placeholder to prevent system from breaking.
+ return user_roles_on_resource(conn, user.user_id, resource.resource_id)
+
+
def user_role(conn: db.DbConnection, user: User, role_id: UUID) -> Either:
"""Retrieve a specific non-resource role assigned to the user."""
with db.cursor(conn) as cursor:
@@ -236,7 +271,7 @@ def role_by_id(conn: db.DbConnection, role_id: UUID) -> Optional[Role]:
_roles = db_rows_to_roles(results)
if len(_roles) > 1:
- raise Exception("Data corruption: Expected a single role.")
+ raise Exception("Data corruption: Expected a single role.")# pylint: disable=[broad-exception-raised]
return _roles[0]
@@ -244,7 +279,7 @@ def role_by_id(conn: db.DbConnection, role_id: UUID) -> Optional[Role]:
def delete_privilege_from_resource_role(
cursor: db.DbCursor,
role: Role,
- privilege_id: str
+ privilege: Privilege
):
"""Delete a privilege from a resource role."""
cursor.execute(