diff options
-rw-r--r-- | gn_auth/auth/authorisation/users/views.py | 121 |
1 files changed, 97 insertions, 24 deletions
diff --git a/gn_auth/auth/authorisation/users/views.py b/gn_auth/auth/authorisation/users/views.py index 5843393..394b92c 100644 --- a/gn_auth/auth/authorisation/users/views.py +++ b/gn_auth/auth/authorisation/users/views.py @@ -3,10 +3,10 @@ import uuid import sqlite3 import secrets import traceback -from typing import Any -from functools import partial from dataclasses import asdict +from typing import Any, Sequence from urllib.parse import urljoin +from functools import reduce, partial from datetime import datetime, timedelta from email.headerregistry import Address from email_validator import validate_email, EmailNotValidError @@ -560,6 +560,56 @@ def change_password(forgot_password_token): return change_password_page +def __delete_users_individually__(cursor, user_ids, tables): + """Recovery function with dismal performance.""" + _errors = tuple() + for _user_id in user_ids: + for _table, _col in tables: + try: + cursor.execute( + f"DELETE FROM {_table} WHERE {_col}=?", + (str(_user_id),)) + except sqlite3.IntegrityError: + _errors = _errors + ( + (("user_id", _user_id), + ("reason", f"User has data in table {_table}")),) + + return _errors + + +def __fetch_non_deletable_users__(cursor, ids_and_reasons): + """Fetch detail for non-deletable users.""" + def __merge__(acc, curr): + _curr = dict(curr) + _this_dict = acc.get( + curr["user_id"], {"reasons": tuple()}) + _this_dict["reasons"] = _this_dict["reasons"] + (_curr["reason"],) + return {**acc, curr["user_id"]: _this_dict} + + _reasons_by_id = reduce(__merge__, + (dict(row) for row in ids_and_reasons), + {}) + _user_ids = tuple(_reasons_by_id.keys()) + _paramstr = ", ".join(["?"] * len(_user_ids)) + cursor.execute(f"SELECT * FROM users WHERE user_id IN ({_paramstr})", + _user_ids) + return tuple({ + "user": dict(row), + "reasons": _reasons_by_id[row["user_id"]] + } for row in cursor.fetchall()) + + +def __non_deletable_with_reason__( + user_ids: tuple[str, ...], + dbrows: Sequence[sqlite3.Row], + reason: str + ) -> tuple[tuple[tuple[str, str], tuple[str, str]], ...]: + """Build a list of 'non-deletable' user objects.""" + return tuple((("user_id", _uid), ("reason", reason)) + for _uid in user_ids + if _uid in tuple(row["user_id"] for row in dbrows)) + + @users.route("/delete", methods=["POST"]) @require_oauth("profile user role") def delete_users(): @@ -577,13 +627,24 @@ def delete_users(): _form = request_json() _user_ids = _form.get("user_ids", []) - _non_deletable = set((str(_token.user.user_id),)) + _non_deletable = set() + if str(_token.user.user_id) in _user_ids: + _non_deletable.add( + (("user_id", str(_token.user.user_id),), + ("reason", "You are not allowed to delete yourself."))) cursor.execute("SELECT user_id FROM group_users") - _non_deletable.update(row["user_id"] for row in cursor.fetchall()) + _group_members = tuple(row["user_id"] for row in cursor.fetchall()) + _non_deletable.update(__non_deletable_with_reason__( + _user_ids, + cursor.fetchall(), + "User is member of a user group.")) cursor.execute("SELECT user_id FROM oauth2_clients;") - _non_deletable.update(row["user_id"] for row in cursor.fetchall()) + _non_deletable.update(__non_deletable_with_reason__( + _user_ids, + cursor.fetchall(), + "User is registered owner of an OAuth client.")) _important_roles = ( "group-leader", @@ -596,9 +657,13 @@ def delete_users(): "INNER JOIN roles ON user_roles.role_id=roles.role_id " f"WHERE roles.role_name IN ({_paramstr})", _important_roles) - _non_deletable.update(row["user_id"] for row in cursor.fetchall()) + _non_deletable.update(__non_deletable_with_reason__( + _user_ids, + cursor.fetchall(), + f"User holds on of the following roles: {_important_roles}")) - _delete = tuple(uid for uid in _user_ids if uid not in _non_deletable) + _delete = tuple(uid for uid in _user_ids if uid not in + (dict(row)["user_id"] for row in _non_deletable)) _paramstr = ", ".join(["?"] * len(_delete)) if len(_delete) > 0: _dependent_tables = ( @@ -610,38 +675,46 @@ def delete_users(): ("user_credentials", "user_id"), ("user_roles", "user_id"), ("user_verification_codes", "user_id")) - for _table, _col in _dependent_tables: - # TODO: Figure out which of these tables leads to an - # IntegrityError when attempting to do a delete. - # Might have to use code with really dismal performance - # here in order to get more information on what fails - # and for whom. Maybe a try-catch, with the catch - # calling a function that loops through each user and - # table, collecting failure stats. - cursor.execute( - f"DELETE FROM {_table} WHERE {_col} IN ({_paramstr})", - _delete) - + try: + for _table, _col in _dependent_tables: + cursor.execute( + f"DELETE FROM {_table} WHERE {_col} IN ({_paramstr})", + _delete) + except sqlite3.IntegrityError: + _non_deletable.update(__delete_users_individually__( + cursor, _delete, _dependent_tables)) + + _not_deleted = __fetch_non_deletable_users__( + cursor, _non_deletable) + _delete = tuple(# rebuild with those that failed. + _user_id for _user_id in _delete if _user_id not in + tuple(_not_deleted.keys())) + _paramstr = ", ".join(["?"] * len(_delete)) cursor.execute( f"DELETE FROM users WHERE user_id IN ({_paramstr})", _delete) _deleted_rows = cursor.rowcount - _diff = len(_user_ids) - _deleted_rows return jsonify({ "total-requested": len(_user_ids), "total-deleted": _deleted_rows, - "not-deleted": _diff, + "not-deleted": _not_deleted, + "deleted": _deleted_rows, "message": ( f"Successfully deleted {_deleted_rows} users." + (" Some users could not be deleted." if _diff > 0 else "")) }) + _not_deleted = __fetch_non_deletable_users__(cursor, _non_deletable) + return jsonify({ "total-requested": len(_user_ids), "total-deleted": 0, - "not-deleted": len(_user_ids), + "not-deleted": _not_deleted, + "deleted": 0, "error": "Zero users were deleted", "error_description": ( - "Either no users were selected or all the selected users are " - "system administrators, group members, or resource owners.") + "No users were selected for deletion." + if len(_user_ids) == 0 + else ("The selected users are system administrators, group " + "members, or resource owners.")) }), 400 |