about summary refs log tree commit diff
path: root/gn_auth/auth/authorisation/users/views.py
diff options
context:
space:
mode:
Diffstat (limited to 'gn_auth/auth/authorisation/users/views.py')
-rw-r--r--gn_auth/auth/authorisation/users/views.py221
1 files changed, 214 insertions, 7 deletions
diff --git a/gn_auth/auth/authorisation/users/views.py b/gn_auth/auth/authorisation/users/views.py
index b37164a..cae2605 100644
--- a/gn_auth/auth/authorisation/users/views.py
+++ b/gn_auth/auth/authorisation/users/views.py
@@ -3,10 +3,10 @@ import uuid
 import sqlite3
 import secrets
 import traceback
-from typing import Any
-from functools import partial
 from dataclasses import asdict
+from typing import Any, Sequence
 from urllib.parse import urljoin
+from functools import reduce, partial
 from datetime import datetime, timedelta
 from email.headerregistry import Address
 from email_validator import validate_email, EmailNotValidError
@@ -28,6 +28,9 @@ from gn_auth.auth.requests import request_json
 from gn_auth.auth.db import sqlite3 as db
 from gn_auth.auth.db.sqlite3 import with_db_connection
 
+from gn_auth.auth.authorisation.resources.system.models import system_resource
+
+from gn_auth.auth.authorisation.resources.checks import authorised_for2
 from gn_auth.auth.authorisation.resources.models import (
     user_resources as _user_resources)
 from gn_auth.auth.authorisation.roles.models import (
@@ -39,10 +42,12 @@ from gn_auth.auth.errors import (
     NotFoundError,
     UsernameError,
     PasswordError,
+    AuthorisationError,
     UserRegistrationError)
 
 
-from gn_auth.auth.authentication.users import valid_login, user_by_email
+from gn_auth.auth.authentication.users import (
+    valid_login, user_by_email, user_by_id)
 from gn_auth.auth.authentication.oauth2.resource_server import require_oauth
 from gn_auth.auth.authentication.users import User, save_user, set_user_password
 from gn_auth.auth.authentication.oauth2.models.oauth2token import (
@@ -71,8 +76,24 @@ def user_details() -> Response:
                 False, lambda grp: grp)# type: ignore[arg-type]
             return jsonify({
                 **user_dets,
-                "group": asdict(the_group) if the_group else False
+                **({"group": asdict(the_group)} if the_group else {})
+            })
+
+@users.route("/<user_id>", methods=["GET"])
+def get_user(user_id: str) -> Response:
+    """Fetch user details by user_id."""
+    try:
+        with db.connection(current_app.config["AUTH_DB"]) as conn:
+            user = user_by_id(conn, uuid.UUID(user_id))
+            return jsonify({
+                "user_id": str(user.user_id),
+                "email": user.email,
+                "name": user.name
             })
+    except ValueError:
+        return jsonify({"error": "Invalid user ID format"}), 400
+    except NotFoundError:
+        return jsonify({"error": "User not found"}), 404
 
 @users.route("/roles", methods=["GET"])
 @require_oauth("role")
@@ -331,9 +352,33 @@ def user_join_request_exists():
 @require_oauth("profile user")
 def list_all_users() -> Response:
     """List all the users."""
-    with require_oauth.acquire("profile group") as _the_token:
-        return jsonify(tuple(
-            asdict(user) for user in with_db_connection(list_users)))
+    _kwargs = (
+        {
+            key: value
+            for key, value in request_json().items()
+            if key in ("email", "name", "verified", "age")
+        }
+        or
+        {
+            "email": "", "name": "", "verified": "", "age": ""
+        }
+    )
+
+    with (require_oauth.acquire("profile group") as _the_token,
+          db.connection(current_app.config["AUTH_DB"]) as conn,
+          db.cursor(conn) as cursor):
+        _users = list_users(conn, **_kwargs)
+        _start = int(_kwargs.get("start", "0"))
+        _length = int(_kwargs.get("length", "0"))
+        cursor.execute("SELECT COUNT(*) FROM users")
+        _total_users = int(cursor.fetchone()["COUNT(*)"])
+        return jsonify({
+            "users": tuple(asdict(user) for user in
+                           (_users[_start:_start+_length]
+                            if _length else _users)),
+            "total-users": _total_users,
+            "total-filtered": len(_users)
+        })
 
 @users.route("/handle-unverified", methods=["POST"])
 def handle_unverified():
@@ -530,3 +575,165 @@ def change_password(forgot_password_token):
         flash("Both the password and its confirmation MUST be provided!",
               "alert-danger")
         return change_password_page
+
+
+def __delete_users_individually__(cursor, user_ids, tables):
+    """Recovery function with dismal performance."""
+    _errors = tuple()
+    for _user_id in user_ids:
+        for _table, _col in tables:
+            try:
+                cursor.execute(
+                        f"DELETE FROM {_table} WHERE {_col}=?",
+                        (str(_user_id),))
+            except sqlite3.IntegrityError:
+                _errors = _errors + (
+                    (("user_id", _user_id),
+                     ("reason", f"User has data in table {_table}")),)
+
+    return _errors
+
+
+def __fetch_non_deletable_users__(cursor, ids_and_reasons):
+    """Fetch detail for non-deletable users."""
+    def __merge__(acc, curr):
+        _curr = dict(curr)
+        _this_dict = acc.get(
+            curr["user_id"], {"reasons": tuple()})
+        _this_dict["reasons"] = _this_dict["reasons"] + (_curr["reason"],)
+        return {**acc, curr["user_id"]: _this_dict}
+
+    _reasons_by_id = reduce(__merge__,
+                            (dict(row) for row in ids_and_reasons),
+                            {})
+    _user_ids = tuple(_reasons_by_id.keys())
+    _paramstr = ", ".join(["?"] * len(_user_ids))
+    cursor.execute(f"SELECT * FROM users WHERE user_id IN ({_paramstr})",
+                   _user_ids)
+    return tuple({
+        "user": dict(row),
+        "reasons": _reasons_by_id[row["user_id"]]["reasons"]
+    } for row in cursor.fetchall())
+
+
+def __non_deletable_with_reason__(
+        user_ids: tuple[str, ...],
+        dbrows: Sequence[sqlite3.Row],
+        reason: str
+    ) -> tuple[tuple[tuple[str, str], tuple[str, str]], ...]:
+    """Build a list of 'non-deletable' user objects."""
+    return tuple((("user_id", _uid), ("reason", reason))
+                 for _uid in user_ids
+                 if _uid in tuple(row["user_id"] for row in dbrows))
+
+
+@users.route("/delete", methods=["POST"])
+@require_oauth("profile user role")
+def delete_users():
+    """Delete the specified user."""
+    with (require_oauth.acquire("profile") as _token,
+          db.connection(current_app.config["AUTH_DB"]) as conn,
+          db.cursor(conn) as cursor):
+        if not authorised_for2(conn,
+                               _token.user,
+                               system_resource(conn),
+                               ("system:user:delete-user",)):
+            raise AuthorisationError(
+                "You need the `system:user:delete-user` privilege to delete "
+                "users from the system.")
+
+        _form = request_json()
+        _user_ids = _form.get("user_ids", [])
+        _non_deletable = set()
+        if str(_token.user.user_id) in _user_ids:
+            _non_deletable.add(
+            (("user_id", str(_token.user.user_id),),
+             ("reason", "You are not allowed to delete yourself.")))
+
+        cursor.execute("SELECT user_id FROM group_users")
+        _group_members = tuple(row["user_id"] for row in cursor.fetchall())
+        _non_deletable.update(__non_deletable_with_reason__(
+            _user_ids,
+            cursor.fetchall(),
+            "User is member of a user group."))
+
+        cursor.execute("SELECT user_id FROM oauth2_clients;")
+        _non_deletable.update(__non_deletable_with_reason__(
+            _user_ids,
+            cursor.fetchall(),
+            "User is registered owner of an OAuth client."))
+
+        _important_roles = (
+            "group-leader",
+            "resource-owner",
+            "system-administrator",
+            "inbredset-group-owner")
+        _paramstr = ",".join(["?"] * len(_important_roles))
+        cursor.execute(
+            "SELECT DISTINCT user_roles.user_id FROM user_roles "
+            "INNER JOIN roles ON user_roles.role_id=roles.role_id "
+            f"WHERE roles.role_name IN ({_paramstr})",
+            _important_roles)
+        _non_deletable.update(__non_deletable_with_reason__(
+            _user_ids,
+            cursor.fetchall(),
+            f"User holds on of the following roles: {_important_roles}"))
+
+        _delete = tuple(uid for uid in _user_ids if uid not in
+                        (dict(row)["user_id"] for row in _non_deletable))
+        _paramstr = ", ".join(["?"] * len(_delete))
+        if len(_delete) > 0:
+            _dependent_tables = (
+                ("authorisation_code", "user_id"),
+                ("forgot_password_tokens", "user_id"),
+                ("group_join_requests", "requester_id"),
+                ("jwt_refresh_tokens", "user_id"),
+                ("oauth2_tokens", "user_id"),
+                ("user_credentials", "user_id"),
+                ("user_roles", "user_id"),
+                ("user_verification_codes", "user_id"))
+            try:
+                for _table, _col in _dependent_tables:
+                    cursor.execute(
+                        f"DELETE FROM {_table} WHERE {_col} IN ({_paramstr})",
+                        _delete)
+            except sqlite3.IntegrityError:
+                _non_deletable.update(__delete_users_individually__(
+                    cursor, _delete, _dependent_tables))
+
+            _not_deleted = __fetch_non_deletable_users__(
+                cursor, _non_deletable)
+            _delete = tuple(# rebuild with those that failed.
+                _user_id for _user_id in _delete if _user_id not in
+                tuple(row["user"]["user_id"] for row in _not_deleted))
+            _paramstr = ", ".join(["?"] * len(_delete))
+            cursor.execute(
+                f"DELETE FROM users WHERE user_id IN ({_paramstr})",
+                _delete)
+            _deleted_rows = cursor.rowcount
+            return jsonify({
+                "total-requested": len(_user_ids),
+                "total-deleted": _deleted_rows,
+                "not-deleted": _not_deleted,
+                "deleted": _deleted_rows,
+                "message": (
+                    f"Successfully deleted {_deleted_rows} users." +
+                    (" Some users could not be deleted."
+                     if len(_user_ids) - _deleted_rows > 0
+                     else ""))
+            })
+
+        _not_deleted = __fetch_non_deletable_users__(cursor, _non_deletable)
+
+    return jsonify({
+        "total-requested": len(_user_ids),
+        "total-deleted": 0,
+        "not-deleted": _not_deleted,
+        "deleted": 0,
+        "error": "Zero users were deleted",
+        "error_description": (
+            "No users were selected for deletion."
+            if len(_user_ids) == 0
+            else ("The selected users are system administrators, group "
+                  "members, or resource owners."))
+    }), 400