diff --git a/gn_auth/auth/authorisation/users/models.py b/gn_auth/auth/authorisation/users/models.py
index d30bfd0..ab7a980 100644
--- a/gn_auth/auth/authorisation/users/models.py
+++ b/gn_auth/auth/authorisation/users/models.py
@@ -1,5 +1,6 @@
"""Functions for acting on users."""
import uuid
+import warnings
from functools import reduce
from datetime import datetime, timedelta
@@ -128,3 +129,40 @@ def user_resource_roles(conn: db.DbConnection, user: User) -> dict[uuid.UUID, tu
(str(user.user_id),))
return __build_resource_roles__(
(dict(row) for row in cursor.fetchall()))
+
+
+def delete_users_by_id(
+ conn: db.DbConnection,
+ user_ids: tuple[uuid.UUID, ...]
+) -> int:
+ """Delete users unconditionally by ID, removing all dependent data.
+
+ Unlike the HTTP endpoint, this bypasses all policy checks — users are
+ deleted regardless of their roles or group memberships. Returns the
+ number of users removed from the users table.
+ """
+ warnings.warn(
+ (f"Running dangerous function `{__name__}.delete_users_by_id`. "
+ "Do ensure that is what you actually want."),
+ category=RuntimeWarning)
+ if not user_ids:
+ return 0
+ _ids = tuple(str(uid) for uid in user_ids)
+ _paramstr = ", ".join(["?"] * len(_ids))
+ _dependent_tables = (
+ ("authorisation_code", "user_id"),
+ ("forgot_password_tokens", "user_id"),
+ ("group_join_requests", "requester_id"),
+ ("jwt_refresh_tokens", "user_id"),
+ ("oauth2_tokens", "user_id"),
+ ("user_credentials", "user_id"),
+ ("user_roles", "user_id"),
+ ("user_verification_codes", "user_id"),
+ )
+ with db.cursor(conn) as cursor:
+ for table, col in _dependent_tables:
+ cursor.execute(
+ f"DELETE FROM {table} WHERE {col} IN ({_paramstr})", _ids)
+ cursor.execute(
+ f"DELETE FROM users WHERE user_id IN ({_paramstr})", _ids)
+ return cursor.rowcount
|