"""Handle authorisation checks for resources""" from uuid import UUID from functools import reduce from typing import Sequence from gn_libs.privileges import check from .base import Resource from ...db import sqlite3 as db from ...authentication.users import User from ..privileges.models import db_row_to_privilege def __organise_privileges_by_resource_id__(rows): def __organise__(privs, row): resource_id = UUID(row["resource_id"]) return { **privs, resource_id: (row["privilege_id"],) + privs.get( resource_id, tuple()) } return reduce(__organise__, rows, {}) def authorised_for(conn: db.DbConnection, user: User, privileges: tuple[str, ...], resource_ids: Sequence[UUID]) -> dict[UUID, bool]: """ Check whether `user` is authorised to access `resources` according to given `privileges`. """ with db.cursor(conn) as cursor: cursor.execute( ("SELECT ur.*, rp.privilege_id FROM " "user_roles AS ur " "INNER JOIN roles AS r ON ur.role_id=r.role_id " "INNER JOIN role_privileges AS rp ON r.role_id=rp.role_id " "WHERE ur.user_id=? " f"AND ur.resource_id IN ({', '.join(['?']*len(resource_ids))})" f"AND rp.privilege_id IN ({', '.join(['?']*len(privileges))})"), ((str(user.user_id),) + tuple( str(r_id) for r_id in resource_ids) + tuple(privileges))) resource_privileges = __organise_privileges_by_resource_id__( cursor.fetchall()) authorised = tuple(resource_id for resource_id, res_privileges in resource_privileges.items() if all(priv in res_privileges for priv in privileges)) return { resource_id: resource_id in authorised for resource_id in resource_ids } def authorised_for2( conn: db.DbConnection, user: User, resource: Resource, privileges: tuple[str, ...] ) -> bool: """ Check that `user` has **ALL** the specified privileges for the resource. """ with db.cursor(conn) as cursor: _query = ( "SELECT resources.resource_id, user_roles.user_id, roles.role_id, " "privileges.* " "FROM resources INNER JOIN user_roles " "ON resources.resource_id=user_roles.resource_id " "INNER JOIN roles ON user_roles.role_id=roles.role_id " "INNER JOIN role_privileges ON roles.role_id=role_privileges.role_id " "INNER JOIN privileges " "ON role_privileges.privilege_id=privileges.privilege_id " "WHERE resources.resource_id=? " "AND user_roles.user_id=?") cursor.execute( _query, (str(resource.resource_id), str(user.user_id))) _db_privileges = tuple( db_row_to_privilege(row) for row in cursor.fetchall()) str_privileges = tuple(privilege.privilege_id for privilege in _db_privileges) return all((requested_privilege in str_privileges) for requested_privilege in privileges) def authorised_for_spec( conn: db.DbConnection, user_id: uuid.UUID, resource_id: uuid.UUID, auth_spec: str ) -> bool: """ Check that a user, identified with `user_id`, has a set of privileges that satisfy the `auth_spec` for the resource identified with `resource_id`. """ with db.cursor(conn) as cursor: _query = ( "SELECT resources.resource_id, user_roles.user_id, roles.role_id, " "privileges.* " "FROM resources INNER JOIN user_roles " "ON resources.resource_id=user_roles.resource_id " "INNER JOIN roles ON user_roles.role_id=roles.role_id " "INNER JOIN role_privileges ON roles.role_id=role_privileges.role_id " "INNER JOIN privileges " "ON role_privileges.privilege_id=privileges.privilege_id " "WHERE resources.resource_id=? " "AND user_roles.user_id=?") cursor.execute( _query, (str(resource.resource_id), str(user.user_id))) _privileges = tuple(row["privilege_id"] for row in cursor.fetchall()) return check(auth_spec, _privileges)