diff options
Diffstat (limited to 'gn_auth/auth/authorisation/resources/checks.py')
| -rw-r--r-- | gn_auth/auth/authorisation/resources/checks.py | 44 |
1 files changed, 41 insertions, 3 deletions
diff --git a/gn_auth/auth/authorisation/resources/checks.py b/gn_auth/auth/authorisation/resources/checks.py index 5484dbf..ce2b821 100644 --- a/gn_auth/auth/authorisation/resources/checks.py +++ b/gn_auth/auth/authorisation/resources/checks.py @@ -1,8 +1,11 @@ """Handle authorisation checks for resources""" -from uuid import UUID +import uuid +import warnings from functools import reduce from typing import Sequence +from gn_libs.privileges import check + from .base import Resource from ...db import sqlite3 as db @@ -12,7 +15,7 @@ from ..privileges.models import db_row_to_privilege def __organise_privileges_by_resource_id__(rows): def __organise__(privs, row): - resource_id = UUID(row["resource_id"]) + resource_id = uuid.UUID(row["resource_id"]) return { **privs, resource_id: (row["privilege_id"],) + privs.get( @@ -24,11 +27,14 @@ def __organise_privileges_by_resource_id__(rows): def authorised_for(conn: db.DbConnection, user: User, privileges: tuple[str, ...], - resource_ids: Sequence[UUID]) -> dict[UUID, bool]: + resource_ids: Sequence[uuid.UUID]) -> dict[uuid.UUID, bool]: """ Check whether `user` is authorised to access `resources` according to given `privileges`. """ + warnings.warn(DeprecationWarning( + f"The function `{__name__}.authorised_for` is deprecated. Please use " + f"`{__name__}.authorised_for_spec`")) with db.cursor(conn) as cursor: cursor.execute( ("SELECT ur.*, rp.privilege_id FROM " @@ -61,6 +67,9 @@ def authorised_for2( """ Check that `user` has **ALL** the specified privileges for the resource. """ + warnings.warn(DeprecationWarning( + f"The function `{__name__}.authorised_for2` is deprecated. Please use " + f"`{__name__}.authorised_for_spec`")) with db.cursor(conn) as cursor: _query = ( "SELECT resources.resource_id, user_roles.user_id, roles.role_id, " @@ -82,3 +91,32 @@ def authorised_for2( str_privileges = tuple(privilege.privilege_id for privilege in _db_privileges) return all((requested_privilege in str_privileges) for requested_privilege in privileges) + + +def authorised_for_spec( + conn: db.DbConnection, + user_id: uuid.UUID, + resource_id: uuid.UUID, + auth_spec: str +) -> bool: + """ + Check that a user, identified with `user_id`, has a set of privileges that + satisfy the `auth_spec` for the resource identified with `resource_id`. + """ + with db.cursor(conn) as cursor: + _query = ( + "SELECT resources.resource_id, user_roles.user_id, roles.role_id, " + "privileges.* " + "FROM resources INNER JOIN user_roles " + "ON resources.resource_id=user_roles.resource_id " + "INNER JOIN roles ON user_roles.role_id=roles.role_id " + "INNER JOIN role_privileges ON roles.role_id=role_privileges.role_id " + "INNER JOIN privileges " + "ON role_privileges.privilege_id=privileges.privilege_id " + "WHERE resources.resource_id=? " + "AND user_roles.user_id=?") + cursor.execute( + _query, + (str(resource_id), str(user_id))) + _privileges = tuple(row["privilege_id"] for row in cursor.fetchall()) + return check(auth_spec, _privileges) |
