diff options
| author | Claude Sonnet 4.6 | 2026-06-02 00:00:00 +0000 |
|---|---|---|
| committer | Frederick Muriuki Muriithi | 2026-06-02 15:01:38 -0500 |
| commit | 597e1bb7d35368930c0a5d26cf32969c62bc2278 (patch) | |
| tree | 41a8ad5852c8c17f44bcbf76d8d80d4d4e2dfdf0 /gn_auth | |
| parent | 2691578edfc84817d9a9aafb35e3e29d04d2613e (diff) | |
| download | gn-auth-597e1bb7d35368930c0a5d26cf32969c62bc2278.tar.gz | |
Add a low-level delete_users_by_id function that removes users and all their dependent data unconditionally, bypassing the policy checks in the '/auth/users/delete' HTTP endpoint (which refuses to delete privileged users). This is intended for use by CLI test-teardown commands and the sudo-wrapped CI cleanup script. It might also find utility in other places where we do actually need to delete a user and their data unconditionally. Co-authored-by: Frederick Muriuki Muriithi <fredmanglis@gmail.com>
Diffstat (limited to 'gn_auth')
| -rw-r--r-- | gn_auth/auth/authorisation/users/models.py | 38 |
1 files changed, 38 insertions, 0 deletions
diff --git a/gn_auth/auth/authorisation/users/models.py b/gn_auth/auth/authorisation/users/models.py index d30bfd0..ab7a980 100644 --- a/gn_auth/auth/authorisation/users/models.py +++ b/gn_auth/auth/authorisation/users/models.py @@ -1,5 +1,6 @@ """Functions for acting on users.""" import uuid +import warnings from functools import reduce from datetime import datetime, timedelta @@ -128,3 +129,40 @@ def user_resource_roles(conn: db.DbConnection, user: User) -> dict[uuid.UUID, tu (str(user.user_id),)) return __build_resource_roles__( (dict(row) for row in cursor.fetchall())) + + +def delete_users_by_id( + conn: db.DbConnection, + user_ids: tuple[uuid.UUID, ...] +) -> int: + """Delete users unconditionally by ID, removing all dependent data. + + Unlike the HTTP endpoint, this bypasses all policy checks — users are + deleted regardless of their roles or group memberships. Returns the + number of users removed from the users table. + """ + warnings.warn( + (f"Running dangerous function `{__name__}.delete_users_by_id`. " + "Do ensure that is what you actually want."), + category=RuntimeWarning) + if not user_ids: + return 0 + _ids = tuple(str(uid) for uid in user_ids) + _paramstr = ", ".join(["?"] * len(_ids)) + _dependent_tables = ( + ("authorisation_code", "user_id"), + ("forgot_password_tokens", "user_id"), + ("group_join_requests", "requester_id"), + ("jwt_refresh_tokens", "user_id"), + ("oauth2_tokens", "user_id"), + ("user_credentials", "user_id"), + ("user_roles", "user_id"), + ("user_verification_codes", "user_id"), + ) + with db.cursor(conn) as cursor: + for table, col in _dependent_tables: + cursor.execute( + f"DELETE FROM {table} WHERE {col} IN ({_paramstr})", _ids) + cursor.execute( + f"DELETE FROM users WHERE user_id IN ({_paramstr})", _ids) + return cursor.rowcount |
