about summary refs log tree commit diff
path: root/migrations/auth/20250729_03_oCvvq-grant-role-to-all-resources-to-sys-admin-users.py
blob: e3bdc8f3980f4981758dec4829433c9bd793df51 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
"""
Grant the 'system-administrator' role on ALL resources to sys-admin users.
"""
import itertools
import contextlib

from yoyo import step

# Identifier of the migration this one depends on (yoyo `__depends__`).
__depends__ = {'20250729_02_7ycSm-assign-initial-system-wide-resources-access-privileges-to-sys-admins'}


def system_administrator_role_id(cursor):
    """Look up and return the `role_id` of the 'system-administrator' role.

    Runs a query against the `roles` table on `cursor` and returns the first
    column of the first row fetched.
    """
    query = "SELECT role_id FROM roles WHERE role_name='system-administrator'"
    cursor.execute(query)
    role_row = cursor.fetchone()
    return role_row[0]


def system_resource_id(cursor):
    """Return the `resource_id` of a resource in the 'system' category.

    Joins `resource_categories` to `resources` on `cursor` and returns the
    first column of the first row fetched; the query specifies no ordering.
    """
    query = (
        "SELECT resources.resource_id FROM resource_categories "
        "INNER JOIN resources ON resource_categories.resource_category_id=resources.resource_category_id "
        "WHERE resource_category_key = 'system'")
    cursor.execute(query)
    first_row = cursor.fetchone()
    return first_row[0]


def fetch_ids_for_sysadmin_users(cursor):
    """Return a tuple of the user IDs of all sysadmin users.

    A sysadmin user here is one with a `user_roles` row for the
    'system-administrator' role on the resource whose ID is returned by
    `system_resource_id`.
    """
    # Resolve the system resource first; it is the query's only parameter.
    system_resource = system_resource_id(cursor)
    query = (
        "SELECT user_roles.user_id FROM roles INNER JOIN user_roles "
        "ON roles.role_id=user_roles.role_id "
        "WHERE role_name='system-administrator' AND resource_id=?")
    cursor.execute(query, (system_resource,))
    rows = cursor.fetchall()
    return tuple(record[0] for record in rows)


def fetch_non_system_resources(cursor):
    """Return a tuple of IDs of every resource outside the 'system' category.

    Joins `resource_categories` to `resources` on `cursor` and keeps the rows
    whose `resource_category_key` is not 'system'.
    """
    query = (
        "SELECT resources.resource_id FROM resource_categories "
        "INNER JOIN resources "
        "ON resource_categories.resource_category_id=resources.resource_category_id "
        "WHERE resource_category_key != 'system'")
    cursor.execute(query)
    rows = cursor.fetchall()
    return tuple(record[0] for record in rows)


def assign_sysadmin_role_on_non_system_resources(conn):
    """Give every sysadmin user the sysadmin role on each non-system resource.

    Inserts one `user_roles` row for every (sysadmin user, non-system
    resource) pair, using the 'system-administrator' role ID.
    """
    with contextlib.closing(conn.cursor()) as cursor:
        role_id = system_administrator_role_id(cursor)
        user_ids = fetch_ids_for_sysadmin_users(cursor)
        resource_ids = fetch_non_system_resources(cursor)
        # One parameter row per (user, resource) combination.
        params = tuple(
            (user_id, resource_id, role_id)
            for user_id, resource_id
            in itertools.product(user_ids, resource_ids))
        cursor.executemany(
            "INSERT INTO user_roles(user_id, resource_id, role_id) "
            "VALUES (?, ?, ?)",
            params)


def revoke_sysadmin_role_on_non_system_resources(conn):
    """Remove the sysadmin role from sysadmin users on non-system resources.

    Deletes the `user_roles` row for every (sysadmin user, non-system
    resource) pair that carries the 'system-administrator' role ID.
    """
    with contextlib.closing(conn.cursor()) as cursor:
        role_id = system_administrator_role_id(cursor)
        user_ids = fetch_ids_for_sysadmin_users(cursor)
        resource_ids = fetch_non_system_resources(cursor)
        # One parameter row per (user, resource) combination.
        params = tuple(
            (user_id, resource_id, role_id)
            for user_id, resource_id
            in itertools.product(user_ids, resource_ids))
        cursor.executemany(
            "DELETE FROM user_roles "
            "WHERE user_id=? AND resource_id=? AND role_id=?",
            params)

# Migration steps: the first callable applies the change and the second rolls
# it back (per yoyo's `step(apply, rollback)` signature).
steps = [
    step(assign_sysadmin_role_on_non_system_resources,
         revoke_sysadmin_role_on_non_system_resources)
]