about summary refs log tree commit diff
"""Handle authorisation checks for resources"""
from uuid import UUID
from functools import reduce
from typing import Sequence

from gn3.auth import db
from gn3.auth.authentication.users import User

def __organise_privileges_by_resource_id__(rows):
    def __organise__(privs, row):
        resource_id = UUID(row["resource_id"])
        return {
            **privs,
            resource_id: (row["privilege_id"],) + privs.get(
                resource_id, tuple())
        }
    return reduce(__organise__, rows, {})

def authorised_for(conn: db.DbConnection, user: User, privileges: tuple[str],
                   resource_ids: Sequence[UUID]) -> dict[UUID, bool]:
    """
    Check whether `user` is authorised to access `resources` according to given
    `privileges`.
    """
    with db.cursor(conn) as cursor:
        cursor.execute(
            ("SELECT guror.*, rp.privilege_id FROM "
             "group_user_roles_on_resources AS guror "
             "INNER JOIN group_roles AS gr ON  "
             "(guror.group_id=gr.group_id AND guror.role_id=gr.role_id) "
             "INNER JOIN roles AS r ON gr.role_id=r.role_id "
             "INNER JOIN role_privileges AS rp ON r.role_id=rp.role_id "
             "WHERE guror.user_id=? "
             f"AND guror.resource_id IN ({', '.join(['?']*len(resource_ids))})"
             f"AND rp.privilege_id IN ({', '.join(['?']*len(privileges))})"),
            ((str(user.user_id),) + tuple(
                str(r_id) for r_id in resource_ids) + tuple(privileges)))
        resource_privileges = __organise_privileges_by_resource_id__(
            cursor.fetchall())
        authorised = tuple(resource_id for resource_id, res_privileges
                           in resource_privileges.items()
                           if all(priv in res_privileges
                                  for priv in privileges))
        return {
            resource_id: resource_id in authorised
            for resource_id in resource_ids
        }