diff options
author | Frederick Muriuki Muriithi | 2025-07-29 11:17:10 -0500 |
---|---|---|
committer | Frederick Muriuki Muriithi | 2025-07-29 11:17:10 -0500 |
commit | e7964b6b1ebdc258bb697e2db4f3215df7a4c3b8 (patch) | |
tree | b2db071c7b77e14c28111c5a48fb1ee574c7f844 | |
parent | 10fe4b01afeffa524ee06c1d5e07396269c8b7ea (diff) | |
download | gn-auth-e7964b6b1ebdc258bb697e2db4f3215df7a4c3b8.tar.gz |
Assign `system-administrator` role on all non-system resources.
-rw-r--r-- | migrations/auth/20250729_03_oCvvq-grant-role-to-all-resources-to-sys-admin-users.py | 75 |
1 files changed, 75 insertions, 0 deletions
diff --git a/migrations/auth/20250729_03_oCvvq-grant-role-to-all-resources-to-sys-admin-users.py b/migrations/auth/20250729_03_oCvvq-grant-role-to-all-resources-to-sys-admin-users.py new file mode 100644 index 0000000..e3bdc8f --- /dev/null +++ b/migrations/auth/20250729_03_oCvvq-grant-role-to-all-resources-to-sys-admin-users.py @@ -0,0 +1,75 @@ +""" +Grant role to ALL resources to sys-admin users. +""" +import itertools +import contextlib + +from yoyo import step + +__depends__ = {'20250729_02_7ycSm-assign-initial-system-wide-resources-access-privileges-to-sys-admins'} + + +def system_administrator_role_id(cursor): + """Fetch ID for role 'system-administrator'.""" + cursor.execute( + "SELECT role_id FROM roles WHERE role_name='system-administrator'") + return cursor.fetchone()[0] + + +def system_resource_id(cursor): + cursor.execute( + "SELECT resources.resource_id FROM resource_categories " + "INNER JOIN resources ON resource_categories.resource_category_id=resources.resource_category_id " + "WHERE resource_category_key = 'system'") + return cursor.fetchone()[0] + + +def fetch_ids_for_sysadmin_users(cursor): + """Fetch all sysadmin users' IDs.""" + cursor.execute( + "SELECT user_roles.user_id FROM roles INNER JOIN user_roles " + "ON roles.role_id=user_roles.role_id " + "WHERE role_name='system-administrator' AND resource_id=?", + (system_resource_id(cursor),)) + return tuple(row[0] for row in cursor.fetchall()) + + +def fetch_non_system_resources(cursor): + """Fetch IDs for all resources that are not of the 'system' category.""" + cursor.execute( + "SELECT resources.resource_id FROM resource_categories " + "INNER JOIN resources " + "ON resource_categories.resource_category_id=resources.resource_category_id " + "WHERE resource_category_key != 'system'") + return tuple(row[0] for row in cursor.fetchall()) + + +def assign_sysadmin_role_on_non_system_resources(conn): + """Assign sysadmins the sysadmin role on all non-system resources.""" + with contextlib.closing(conn.cursor()) as cursor: + sysadminroleid = system_administrator_role_id(cursor) + cursor.executemany( + "INSERT INTO user_roles(user_id, resource_id, role_id) " + "VALUES (?, ?, ?)", + tuple(item + (sysadminroleid,) + for item in itertools.product( + fetch_ids_for_sysadmin_users(cursor), + fetch_non_system_resources(cursor)))) + + +def revoke_sysadmin_role_on_non_system_resources(conn): + """Revoke sysadmins the sysadmin role on all non-system resources.""" + with contextlib.closing(conn.cursor()) as cursor: + sysadminroleid = system_administrator_role_id(cursor) + cursor.executemany( + "DELETE FROM user_roles " + "WHERE user_id=? AND resource_id=? AND role_id=?", + tuple(item + (sysadminroleid,) + for item in itertools.product( + fetch_ids_for_sysadmin_users(cursor), + fetch_non_system_resources(cursor)))) + +steps = [ + step(assign_sysadmin_role_on_non_system_resources, + revoke_sysadmin_role_on_non_system_resources) +] |