about summary refs log tree commit diff
path: root/gn_auth/auth/authorisation/roles
diff options
context:
space:
mode:
Diffstat (limited to 'gn_auth/auth/authorisation/roles')
-rw-r--r--gn_auth/auth/authorisation/roles/models.py47
1 files changed, 41 insertions, 6 deletions
diff --git a/gn_auth/auth/authorisation/roles/models.py b/gn_auth/auth/authorisation/roles/models.py
index e740bfd..6faeaca 100644
--- a/gn_auth/auth/authorisation/roles/models.py
+++ b/gn_auth/auth/authorisation/roles/models.py
@@ -7,6 +7,7 @@ from typing import Sequence, Iterable, Optional
 from pymonad.either import Left, Right, Either
 
 from gn_auth.auth.errors import NotFoundError, AuthorisationError
+from gn_auth.auth.authorisation.resources.base import Resource
 
 from ...db import sqlite3 as db
 from ...authentication.users import User
@@ -54,11 +55,14 @@ def db_rows_to_roles(rows) -> tuple[Role, ...]:
                  if bool(rows) else [])
 
 @authorised_p(
-    privileges = ("group:role:create-role",),
+    privileges = ("resource:role:create-role",),
     error_description="Could not create role")
 def create_role(
-        cursor: db.DbCursor, role_name: str,
-        privileges: Iterable[Privilege]) -> Role:
+        cursor: db.DbCursor,
+        role_name: str,
+        privileges: Iterable[Privilege],
+        user_editable: bool=True
+) -> Role:
     """
     Create a new generic role.
 
@@ -71,7 +75,7 @@ def create_role(
 
     RETURNS: An immutable `gn3.auth.authorisation.roles.Role` object
     """
-    role = Role(uuid4(), role_name, True, tuple(privileges))
+    role = Role(uuid4(), role_name, user_editable, tuple(privileges))
 
     cursor.execute(
         "INSERT INTO roles(role_id, role_name, user_editable) VALUES (?, ?, ?)",
@@ -128,6 +132,37 @@ def user_roles(conn: db.DbConnection, user: User) -> Sequence[dict]:
             __organise_privileges__, cursor.fetchall(), {}).values())
     return tuple()
 
+
+def user_roles_on_resource(
+        conn: db.DbConnection,
+        user_id: UUID,
+        resource_id: UUID
+) -> tuple[Role, ...]:
+    """Retrieve all roles assigned to a user for a particular resource."""
+    with db.cursor(conn) as cursor:
+        cursor.execute(
+            "SELECT ur.resource_id, ur.user_id, r.*, p.* "
+            "FROM user_roles AS ur "
+            "INNER JOIN roles AS r ON ur.role_id=r.role_id "
+            "INNER JOIN role_privileges AS rp ON r.role_id=rp.role_id "
+            "INNER JOIN privileges AS p ON rp.privilege_id=p.privilege_id "
+            "WHERE ur.user_id=? AND ur.resource_id=?",
+            (str(user_id), str(resource_id)))
+
+        return db_rows_to_roles(cursor.fetchall())
+    return tuple()
+
+
+def user_resource_roles(
+        conn: db.DbConnection,
+        user: User,
+        resource: Resource
+) -> tuple[Role, ...]:
+    "Retrieve roles a user has on a particular resource."
+    # TODO: Temporary placeholder to prevent system from breaking.
+    return user_roles_on_resource(conn, user.user_id, resource.resource_id)
+
+
 def user_role(conn: db.DbConnection, user: User, role_id: UUID) -> Either:
     """Retrieve a specific non-resource role assigned to the user."""
     with db.cursor(conn) as cursor:
@@ -236,7 +271,7 @@ def role_by_id(conn: db.DbConnection, role_id: UUID) -> Optional[Role]:
 
     _roles = db_rows_to_roles(results)
     if len(_roles) > 1:
-        raise Exception("Data corruption: Expected a single role.")
+        raise Exception("Data corruption: Expected a single role.")# pylint: disable=[broad-exception-raised]
 
     return _roles[0]
 
@@ -244,7 +279,7 @@ def role_by_id(conn: db.DbConnection, role_id: UUID) -> Optional[Role]:
 def delete_privilege_from_resource_role(
         cursor: db.DbCursor,
         role: Role,
-        privilege_id: str
+        privilege: Privilege
 ):
     """Delete a privilege from a resource role."""
     cursor.execute(