diff options
Diffstat (limited to 'gn_auth/auth/authorisation/users/views.py')
| -rw-r--r-- | gn_auth/auth/authorisation/users/views.py | 240 |
1 files changed, 228 insertions, 12 deletions
diff --git a/gn_auth/auth/authorisation/users/views.py b/gn_auth/auth/authorisation/users/views.py index 7adcd06..4061e07 100644 --- a/gn_auth/auth/authorisation/users/views.py +++ b/gn_auth/auth/authorisation/users/views.py @@ -3,10 +3,10 @@ import uuid import sqlite3 import secrets import traceback -from typing import Any -from functools import partial from dataclasses import asdict +from typing import Any, Sequence from urllib.parse import urljoin +from functools import reduce, partial from datetime import datetime, timedelta from email.headerregistry import Address from email_validator import validate_email, EmailNotValidError @@ -28,6 +28,9 @@ from gn_auth.auth.requests import request_json from gn_auth.auth.db import sqlite3 as db from gn_auth.auth.db.sqlite3 import with_db_connection +from gn_auth.auth.authorisation.resources.system.models import system_resource + +from gn_auth.auth.authorisation.resources.checks import authorised_for2 from gn_auth.auth.authorisation.resources.models import ( user_resources as _user_resources) from gn_auth.auth.authorisation.roles.models import ( @@ -39,6 +42,7 @@ from gn_auth.auth.errors import ( NotFoundError, UsernameError, PasswordError, + AuthorisationError, UserRegistrationError) @@ -71,7 +75,7 @@ def user_details() -> Response: False, lambda grp: grp)# type: ignore[arg-type] return jsonify({ **user_dets, - "group": asdict(the_group) if the_group else False + **({"group": asdict(the_group)} if the_group else {}) }) @users.route("/roles", methods=["GET"]) @@ -114,6 +118,30 @@ def user_address(user: User) -> Address: """Compute the `email.headerregistry.Address` from a `User`""" return Address(display_name=user.name, addr_spec=user.email) + +def display_minutes_for_humans(minutes): + """Convert minutes into human-readable display.""" + _week_ = 10080 # minutes + _day_ = 1440 # minutes + _remainder_ = minutes + + _human_readable_ = "" + if _remainder_ >= _week_: + _weeks_ = _remainder_ // _week_ + _remainder_ = _remainder_ % _week_ + _human_readable_ += f"{_weeks_} week" + ("s" if _weeks_ > 1 else "") + + if _remainder_ >= _day_: + _days_ = _remainder_ // _day_ + _remainder_ = _remainder_ % _day_ + _human_readable_ += (" " if bool(_human_readable_) else "") + \ + f"{_days_} day" + ("s" if _days_ > 1 else "") + + if _remainder_ > 0: + _human_readable_ += (" " if bool(_human_readable_) else "") + f"{_remainder_} minutes" + + return _human_readable_ + def send_verification_email( conn, user: User, @@ -125,7 +153,7 @@ def send_verification_email( subject="GeneNetwork: Please Verify Your Email" verification_code = secrets.token_urlsafe(64) generated = datetime.now() - expiration_minutes = 15 + expiration_minutes = current_app.config["AUTH_EMAILS_EXPIRY_MINUTES"] def __render__(template): return render_template(template, subject=subject, @@ -137,7 +165,8 @@ def send_verification_email( client_id=client_id, redirect_uri=redirect_uri, verificationcode=verification_code)), - expiration_minutes=expiration_minutes) + expiration_minutes=display_minutes_for_humans( + expiration_minutes)) with db.cursor(conn) as cursor: cursor.execute( ("INSERT INTO " @@ -180,7 +209,7 @@ def register_user() -> Response: with db.cursor(conn) as cursor: user, _hashed_password = set_user_password( cursor, save_user( - cursor, email["email"], user_name), password) + cursor, email["email"], user_name), password) # type: ignore assign_default_roles(cursor, user) send_verification_email(conn, user, @@ -196,7 +225,7 @@ def register_user() -> Response: current_app.logger.error(traceback.format_exc()) raise(UserRegistrationError(f"Email Error: {str(enve)}")) from enve - raise Exception( + raise Exception(# pylint: disable=[broad-exception-raised] "unknown_error", "The system experienced an unexpected error.") def delete_verification_code(cursor, code: str): @@ -306,9 +335,33 @@ def user_join_request_exists(): @require_oauth("profile user") def list_all_users() -> Response: """List all the users.""" - with require_oauth.acquire("profile group") as _the_token: - return jsonify(tuple( - asdict(user) for user in with_db_connection(list_users))) + _kwargs = ( + { + key: value + for key, value in request_json().items() + if key in ("email", "name", "verified", "age") + } + or + { + "email": "", "name": "", "verified": "", "age": "" + } + ) + + with (require_oauth.acquire("profile group") as _the_token, + db.connection(current_app.config["AUTH_DB"]) as conn, + db.cursor(conn) as cursor): + _users = list_users(conn, **_kwargs) + _start = int(_kwargs.get("start", "0")) + _length = int(_kwargs.get("length", "0")) + cursor.execute("SELECT COUNT(*) FROM users") + _total_users = int(cursor.fetchone()["COUNT(*)"]) + return jsonify({ + "users": tuple(asdict(user) for user in + (_users[_start:_start+_length] + if _length else _users)), + "total-users": _total_users, + "total-filtered": len(_users) + }) @users.route("/handle-unverified", methods=["POST"]) def handle_unverified(): @@ -380,7 +433,7 @@ def send_forgot_password_email( subject="GeneNetwork: Change Your Password" token = secrets.token_urlsafe(64) generated = datetime.now() - expiration_minutes = 15 + expiration_minutes = current_app.config["AUTH_EMAILS_EXPIRY_MINUTES"] def __render__(template): return render_template(template, subject=subject, @@ -391,7 +444,8 @@ def send_forgot_password_email( client_id=client_id, redirect_uri=redirect_uri, response_type=response_type)), - expiration_minutes=expiration_minutes) + expiration_minutes=display_minutes_for_humans( + expiration_minutes)) with db.cursor(conn) as cursor: cursor.execute( @@ -504,3 +558,165 @@ def change_password(forgot_password_token): flash("Both the password and its confirmation MUST be provided!", "alert-danger") return change_password_page + + +def __delete_users_individually__(cursor, user_ids, tables): + """Recovery function with dismal performance.""" + _errors = tuple() + for _user_id in user_ids: + for _table, _col in tables: + try: + cursor.execute( + f"DELETE FROM {_table} WHERE {_col}=?", + (str(_user_id),)) + except sqlite3.IntegrityError: + _errors = _errors + ( + (("user_id", _user_id), + ("reason", f"User has data in table {_table}")),) + + return _errors + + +def __fetch_non_deletable_users__(cursor, ids_and_reasons): + """Fetch detail for non-deletable users.""" + def __merge__(acc, curr): + _curr = dict(curr) + _this_dict = acc.get( + curr["user_id"], {"reasons": tuple()}) + _this_dict["reasons"] = _this_dict["reasons"] + (_curr["reason"],) + return {**acc, curr["user_id"]: _this_dict} + + _reasons_by_id = reduce(__merge__, + (dict(row) for row in ids_and_reasons), + {}) + _user_ids = tuple(_reasons_by_id.keys()) + _paramstr = ", ".join(["?"] * len(_user_ids)) + cursor.execute(f"SELECT * FROM users WHERE user_id IN ({_paramstr})", + _user_ids) + return tuple({ + "user": dict(row), + "reasons": _reasons_by_id[row["user_id"]]["reasons"] + } for row in cursor.fetchall()) + + +def __non_deletable_with_reason__( + user_ids: tuple[str, ...], + dbrows: Sequence[sqlite3.Row], + reason: str + ) -> tuple[tuple[tuple[str, str], tuple[str, str]], ...]: + """Build a list of 'non-deletable' user objects.""" + return tuple((("user_id", _uid), ("reason", reason)) + for _uid in user_ids + if _uid in tuple(row["user_id"] for row in dbrows)) + + +@users.route("/delete", methods=["POST"]) +@require_oauth("profile user role") +def delete_users(): + """Delete the specified user.""" + with (require_oauth.acquire("profile") as _token, + db.connection(current_app.config["AUTH_DB"]) as conn, + db.cursor(conn) as cursor): + if not authorised_for2(conn, + _token.user, + system_resource(conn), + ("system:user:delete-user",)): + raise AuthorisationError( + "You need the `system:user:delete-user` privilege to delete " + "users from the system.") + + _form = request_json() + _user_ids = _form.get("user_ids", []) + _non_deletable = set() + if str(_token.user.user_id) in _user_ids: + _non_deletable.add( + (("user_id", str(_token.user.user_id),), + ("reason", "You are not allowed to delete yourself."))) + + cursor.execute("SELECT user_id FROM group_users") + _group_members = tuple(row["user_id"] for row in cursor.fetchall()) + _non_deletable.update(__non_deletable_with_reason__( + _user_ids, + cursor.fetchall(), + "User is member of a user group.")) + + cursor.execute("SELECT user_id FROM oauth2_clients;") + _non_deletable.update(__non_deletable_with_reason__( + _user_ids, + cursor.fetchall(), + "User is registered owner of an OAuth client.")) + + _important_roles = ( + "group-leader", + "resource-owner", + "system-administrator", + "inbredset-group-owner") + _paramstr = ",".join(["?"] * len(_important_roles)) + cursor.execute( + "SELECT DISTINCT user_roles.user_id FROM user_roles " + "INNER JOIN roles ON user_roles.role_id=roles.role_id " + f"WHERE roles.role_name IN ({_paramstr})", + _important_roles) + _non_deletable.update(__non_deletable_with_reason__( + _user_ids, + cursor.fetchall(), + f"User holds on of the following roles: {_important_roles}")) + + _delete = tuple(uid for uid in _user_ids if uid not in + (dict(row)["user_id"] for row in _non_deletable)) + _paramstr = ", ".join(["?"] * len(_delete)) + if len(_delete) > 0: + _dependent_tables = ( + ("authorisation_code", "user_id"), + ("forgot_password_tokens", "user_id"), + ("group_join_requests", "requester_id"), + ("jwt_refresh_tokens", "user_id"), + ("oauth2_tokens", "user_id"), + ("user_credentials", "user_id"), + ("user_roles", "user_id"), + ("user_verification_codes", "user_id")) + try: + for _table, _col in _dependent_tables: + cursor.execute( + f"DELETE FROM {_table} WHERE {_col} IN ({_paramstr})", + _delete) + except sqlite3.IntegrityError: + _non_deletable.update(__delete_users_individually__( + cursor, _delete, _dependent_tables)) + + _not_deleted = __fetch_non_deletable_users__( + cursor, _non_deletable) + _delete = tuple(# rebuild with those that failed. + _user_id for _user_id in _delete if _user_id not in + tuple(row["user"]["user_id"] for row in _not_deleted)) + _paramstr = ", ".join(["?"] * len(_delete)) + cursor.execute( + f"DELETE FROM users WHERE user_id IN ({_paramstr})", + _delete) + _deleted_rows = cursor.rowcount + return jsonify({ + "total-requested": len(_user_ids), + "total-deleted": _deleted_rows, + "not-deleted": _not_deleted, + "deleted": _deleted_rows, + "message": ( + f"Successfully deleted {_deleted_rows} users." + + (" Some users could not be deleted." + if len(_user_ids) - _deleted_rows > 0 + else "")) + }) + + _not_deleted = __fetch_non_deletable_users__(cursor, _non_deletable) + + return jsonify({ + "total-requested": len(_user_ids), + "total-deleted": 0, + "not-deleted": _not_deleted, + "deleted": 0, + "error": "Zero users were deleted", + "error_description": ( + "No users were selected for deletion." + if len(_user_ids) == 0 + else ("The selected users are system administrators, group " + "members, or resource owners.")) + }), 400 |
