"""Handle management of roles"""
from uuid import UUID, uuid4
from functools import reduce
from dataclasses import dataclass
from typing import Sequence, Iterable
from pymonad.either import Left, Right, Either
from gn_auth.auth.errors import NotFoundError, AuthorisationError
from ...db import sqlite3 as db
from ...authentication.users import User
from ..checks import authorised_p
from ..privileges import Privilege
@dataclass(frozen=True)
class Role:
"""Class representing a role: creates immutable objects."""
role_id: UUID
role_name: str
user_editable: bool
privileges: tuple[Privilege, ...]
def check_user_editable(role: Role):
"""Raise an exception if `role` is not user editable."""
if not role.user_editable:
raise AuthorisationError(
f"The role `{role.role_name}` is not user editable.")
@authorised_p(
privileges = ("group:role:create-role",),
error_description="Could not create role")
def create_role(
cursor: db.DbCursor, role_name: str,
privileges: Iterable[Privilege]) -> Role:
"""
Create a new generic role.
PARAMS:
* cursor: A database cursor object - This function could be used as part of
a transaction, hence the use of a cursor rather than a connection
object.
* role_name: The name of the role
* privileges: A 'list' of privileges to assign the new role
RETURNS: An immutable `gn3.auth.authorisation.roles.Role` object
"""
role = Role(uuid4(), role_name, True, tuple(privileges))
cursor.execute(
"INSERT INTO roles(role_id, role_name, user_editable) VALUES (?, ?, ?)",
(str(role.role_id), role.role_name, (1 if role.user_editable else 0)))
cursor.executemany(
"INSERT INTO role_privileges(role_id, privilege_id) VALUES (?, ?)",
tuple((str(role.role_id), str(priv.privilege_id))
for priv in privileges))
return role
def __organise_privileges__(resources, row) -> dict:
resource_id = UUID(row["resource_id"])
role_id = UUID(row["role_id"])
roles = resources.get(resource_id, {}).get("roles", {})
role = roles.get(role_id, Role(
role_id,
row["role_name"],
bool(int(row["user_editable"])),
tuple()))
return {
**resources,
resource_id: {
"resource_id": resource_id,
"user_id": UUID(row["user_id"]),
"roles": {
**roles,
role_id: Role(
role.role_id,
role.role_name,
role.user_editable,
role.privileges + (Privilege(
row["privilege_id"],
row["privilege_description"]),)
)
}
}
}
def user_roles(conn: db.DbConnection, user: User) -> Sequence[dict]:
"""Retrieve all roles (organised by resource) assigned to the user."""
with db.cursor(conn) as cursor:
cursor.execute("SELECT * FROM user_roles")
cursor.execute(
"SELECT ur.resource_id, ur.user_id, r.*, p.* "
"FROM user_roles AS ur "
"INNER JOIN roles AS r ON ur.role_id=r.role_id "
"INNER JOIN role_privileges AS rp ON r.role_id=rp.role_id "
"INNER JOIN privileges AS p ON rp.privilege_id=p.privilege_id "
"WHERE ur.user_id=?",
(str(user.user_id),))
return tuple({# type: ignore[var-annotated]
**row, "roles": tuple(row["roles"].values())
} for row in reduce(
__organise_privileges__, cursor.fetchall(), {}).values())
return tuple()
def user_role(conn: db.DbConnection, user: User, role_id: UUID) -> Either:
"""Retrieve a specific non-resource role assigned to the user."""
with db.cursor(conn) as cursor:
cursor.execute(
"SELECT res.resource_id, ur.user_id, r.*, p.* "
"FROM resources AS res INNER JOIN user_roles AS ur "
"ON res.resource_id=ur.resource_id INNER JOIN roles AS r "
"ON ur.role_id=r.role_id INNER JOIN role_privileges AS rp "
"ON r.role_id=rp.role_id INNER JOIN privileges AS p "
"ON rp.privilege_id=p.privilege_id "
"WHERE ur.user_id=? AND ur.role_id=?",
(str(user.user_id), str(role_id)))
results = cursor.fetchall()
if results:
res_role_obj = tuple(# type: ignore[var-annotated]
reduce(__organise_privileges__, results, {}).values())[0]
resource_id = res_role_obj["resource_id"]
role = tuple(res_role_obj["roles"].values())[0]
return Right((role, resource_id))
return Left(NotFoundError(
f"Could not find role with id '{role_id}'",))
def __assign_group_creator_role__(cursor: db.DbCursor, user: User):
cursor.execute(
'SELECT role_id FROM roles WHERE role_name IN '
'("group-creator")')
role_id = cursor.fetchone()["role_id"]
cursor.execute(
"SELECT resource_id FROM resources AS r "
"INNER JOIN resource_categories AS rc "
"ON r.resource_category_id=rc.resource_category_id "
"WHERE rc.resource_category_key='system'")
resource_id = cursor.fetchone()["resource_id"]
cursor.execute(
("INSERT INTO user_roles VALUES (:user_id, :role_id, :resource_id)"),
{"user_id": str(user.user_id), "role_id": role_id,
"resource_id": resource_id})
def __assign_public_view_role__(cursor: db.DbCursor, user: User):
cursor.execute("SELECT resource_id FROM resources WHERE public=1")
public_resources = tuple(row["resource_id"] for row in cursor.fetchall())
cursor.execute("SELECT role_id FROM roles WHERE role_name='public-view'")
role_id = cursor.fetchone()["role_id"]
cursor.executemany(
"INSERT INTO user_roles(user_id, role_id, resource_id) "
"VALUES(:user_id, :role_id, :resource_id)",
tuple({
"user_id": str(user.user_id),
"role_id": role_id,
"resource_id": resource_id
} for resource_id in public_resources))
def assign_default_roles(cursor: db.DbCursor, user: User):
"""Assign `user` some default roles."""
__assign_group_creator_role__(cursor, user)
__assign_public_view_role__(cursor, user)
def revoke_user_role_by_name(cursor: db.DbCursor, user: User, role_name: str):
"""Revoke a role from `user` by the role's name"""
# TODO: Pass in the resource_id - this works somewhat correctly, but it's
# only because it is used in for revoking the "group-creator" role so
# far
cursor.execute(
"SELECT role_id FROM roles WHERE role_name=:role_name",
{"role_name": role_name})
role = cursor.fetchone()
if role:
cursor.execute(
("DELETE FROM user_roles "
"WHERE user_id=:user_id AND role_id=:role_id"),
{"user_id": str(user.user_id), "role_id": role["role_id"]})
def assign_user_role_by_name(
cursor: db.DbCursor, user: User, resource_id: UUID, role_name: str):
"""Revoke a role from `user` by the role's name"""
cursor.execute(
"SELECT role_id FROM roles WHERE role_name=:role_name",
{"role_name": role_name})
role = cursor.fetchone()
if role:
cursor.execute(
("INSERT INTO user_roles VALUES(:user_id, :role_id, :resource_id) "
"ON CONFLICT DO NOTHING"),
{
"user_id": str(user.user_id),
"role_id": role["role_id"],
"resource_id": str(resource_id)
})