aboutsummaryrefslogtreecommitdiff
"""Handle authorisation checks for resources"""
from uuid import UUID
from functools import reduce
from typing import Sequence

from gn_libs.privileges import check

from .base import Resource

from ...db import sqlite3 as db
from ...authentication.users import User

from ..privileges.models import db_row_to_privilege

def __organise_privileges_by_resource_id__(rows):
    def __organise__(privs, row):
        resource_id = UUID(row["resource_id"])
        return {
            **privs,
            resource_id: (row["privilege_id"],) + privs.get(
                resource_id, tuple())
        }
    return reduce(__organise__, rows, {})


def authorised_for(conn: db.DbConnection,
                   user: User,
                   privileges: tuple[str, ...],
                   resource_ids: Sequence[UUID]) -> dict[UUID, bool]:
    """
    Check whether `user` is authorised to access `resources` according to given
    `privileges`.
    """
    with db.cursor(conn) as cursor:
        cursor.execute(
            ("SELECT ur.*, rp.privilege_id FROM "
             "user_roles AS ur "
             "INNER JOIN roles AS r ON ur.role_id=r.role_id "
             "INNER JOIN role_privileges AS rp ON r.role_id=rp.role_id "
             "WHERE ur.user_id=? "
             f"AND ur.resource_id IN ({', '.join(['?']*len(resource_ids))})"
             f"AND rp.privilege_id IN ({', '.join(['?']*len(privileges))})"),
            ((str(user.user_id),) + tuple(
                str(r_id) for r_id in resource_ids) + tuple(privileges)))
        resource_privileges = __organise_privileges_by_resource_id__(
            cursor.fetchall())
        authorised = tuple(resource_id for resource_id, res_privileges
                           in resource_privileges.items()
                           if all(priv in res_privileges
                                  for priv in privileges))
        return {
            resource_id: resource_id in authorised
            for resource_id in resource_ids
        }


def authorised_for2(
        conn: db.DbConnection,
        user: User,
        resource: Resource,
        privileges: tuple[str, ...]
) -> bool:
    """
    Check that `user` has **ALL** the specified privileges for the resource.
    """
    with db.cursor(conn) as cursor:
        _query = (
            "SELECT resources.resource_id, user_roles.user_id, roles.role_id, "
            "privileges.* "
            "FROM resources INNER JOIN user_roles "
            "ON resources.resource_id=user_roles.resource_id "
            "INNER JOIN roles ON user_roles.role_id=roles.role_id "
            "INNER JOIN role_privileges ON roles.role_id=role_privileges.role_id "
            "INNER JOIN privileges "
            "ON role_privileges.privilege_id=privileges.privilege_id "
            "WHERE resources.resource_id=? "
            "AND user_roles.user_id=?")
        cursor.execute(
            _query,
            (str(resource.resource_id), str(user.user_id)))
        _db_privileges = tuple(
            db_row_to_privilege(row) for row in cursor.fetchall())

    str_privileges = tuple(privilege.privilege_id for privilege in _db_privileges)
    return all((requested_privilege in str_privileges)
               for requested_privilege in privileges)


def authorised_for_spec(
        conn: db.DbConnection,
        user_id: uuid.UUID,
        resource_id: uuid.UUID,
        auth_spec: str
) -> bool:
    """
    Check that a user, identified with `user_id`, has a set of privileges that
    satisfy the `auth_spec` for the resource identified with `resource_id`.
    """
    with db.cursor(conn) as cursor:
        _query = (
            "SELECT resources.resource_id, user_roles.user_id, roles.role_id, "
            "privileges.* "
            "FROM resources INNER JOIN user_roles "
            "ON resources.resource_id=user_roles.resource_id "
            "INNER JOIN roles ON user_roles.role_id=roles.role_id "
            "INNER JOIN role_privileges ON roles.role_id=role_privileges.role_id "
            "INNER JOIN privileges "
            "ON role_privileges.privilege_id=privileges.privilege_id "
            "WHERE resources.resource_id=? "
            "AND user_roles.user_id=?")
        cursor.execute(
            _query,
            (str(resource.resource_id), str(user.user_id)))
        _privileges = tuple(row["privilege_id"] for row in cursor.fetchall())
    return check(auth_spec, _privileges)