"""Grant role to ALL resources to sys-admin users."""
import itertools
import contextlib

from yoyo import step

__depends__ = {'20250729_02_7ycSm-assign-initial-system-wide-resources-access-privileges-to-sys-admins'}


def _fetch_single_id(cursor, query, description):
    """Run `query` and return the first column of its first result row.

    Args:
        cursor: A database cursor exposing `execute` and `fetchone`.
        query: A parameterless SQL query expected to yield at least one row.
        description: Human-readable name of the thing being looked up; used
            only in the error message.

    Raises:
        LookupError: If the query returns no rows. Without this check the
            migration would abort with an opaque "'NoneType' object is not
            subscriptable" error.
    """
    cursor.execute(query)
    row = cursor.fetchone()
    if row is None:
        raise LookupError(f"Could not find {description} in the database.")
    return row[0]


def system_administrator_role_id(cursor):
    """Fetch ID for role 'system-administrator'.

    Raises:
        LookupError: If no such role exists.
    """
    return _fetch_single_id(
        cursor,
        "SELECT role_id FROM roles WHERE role_name='system-administrator'",
        "the 'system-administrator' role")


def system_resource_id(cursor):
    """Fetch ID of the first resource whose category key is 'system'.

    Raises:
        LookupError: If no resource of the 'system' category exists.
    """
    return _fetch_single_id(
        cursor,
        "SELECT resources.resource_id FROM resource_categories "
        "INNER JOIN resources ON resource_categories.resource_category_id=resources.resource_category_id "
        "WHERE resource_category_key = 'system'",
        "a resource of the 'system' category")


def fetch_ids_for_sysadmin_users(cursor):
    """Fetch all sysadmin users' IDs.

    A sysadmin is a user holding the 'system-administrator' role on the
    'system' resource.
    """
    # Resolve the system resource first: the lookup reuses the same cursor,
    # so it must complete before the main query is executed.
    sysresourceid = system_resource_id(cursor)
    cursor.execute(
        "SELECT user_roles.user_id FROM roles INNER JOIN user_roles "
        "ON roles.role_id=user_roles.role_id "
        "WHERE role_name='system-administrator' AND resource_id=?",
        (sysresourceid,))
    return tuple(row[0] for row in cursor.fetchall())


def fetch_non_system_resources(cursor):
    """Fetch IDs for all resources that are not of the 'system' category."""
    cursor.execute(
        "SELECT resources.resource_id FROM resource_categories "
        "INNER JOIN resources "
        "ON resource_categories.resource_category_id=resources.resource_category_id "
        "WHERE resource_category_key != 'system'")
    return tuple(row[0] for row in cursor.fetchall())


def _sysadmin_role_rows(cursor):
    """Build `(user_id, resource_id, role_id)` rows for the migration.

    Pairs every sysadmin user with every non-system resource and tags each
    pair with the 'system-administrator' role ID. Shared by the apply and
    rollback steps so both always operate on the same set of rows.
    """
    sysadminroleid = system_administrator_role_id(cursor)
    user_ids = fetch_ids_for_sysadmin_users(cursor)
    resource_ids = fetch_non_system_resources(cursor)
    return tuple(
        (user_id, resource_id, sysadminroleid)
        for user_id, resource_id in itertools.product(user_ids, resource_ids))


def assign_sysadmin_role_on_non_system_resources(conn):
    """Assign sysadmins the sysadmin role on all non-system resources."""
    with contextlib.closing(conn.cursor()) as cursor:
        cursor.executemany(
            "INSERT INTO user_roles(user_id, resource_id, role_id) "
            "VALUES (?, ?, ?)",
            _sysadmin_role_rows(cursor))


def revoke_sysadmin_role_on_non_system_resources(conn):
    """Revoke sysadmins the sysadmin role on all non-system resources."""
    with contextlib.closing(conn.cursor()) as cursor:
        cursor.executemany(
            "DELETE FROM user_roles "
            "WHERE user_id=? AND resource_id=? AND role_id=?",
            _sysadmin_role_rows(cursor))


steps = [
    step(assign_sysadmin_role_on_non_system_resources,
         revoke_sysadmin_role_on_non_system_resources)
]