diff options
Diffstat (limited to 'gn_auth/auth/authorisation/users')
-rw-r--r-- | gn_auth/auth/authorisation/users/models.py | 34 | ||||
-rw-r--r-- | gn_auth/auth/authorisation/users/views.py | 90 |
2 files changed, 109 insertions, 15 deletions
diff --git a/gn_auth/auth/authorisation/users/models.py b/gn_auth/auth/authorisation/users/models.py index ef3ce7f..d30bfd0 100644 --- a/gn_auth/auth/authorisation/users/models.py +++ b/gn_auth/auth/authorisation/users/models.py @@ -1,6 +1,5 @@ """Functions for acting on users.""" import uuid -from typing import Union from functools import reduce from datetime import datetime, timedelta @@ -37,29 +36,36 @@ def __process_age_clause__(age_desc: str) -> tuple[str, int]: case "exactly": return "created = :created", _param case _: - raise Exception("Invalid age descriptor.") + raise Exception("Invalid age descriptor.")# pylint: disable=[broad-exception-raised] -def __list_user_clauses_and_params__(**kwargs) -> tuple[list[str], dict[str, Union[int, str]]]: +def __list_user_clauses_and_params__(**kwargs) -> tuple[str, dict[str, str]]: """Process the WHERE clauses, and params for the 'LIST USERS' query.""" - clauses = [] + clauses = "" params = {} - if bool(kwargs.get("email", "").strip()): - clauses = clauses + ["email LIKE :email"] + if bool(kwargs.get("email", "").strip()) and bool(kwargs.get("name", "").strip()): + clauses = "(email LIKE :email OR name LIKE :name)" + params = { + "email": f'%{kwargs["email"].strip()}%', + "name": f'%{kwargs["name"].strip()}%' + } + elif bool(kwargs.get("email", "").strip()): + clauses = "email LIKE :email" params["email"] = f'%{kwargs["email"].strip()}%' - - if bool(kwargs.get("name", "").strip()): - clauses = clauses + ["name LIKE :name"] + elif bool(kwargs.get("name", "").strip()): + clauses = "name LIKE :name" params["name"] = f'%{kwargs["name"].strip()}%' + else: + clauses = "" if bool(kwargs.get("verified", "").strip()): - clauses = clauses + ["verified=:verified"] - params["verified"] = 1 if kwargs["verified"].strip() == "yes" else "no" + clauses = clauses + (" AND " if len(clauses) > 0 else "") + "verified=:verified" + params["verified"] = "1" if kwargs["verified"].strip() == "yes" else "0" if bool(kwargs.get("age", "").strip()): _clause, _param = __process_age_clause__(kwargs["age"].strip()) - clauses = clauses + [_clause] - params["created"] = _param + clauses = clauses + (" AND " if len(clauses) > 0 else "") + _clause + params["created"] = str(_param) return clauses, params @@ -73,7 +79,7 @@ def list_users(conn: db.DbConnection, **kwargs) -> tuple[User, ...]: _query = "SELECT * FROM users" _clauses, _params = __list_user_clauses_and_params__(**kwargs) if len(_clauses) > 0: - _query = _query + " WHERE " + " AND ".join(_clauses) + _query = _query + " WHERE " + _clauses with db.cursor(conn) as cursor: cursor.execute(_query, _params) diff --git a/gn_auth/auth/authorisation/users/views.py b/gn_auth/auth/authorisation/users/views.py index 5140bcb..55921ca 100644 --- a/gn_auth/auth/authorisation/users/views.py +++ b/gn_auth/auth/authorisation/users/views.py @@ -28,6 +28,9 @@ from gn_auth.auth.requests import request_json from gn_auth.auth.db import sqlite3 as db from gn_auth.auth.db.sqlite3 import with_db_connection +from gn_auth.auth.authorisation.resources.system.models import system_resource + +from gn_auth.auth.authorisation.resources.checks import authorised_for2 from gn_auth.auth.authorisation.resources.models import ( user_resources as _user_resources) from gn_auth.auth.authorisation.roles.models import ( @@ -39,6 +42,7 @@ from gn_auth.auth.errors import ( NotFoundError, UsernameError, PasswordError, + AuthorisationError, UserRegistrationError) @@ -333,7 +337,11 @@ def list_all_users() -> Response: """List all the users.""" _kwargs = { key: value - for key, value in request.json.items() + for key, value in ( + request.json.items() if request.json + else { + "email": "", "name": "", "verified": "", "age": "" + }) if key in ("email", "name", "verified", "age") } @@ -548,3 +556,83 @@ def change_password(forgot_password_token): flash("Both the password and its confirmation MUST be provided!", "alert-danger") return change_password_page + + +@users.route("/delete", methods=["POST"]) +@require_oauth("profile user role") +def delete_users(): + """Delete the specified user.""" + with (require_oauth.acquire("profile") as _token, + db.connection(current_app.config["AUTH_DB"]) as conn, + db.cursor(conn) as cursor): + if not authorised_for2(conn, + _token.user, + system_resource(conn), + ("system:user:delete-user",)): + raise AuthorisationError( + "You need the `system:user:delete-user` privilege to delete " + "users from the system.") + + _form = request_json() + _user_ids = _form.get("user_ids", []) + _non_deletable = set((str(_token.user.user_id),)) + + cursor.execute("SELECT user_id FROM group_users") + _non_deletable.update(row["user_id"] for row in cursor.fetchall()) + + cursor.execute("SELECT user_id FROM oauth2_clients;") + _non_deletable.update(row["user_id"] for row in cursor.fetchall()) + + _important_roles = ( + "group-leader", + "resource-owner", + "system-administrator", + "inbredset-group-owner") + _paramstr = ",".join(["?"] * len(_important_roles)) + cursor.execute( + "SELECT DISTINCT user_roles.user_id FROM user_roles " + "INNER JOIN roles ON user_roles.role_id=roles.role_id " + f"WHERE roles.role_name IN ({_paramstr})", + _important_roles) + _non_deletable.update(row["user_id"] for row in cursor.fetchall()) + + _delete = tuple(uid for uid in _user_ids if uid not in _non_deletable) + _paramstr = ", ".join(["?"] * len(_delete)) + if len(_delete) > 0: + _dependent_tables = ( + ("authorisation_code", "user_id"), + ("forgot_password_tokens", "user_id"), + ("group_join_requests", "requester_id"), + ("jwt_refresh_tokens", "user_id"), + ("oauth2_tokens", "user_id"), + ("user_credentials", "user_id"), + ("user_roles", "user_id"), + ("user_verification_codes", "user_id")) + for _table, _col in _dependent_tables: + cursor.execute( + f"DELETE FROM {_table} WHERE {_col} IN ({_paramstr})", + _delete) + + cursor.execute( + f"DELETE FROM users WHERE user_id IN ({_paramstr})", + _delete) + _deleted_rows = cursor.rowcount + _diff = len(_user_ids) - _deleted_rows + return jsonify({ + "total-requested": len(_user_ids), + "total-deleted": _deleted_rows, + "not-deleted": _diff, + "message": ( + f"Successfully deleted {_deleted_rows} users." + + (" Some users could not be deleted." if _diff > 0 else "")) + }) + + return jsonify({ + "total-requested": len(_user_ids), + "total-deleted": 0, + "not-deleted": len(_user_ids), + "error": "Zero users were deleted", + "error_description": ( + "Either no users were selected or all the selected users are " + "system administrators, group members, or resource owners.") + }), 400 |