aboutsummaryrefslogtreecommitdiff
path: root/gn_auth/auth/authorisation/users
diff options
context:
space:
mode:
Diffstat (limited to 'gn_auth/auth/authorisation/users')
-rw-r--r--gn_auth/auth/authorisation/users/models.py34
-rw-r--r--gn_auth/auth/authorisation/users/views.py90
2 files changed, 109 insertions, 15 deletions
diff --git a/gn_auth/auth/authorisation/users/models.py b/gn_auth/auth/authorisation/users/models.py
index ef3ce7f..d30bfd0 100644
--- a/gn_auth/auth/authorisation/users/models.py
+++ b/gn_auth/auth/authorisation/users/models.py
@@ -1,6 +1,5 @@
"""Functions for acting on users."""
import uuid
-from typing import Union
from functools import reduce
from datetime import datetime, timedelta
@@ -37,29 +36,36 @@ def __process_age_clause__(age_desc: str) -> tuple[str, int]:
case "exactly":
return "created = :created", _param
case _:
- raise Exception("Invalid age descriptor.")
+ raise Exception("Invalid age descriptor.")# pylint: disable=[broad-exception-raised]
-def __list_user_clauses_and_params__(**kwargs) -> tuple[list[str], dict[str, Union[int, str]]]:
+def __list_user_clauses_and_params__(**kwargs) -> tuple[str, dict[str, str]]:
"""Process the WHERE clauses, and params for the 'LIST USERS' query."""
- clauses = []
+ clauses = ""
params = {}
- if bool(kwargs.get("email", "").strip()):
- clauses = clauses + ["email LIKE :email"]
+ if bool(kwargs.get("email", "").strip()) and bool(kwargs.get("name", "").strip()):
+ clauses = "(email LIKE :email OR name LIKE :name)"
+ params = {
+ "email": f'%{kwargs["email"].strip()}%',
+ "name": f'%{kwargs["name"].strip()}%'
+ }
+ elif bool(kwargs.get("email", "").strip()):
+ clauses = "email LIKE :email"
params["email"] = f'%{kwargs["email"].strip()}%'
-
- if bool(kwargs.get("name", "").strip()):
- clauses = clauses + ["name LIKE :name"]
+ elif bool(kwargs.get("name", "").strip()):
+ clauses = "name LIKE :name"
params["name"] = f'%{kwargs["name"].strip()}%'
+ else:
+ clauses = ""
if bool(kwargs.get("verified", "").strip()):
- clauses = clauses + ["verified=:verified"]
- params["verified"] = 1 if kwargs["verified"].strip() == "yes" else "no"
+ clauses = clauses + (" AND " if len(clauses) > 0 else "") + "verified=:verified"
+ params["verified"] = "1" if kwargs["verified"].strip() == "yes" else "0"
if bool(kwargs.get("age", "").strip()):
_clause, _param = __process_age_clause__(kwargs["age"].strip())
- clauses = clauses + [_clause]
- params["created"] = _param
+ clauses = clauses + (" AND " if len(clauses) > 0 else "") + _clause
+ params["created"] = str(_param)
return clauses, params
@@ -73,7 +79,7 @@ def list_users(conn: db.DbConnection, **kwargs) -> tuple[User, ...]:
_query = "SELECT * FROM users"
_clauses, _params = __list_user_clauses_and_params__(**kwargs)
if len(_clauses) > 0:
- _query = _query + " WHERE " + " AND ".join(_clauses)
+ _query = _query + " WHERE " + _clauses
with db.cursor(conn) as cursor:
cursor.execute(_query, _params)
diff --git a/gn_auth/auth/authorisation/users/views.py b/gn_auth/auth/authorisation/users/views.py
index 5140bcb..55921ca 100644
--- a/gn_auth/auth/authorisation/users/views.py
+++ b/gn_auth/auth/authorisation/users/views.py
@@ -28,6 +28,9 @@ from gn_auth.auth.requests import request_json
from gn_auth.auth.db import sqlite3 as db
from gn_auth.auth.db.sqlite3 import with_db_connection
+from gn_auth.auth.authorisation.resources.system.models import system_resource
+
+from gn_auth.auth.authorisation.resources.checks import authorised_for2
from gn_auth.auth.authorisation.resources.models import (
user_resources as _user_resources)
from gn_auth.auth.authorisation.roles.models import (
@@ -39,6 +42,7 @@ from gn_auth.auth.errors import (
NotFoundError,
UsernameError,
PasswordError,
+ AuthorisationError,
UserRegistrationError)
@@ -333,7 +337,11 @@ def list_all_users() -> Response:
"""List all the users."""
_kwargs = {
key: value
- for key, value in request.json.items()
+ for key, value in (
+ request.json.items() if request.json
+ else {
+ "email": "", "name": "", "verified": "", "age": ""
+ })
if key in ("email", "name", "verified", "age")
}
@@ -548,3 +556,83 @@ def change_password(forgot_password_token):
flash("Both the password and its confirmation MUST be provided!",
"alert-danger")
return change_password_page
+
+
+@users.route("/delete", methods=["POST"])
+@require_oauth("profile user role")
+def delete_users():
+ """Delete the specified user."""
+ with (require_oauth.acquire("profile") as _token,
+ db.connection(current_app.config["AUTH_DB"]) as conn,
+ db.cursor(conn) as cursor):
+ if not authorised_for2(conn,
+ _token.user,
+ system_resource(conn),
+ ("system:user:delete-user",)):
+ raise AuthorisationError(
+ "You need the `system:user:delete-user` privilege to delete "
+ "users from the system.")
+
+ _form = request_json()
+ _user_ids = _form.get("user_ids", [])
+ _non_deletable = set((str(_token.user.user_id),))
+
+ cursor.execute("SELECT user_id FROM group_users")
+ _non_deletable.update(row["user_id"] for row in cursor.fetchall())
+
+ cursor.execute("SELECT user_id FROM oauth2_clients;")
+ _non_deletable.update(row["user_id"] for row in cursor.fetchall())
+
+ _important_roles = (
+ "group-leader",
+ "resource-owner",
+ "system-administrator",
+ "inbredset-group-owner")
+ _paramstr = ",".join(["?"] * len(_important_roles))
+ cursor.execute(
+ "SELECT DISTINCT user_roles.user_id FROM user_roles "
+ "INNER JOIN roles ON user_roles.role_id=roles.role_id "
+ f"WHERE roles.role_name IN ({_paramstr})",
+ _important_roles)
+ _non_deletable.update(row["user_id"] for row in cursor.fetchall())
+
+ _delete = tuple(uid for uid in _user_ids if uid not in _non_deletable)
+ _paramstr = ", ".join(["?"] * len(_delete))
+ if len(_delete) > 0:
+ _dependent_tables = (
+ ("authorisation_code", "user_id"),
+ ("forgot_password_tokens", "user_id"),
+ ("group_join_requests", "requester_id"),
+ ("jwt_refresh_tokens", "user_id"),
+ ("oauth2_tokens", "user_id"),
+ ("user_credentials", "user_id"),
+ ("user_roles", "user_id"),
+ ("user_verification_codes", "user_id"))
+ for _table, _col in _dependent_tables:
+ cursor.execute(
+ f"DELETE FROM {_table} WHERE {_col} IN ({_paramstr})",
+ _delete)
+
+ cursor.execute(
+ f"DELETE FROM users WHERE user_id IN ({_paramstr})",
+ _delete)
+ _deleted_rows = cursor.rowcount
+ _diff = len(_user_ids) - _deleted_rows
+ return jsonify({
+ "total-requested": len(_user_ids),
+ "total-deleted": _deleted_rows,
+ "not-deleted": _diff,
+ "message": (
+ f"Successfully deleted {_deleted_rows} users." +
+ (" Some users could not be deleted." if _diff > 0 else ""))
+ })
+
+ return jsonify({
+ "total-requested": len(_user_ids),
+ "total-deleted": 0,
+ "not-deleted": len(_user_ids),
+ "error": "Zero users were deleted",
+ "error_description": (
+ "Either no users were selected or all the selected users are "
+ "system administrators, group members, or resource owners.")
+ }), 400