about summary refs log tree commit diff
path: root/gn_auth/auth/authorisation/resources/checks.py
blob: ca45a20263c544ca1b3fce7a8947710e3cfd3ba2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
"""Handle authorisation checks for resources"""
from uuid import UUID
from functools import reduce
from typing import Sequence

from .base import Resource

from ...db import sqlite3 as db
from ...authentication.users import User

from ..privileges.models import db_row_to_privilege

def __organise_privileges_by_resource_id__(rows):
    """
    Group the privilege IDs found in `rows` by the resource they belong to.

    Each row is read through its "resource_id" key (a string parsed with
    `UUID`) and its "privilege_id" key.

    Returns a dict mapping each resource `UUID` to a tuple of privilege IDs.
    Within a tuple the IDs are ordered newest-row-first, because each row's
    privilege is prepended to what has been collected so far.
    """
    grouped = {}
    for row in rows:
        key = UUID(row["resource_id"])
        # Prepend (rather than append) to keep the newest-first ordering.
        grouped[key] = (row["privilege_id"],) + grouped.get(key, tuple())
    return grouped


def authorised_for(conn: db.DbConnection,
                   user: User,
                   privileges: tuple[str, ...],
                   resource_ids: Sequence[UUID]) -> dict[UUID, bool]:
    """
    Check whether `user` holds **ALL** of `privileges` on each of the
    resources identified by `resource_ids`.

    The check joins `user_roles`, `roles` and `role_privileges`, restricted to
    the given user, resource IDs and privilege IDs.

    Returns a dict with one entry per value in `resource_ids`: the value is
    `True` only when every requested privilege was found for that resource,
    `False` otherwise. Rows are keyed by `UUID` objects, so `resource_ids`
    must contain `UUID` instances for a match to be possible.

    An empty `resource_ids` gives an empty dict. An empty `privileges` gives
    `False` for every resource (deny by default), which is what the query
    produced before, since nothing can match an empty `IN ()` list.
    """
    # Handle empty inputs up front instead of relying on the empty `IN ()`
    # list, which SQLite accepts but standard SQL does not.
    if not resource_ids:
        return {}
    if not privileges:
        return {resource_id: False for resource_id in resource_ids}

    resource_params = ", ".join(["?"] * len(resource_ids))
    privilege_params = ", ".join(["?"] * len(privileges))
    with db.cursor(conn) as cursor:
        cursor.execute(
            ("SELECT ur.*, rp.privilege_id FROM "
             "user_roles AS ur "
             "INNER JOIN roles AS r ON ur.role_id=r.role_id "
             "INNER JOIN role_privileges AS rp ON r.role_id=rp.role_id "
             "WHERE ur.user_id=? "
             # Note the trailing space: the two clauses are concatenated.
             f"AND ur.resource_id IN ({resource_params}) "
             f"AND rp.privilege_id IN ({privilege_params})"),
            ((str(user.user_id),) + tuple(
                str(r_id) for r_id in resource_ids) + tuple(privileges)))
        resource_privileges = __organise_privileges_by_resource_id__(
            cursor.fetchall())

    required = frozenset(privileges)
    # Set membership keeps the final lookup linear in len(resource_ids).
    authorised = frozenset(
        resource_id
        for resource_id, res_privileges in resource_privileges.items()
        if required.issubset(res_privileges))
    return {
        resource_id: resource_id in authorised
        for resource_id in resource_ids
    }


def authorised_for2(
        conn: db.DbConnection,
        user: User,
        resource: Resource,
        privileges: tuple[str, ...]
) -> bool:
    """
    Check that `user` has **ALL** the specified privileges for the resource.

    Every privilege reachable by `user` on `resource` (through `user_roles`,
    `roles` and `role_privileges`) is fetched, and each entry in `privileges`
    is then looked up among the fetched privilege IDs.

    Returns `True` when every requested privilege ID is present. Note that an
    empty `privileges` therefore yields `True`.
    """
    _query = (
        "SELECT resources.resource_id, user_roles.user_id, roles.role_id, "
        "privileges.* "
        "FROM resources INNER JOIN user_roles "
        "ON resources.resource_id=user_roles.resource_id "
        "INNER JOIN roles ON user_roles.role_id=roles.role_id "
        "INNER JOIN role_privileges ON roles.role_id=role_privileges.role_id "
        "INNER JOIN privileges "
        "ON role_privileges.privilege_id=privileges.privilege_id "
        "WHERE resources.resource_id=? "
        "AND user_roles.user_id=?")
    with db.cursor(conn) as cursor:
        cursor.execute(
            _query,
            (str(resource.resource_id), str(user.user_id)))
        # Only the IDs are needed for the membership check below.
        held_privilege_ids = frozenset(
            db_row_to_privilege(row).privilege_id
            for row in cursor.fetchall())

    return all(requested_privilege in held_privilege_ids
               for requested_privilege in privileges)