aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gn_auth/auth/authorisation/users/views.py121
1 files changed, 97 insertions, 24 deletions
diff --git a/gn_auth/auth/authorisation/users/views.py b/gn_auth/auth/authorisation/users/views.py
index 5843393..394b92c 100644
--- a/gn_auth/auth/authorisation/users/views.py
+++ b/gn_auth/auth/authorisation/users/views.py
@@ -3,10 +3,10 @@ import uuid
import sqlite3
import secrets
import traceback
-from typing import Any
-from functools import partial
from dataclasses import asdict
+from typing import Any, Sequence
from urllib.parse import urljoin
+from functools import reduce, partial
from datetime import datetime, timedelta
from email.headerregistry import Address
from email_validator import validate_email, EmailNotValidError
@@ -560,6 +560,56 @@ def change_password(forgot_password_token):
return change_password_page
+def __delete_users_individually__(cursor, user_ids, tables):
+ """Recovery function with dismal performance."""
+ _errors = tuple()
+ for _user_id in user_ids:
+ for _table, _col in tables:
+ try:
+ cursor.execute(
+ f"DELETE FROM {_table} WHERE {_col}=?",
+ (str(_user_id),))
+ except sqlite3.IntegrityError:
+ _errors = _errors + (
+ (("user_id", _user_id),
+ ("reason", f"User has data in table {_table}")),)
+
+ return _errors
+
+
+def __fetch_non_deletable_users__(cursor, ids_and_reasons):
+ """Fetch detail for non-deletable users."""
+ def __merge__(acc, curr):
+ _curr = dict(curr)
+ _this_dict = acc.get(
+ curr["user_id"], {"reasons": tuple()})
+ _this_dict["reasons"] = _this_dict["reasons"] + (_curr["reason"],)
+ return {**acc, curr["user_id"]: _this_dict}
+
+ _reasons_by_id = reduce(__merge__,
+ (dict(row) for row in ids_and_reasons),
+ {})
+ _user_ids = tuple(_reasons_by_id.keys())
+ _paramstr = ", ".join(["?"] * len(_user_ids))
+ cursor.execute(f"SELECT * FROM users WHERE user_id IN ({_paramstr})",
+ _user_ids)
+ return tuple({
+ "user": dict(row),
+ "reasons": _reasons_by_id[row["user_id"]]
+ } for row in cursor.fetchall())
+
+
+def __non_deletable_with_reason__(
+ user_ids: tuple[str, ...],
+ dbrows: Sequence[sqlite3.Row],
+ reason: str
+ ) -> tuple[tuple[tuple[str, str], tuple[str, str]], ...]:
+ """Build a list of 'non-deletable' user objects."""
+ return tuple((("user_id", _uid), ("reason", reason))
+ for _uid in user_ids
+ if _uid in tuple(row["user_id"] for row in dbrows))
+
+
@users.route("/delete", methods=["POST"])
@require_oauth("profile user role")
def delete_users():
@@ -577,13 +627,24 @@ def delete_users():
_form = request_json()
_user_ids = _form.get("user_ids", [])
- _non_deletable = set((str(_token.user.user_id),))
+ _non_deletable = set()
+ if str(_token.user.user_id) in _user_ids:
+ _non_deletable.add(
+ (("user_id", str(_token.user.user_id),),
+ ("reason", "You are not allowed to delete yourself.")))
cursor.execute("SELECT user_id FROM group_users")
- _non_deletable.update(row["user_id"] for row in cursor.fetchall())
+ _group_members = tuple(row["user_id"] for row in cursor.fetchall())
+ _non_deletable.update(__non_deletable_with_reason__(
+ _user_ids,
+ cursor.fetchall(),
+ "User is member of a user group."))
cursor.execute("SELECT user_id FROM oauth2_clients;")
- _non_deletable.update(row["user_id"] for row in cursor.fetchall())
+ _non_deletable.update(__non_deletable_with_reason__(
+ _user_ids,
+ cursor.fetchall(),
+ "User is registered owner of an OAuth client."))
_important_roles = (
"group-leader",
@@ -596,9 +657,13 @@ def delete_users():
"INNER JOIN roles ON user_roles.role_id=roles.role_id "
f"WHERE roles.role_name IN ({_paramstr})",
_important_roles)
- _non_deletable.update(row["user_id"] for row in cursor.fetchall())
+ _non_deletable.update(__non_deletable_with_reason__(
+ _user_ids,
+ cursor.fetchall(),
+ f"User holds on of the following roles: {_important_roles}"))
- _delete = tuple(uid for uid in _user_ids if uid not in _non_deletable)
+ _delete = tuple(uid for uid in _user_ids if uid not in
+ (dict(row)["user_id"] for row in _non_deletable))
_paramstr = ", ".join(["?"] * len(_delete))
if len(_delete) > 0:
_dependent_tables = (
@@ -610,38 +675,46 @@ def delete_users():
("user_credentials", "user_id"),
("user_roles", "user_id"),
("user_verification_codes", "user_id"))
- for _table, _col in _dependent_tables:
- # TODO: Figure out which of these tables leads to an
- # IntegrityError when attempting to do a delete.
- # Might have to use code with really dismal performance
- # here in order to get more information on what fails
- # and for whom. Maybe a try-catch, with the catch
- # calling a function that loops through each user and
- # table, collecting failure stats.
- cursor.execute(
- f"DELETE FROM {_table} WHERE {_col} IN ({_paramstr})",
- _delete)
-
+ try:
+ for _table, _col in _dependent_tables:
+ cursor.execute(
+ f"DELETE FROM {_table} WHERE {_col} IN ({_paramstr})",
+ _delete)
+ except sqlite3.IntegrityError:
+ _non_deletable.update(__delete_users_individually__(
+ cursor, _delete, _dependent_tables))
+
+ _not_deleted = __fetch_non_deletable_users__(
+ cursor, _non_deletable)
+ _delete = tuple(# rebuild with those that failed.
+ _user_id for _user_id in _delete if _user_id not in
+ tuple(_not_deleted.keys()))
+ _paramstr = ", ".join(["?"] * len(_delete))
cursor.execute(
f"DELETE FROM users WHERE user_id IN ({_paramstr})",
_delete)
_deleted_rows = cursor.rowcount
- _diff = len(_user_ids) - _deleted_rows
return jsonify({
"total-requested": len(_user_ids),
"total-deleted": _deleted_rows,
- "not-deleted": _diff,
+ "not-deleted": _not_deleted,
+ "deleted": _deleted_rows,
"message": (
f"Successfully deleted {_deleted_rows} users." +
(" Some users could not be deleted." if _diff > 0 else ""))
})
+ _not_deleted = __fetch_non_deletable_users__(cursor, _non_deletable)
+
return jsonify({
"total-requested": len(_user_ids),
"total-deleted": 0,
- "not-deleted": len(_user_ids),
+ "not-deleted": _not_deleted,
+ "deleted": 0,
"error": "Zero users were deleted",
"error_description": (
- "Either no users were selected or all the selected users are "
- "system administrators, group members, or resource owners.")
+ "No users were selected for deletion."
+ if len(_user_ids) == 0
+ else ("The selected users are system administrators, group "
+ "members, or resource owners."))
}), 400