aboutsummaryrefslogtreecommitdiff
path: root/gn_auth/auth/authorisation/users
diff options
context:
space:
mode:
Diffstat (limited to 'gn_auth/auth/authorisation/users')
-rw-r--r--gn_auth/auth/authorisation/users/collections/models.py10
-rw-r--r--gn_auth/auth/authorisation/users/models.py70
-rw-r--r--gn_auth/auth/authorisation/users/views.py202
3 files changed, 269 insertions, 13 deletions
diff --git a/gn_auth/auth/authorisation/users/collections/models.py b/gn_auth/auth/authorisation/users/collections/models.py
index f0a7fa2..63443ef 100644
--- a/gn_auth/auth/authorisation/users/collections/models.py
+++ b/gn_auth/auth/authorisation/users/collections/models.py
@@ -33,7 +33,7 @@ def __valid_email__(email:str) -> bool:
def __toggle_boolean_field__(
rconn: Redis, email: str, field: str):
"""Toggle the valuen of a boolean field"""
- mig_dict = json.loads(rconn.hget("migratable-accounts", email) or "{}")
+ mig_dict = json.loads(rconn.hget("migratable-accounts", email) or "{}") # type: ignore
if bool(mig_dict):
rconn.hset("migratable-accounts", email,
json.dumps({**mig_dict, field: not mig_dict.get(field, True)}))
@@ -52,7 +52,7 @@ def __build_email_uuid_bridge__(rconn: Redis):
"resources_migrated": False
} for account in (
acct for acct in
- (json.loads(usr) for usr in rconn.hgetall("users").values())
+ (json.loads(usr) for usr in rconn.hgetall("users").values()) # type: ignore
if (bool(acct.get("email_address", False)) and
__valid_email__(acct["email_address"])))
}
@@ -66,7 +66,7 @@ def __retrieve_old_accounts__(rconn: Redis) -> dict:
accounts = rconn.hgetall("migratable-accounts")
if accounts:
return {
- key: json.loads(value) for key, value in accounts.items()
+ key: json.loads(value) for key, value in accounts.items() # type: ignore
}
return __build_email_uuid_bridge__(rconn)
@@ -91,13 +91,13 @@ def __retrieve_old_user_collections__(rconn: Redis, old_user_id: UUID) -> tuple:
"""Retrieve any old collections relating to the user."""
return tuple(parse_collection(coll) for coll in
json.loads(rconn.hget(
- __OLD_REDIS_COLLECTIONS_KEY__, str(old_user_id)) or "[]"))
+ __OLD_REDIS_COLLECTIONS_KEY__, str(old_user_id)) or "[]")) # type: ignore
def user_collections(rconn: Redis, user: User) -> tuple[dict, ...]:
"""Retrieve current user collections."""
collections = tuple(parse_collection(coll) for coll in json.loads(
rconn.hget(REDIS_COLLECTIONS_KEY, str(user.user_id)) or
- "[]"))
+ "[]")) # type: ignore
old_accounts = __retrieve_old_accounts__(rconn)
if (user.email in old_accounts and
not old_accounts[user.email]["collections-migrated"]):
diff --git a/gn_auth/auth/authorisation/users/models.py b/gn_auth/auth/authorisation/users/models.py
index bde2e33..d30bfd0 100644
--- a/gn_auth/auth/authorisation/users/models.py
+++ b/gn_auth/auth/authorisation/users/models.py
@@ -1,6 +1,7 @@
"""Functions for acting on users."""
import uuid
from functools import reduce
+from datetime import datetime, timedelta
from ..roles.models import Role
from ..checks import authorised_p
@@ -9,14 +10,79 @@ from ..privileges import Privilege
from ...db import sqlite3 as db
from ...authentication.users import User
+
+def __process_age_clause__(age_desc: str) -> tuple[str, int]:
+ """Process the age clause and parameter for 'LIST USERS' query."""
+ _today = datetime.now()
+ _clause = "created"
+ _parts = age_desc.split(" ")
+ _multipliers = {
+ # Temporary hack before dateutil module can make it to our deployment.
+ "days": 1,
+ "months": 30,
+ "years": 365
+ }
+ assert len(_parts) in (3, 4), "Invalid age descriptor!"
+
+ _param = int((
+ _today - timedelta(**{"days": int(_parts[-2]) * _multipliers[_parts[-1]]})
+ ).timestamp())
+
+ match _parts[0]:
+ case "older":
+ return "created < :created", _param
+ case "younger":
+ return "created > :created", _param
+ case "exactly":
+ return "created = :created", _param
+ case _:
+ raise Exception("Invalid age descriptor.")# pylint: disable=[broad-exception-raised]
+
+
+def __list_user_clauses_and_params__(**kwargs) -> tuple[str, dict[str, str]]:
+ """Process the WHERE clauses, and params for the 'LIST USERS' query."""
+ clauses = ""
+ params = {}
+ if bool(kwargs.get("email", "").strip()) and bool(kwargs.get("name", "").strip()):
+ clauses = "(email LIKE :email OR name LIKE :name)"
+ params = {
+ "email": f'%{kwargs["email"].strip()}%',
+ "name": f'%{kwargs["name"].strip()}%'
+ }
+ elif bool(kwargs.get("email", "").strip()):
+ clauses = "email LIKE :email"
+ params["email"] = f'%{kwargs["email"].strip()}%'
+ elif bool(kwargs.get("name", "").strip()):
+ clauses = "name LIKE :name"
+ params["name"] = f'%{kwargs["name"].strip()}%'
+ else:
+ clauses = ""
+
+ if bool(kwargs.get("verified", "").strip()):
+ clauses = clauses + (" AND " if len(clauses) > 0 else "") + "verified=:verified"
+ params["verified"] = "1" if kwargs["verified"].strip() == "yes" else "0"
+
+ if bool(kwargs.get("age", "").strip()):
+ _clause, _param = __process_age_clause__(kwargs["age"].strip())
+ clauses = clauses + (" AND " if len(clauses) > 0 else "") + _clause
+ params["created"] = str(_param)
+
+ return clauses, params
+
+
@authorised_p(
("system:user:list",),
"You do not have the appropriate privileges to list users.",
oauth2_scope="profile user")
-def list_users(conn: db.DbConnection) -> tuple[User, ...]:
+def list_users(conn: db.DbConnection, **kwargs) -> tuple[User, ...]:
"""List out all users."""
+ _query = "SELECT * FROM users"
+ _clauses, _params = __list_user_clauses_and_params__(**kwargs)
+ if len(_clauses) > 0:
+ _query = _query + " WHERE " + _clauses
+
with db.cursor(conn) as cursor:
- cursor.execute("SELECT * FROM users")
+ cursor.execute(_query, _params)
return tuple(User.from_sqlite3_row(row) for row in cursor.fetchall())
def __build_resource_roles__(rows):
diff --git a/gn_auth/auth/authorisation/users/views.py b/gn_auth/auth/authorisation/users/views.py
index f8ccdbe..91e459d 100644
--- a/gn_auth/auth/authorisation/users/views.py
+++ b/gn_auth/auth/authorisation/users/views.py
@@ -3,10 +3,10 @@ import uuid
import sqlite3
import secrets
import traceback
-from typing import Any
-from functools import partial
from dataclasses import asdict
+from typing import Any, Sequence
from urllib.parse import urljoin
+from functools import reduce, partial
from datetime import datetime, timedelta
from email.headerregistry import Address
from email_validator import validate_email, EmailNotValidError
@@ -28,6 +28,9 @@ from gn_auth.auth.requests import request_json
from gn_auth.auth.db import sqlite3 as db
from gn_auth.auth.db.sqlite3 import with_db_connection
+from gn_auth.auth.authorisation.resources.system.models import system_resource
+
+from gn_auth.auth.authorisation.resources.checks import authorised_for2
from gn_auth.auth.authorisation.resources.models import (
user_resources as _user_resources)
from gn_auth.auth.authorisation.roles.models import (
@@ -39,6 +42,7 @@ from gn_auth.auth.errors import (
NotFoundError,
UsernameError,
PasswordError,
+ AuthorisationError,
UserRegistrationError)
@@ -205,7 +209,7 @@ def register_user() -> Response:
with db.cursor(conn) as cursor:
user, _hashed_password = set_user_password(
cursor, save_user(
- cursor, email["email"], user_name), password)
+ cursor, email["email"], user_name), password) # type: ignore
assign_default_roles(cursor, user)
send_verification_email(conn,
user,
@@ -331,9 +335,33 @@ def user_join_request_exists():
@require_oauth("profile user")
def list_all_users() -> Response:
"""List all the users."""
- with require_oauth.acquire("profile group") as _the_token:
- return jsonify(tuple(
- asdict(user) for user in with_db_connection(list_users)))
+ _kwargs = (
+ {
+ key: value
+ for key, value in request_json().items()
+ if key in ("email", "name", "verified", "age")
+ }
+ or
+ {
+ "email": "", "name": "", "verified": "", "age": ""
+ }
+ )
+
+ with (require_oauth.acquire("profile group") as _the_token,
+ db.connection(current_app.config["AUTH_DB"]) as conn,
+ db.cursor(conn) as cursor):
+ _users = list_users(conn, **_kwargs)
+ _start = int(_kwargs.get("start", "0"))
+ _length = int(_kwargs.get("length", "0"))
+ cursor.execute("SELECT COUNT(*) FROM users")
+ _total_users = int(cursor.fetchone()["COUNT(*)"])
+ return jsonify({
+ "users": tuple(asdict(user) for user in
+ (_users[_start:_start+_length]
+ if _length else _users)),
+ "total-users": _total_users,
+ "total-filtered": len(_users)
+ })
@users.route("/handle-unverified", methods=["POST"])
def handle_unverified():
@@ -530,3 +558,165 @@ def change_password(forgot_password_token):
flash("Both the password and its confirmation MUST be provided!",
"alert-danger")
return change_password_page
+
+
+def __delete_users_individually__(cursor, user_ids, tables):
+ """Recovery function with dismal performance."""
+ _errors = tuple()
+ for _user_id in user_ids:
+ for _table, _col in tables:
+ try:
+ cursor.execute(
+ f"DELETE FROM {_table} WHERE {_col}=?",
+ (str(_user_id),))
+ except sqlite3.IntegrityError:
+ _errors = _errors + (
+ (("user_id", _user_id),
+ ("reason", f"User has data in table {_table}")),)
+
+ return _errors
+
+
+def __fetch_non_deletable_users__(cursor, ids_and_reasons):
+ """Fetch detail for non-deletable users."""
+ def __merge__(acc, curr):
+ _curr = dict(curr)
+ _this_dict = acc.get(
+ curr["user_id"], {"reasons": tuple()})
+ _this_dict["reasons"] = _this_dict["reasons"] + (_curr["reason"],)
+ return {**acc, curr["user_id"]: _this_dict}
+
+ _reasons_by_id = reduce(__merge__,
+ (dict(row) for row in ids_and_reasons),
+ {})
+ _user_ids = tuple(_reasons_by_id.keys())
+ _paramstr = ", ".join(["?"] * len(_user_ids))
+ cursor.execute(f"SELECT * FROM users WHERE user_id IN ({_paramstr})",
+ _user_ids)
+ return tuple({
+ "user": dict(row),
+ "reasons": _reasons_by_id[row["user_id"]]["reasons"]
+ } for row in cursor.fetchall())
+
+
+def __non_deletable_with_reason__(
+ user_ids: tuple[str, ...],
+ dbrows: Sequence[sqlite3.Row],
+ reason: str
+ ) -> tuple[tuple[tuple[str, str], tuple[str, str]], ...]:
+ """Build a list of 'non-deletable' user objects."""
+ return tuple((("user_id", _uid), ("reason", reason))
+ for _uid in user_ids
+ if _uid in tuple(row["user_id"] for row in dbrows))
+
+
+@users.route("/delete", methods=["POST"])
+@require_oauth("profile user role")
+def delete_users():
+ """Delete the specified user."""
+ with (require_oauth.acquire("profile") as _token,
+ db.connection(current_app.config["AUTH_DB"]) as conn,
+ db.cursor(conn) as cursor):
+ if not authorised_for2(conn,
+ _token.user,
+ system_resource(conn),
+ ("system:user:delete-user",)):
+ raise AuthorisationError(
+ "You need the `system:user:delete-user` privilege to delete "
+ "users from the system.")
+
+ _form = request_json()
+ _user_ids = _form.get("user_ids", [])
+ _non_deletable = set()
+ if str(_token.user.user_id) in _user_ids:
+ _non_deletable.add(
+ (("user_id", str(_token.user.user_id),),
+ ("reason", "You are not allowed to delete yourself.")))
+
+ cursor.execute("SELECT user_id FROM group_users")
+ _group_members = tuple(row["user_id"] for row in cursor.fetchall())
+ _non_deletable.update(__non_deletable_with_reason__(
+ _user_ids,
+ cursor.fetchall(),
+ "User is member of a user group."))
+
+ cursor.execute("SELECT user_id FROM oauth2_clients;")
+ _non_deletable.update(__non_deletable_with_reason__(
+ _user_ids,
+ cursor.fetchall(),
+ "User is registered owner of an OAuth client."))
+
+ _important_roles = (
+ "group-leader",
+ "resource-owner",
+ "system-administrator",
+ "inbredset-group-owner")
+ _paramstr = ",".join(["?"] * len(_important_roles))
+ cursor.execute(
+ "SELECT DISTINCT user_roles.user_id FROM user_roles "
+ "INNER JOIN roles ON user_roles.role_id=roles.role_id "
+ f"WHERE roles.role_name IN ({_paramstr})",
+ _important_roles)
+ _non_deletable.update(__non_deletable_with_reason__(
+ _user_ids,
+ cursor.fetchall(),
+ f"User holds on of the following roles: {_important_roles}"))
+
+ _delete = tuple(uid for uid in _user_ids if uid not in
+ (dict(row)["user_id"] for row in _non_deletable))
+ _paramstr = ", ".join(["?"] * len(_delete))
+ if len(_delete) > 0:
+ _dependent_tables = (
+ ("authorisation_code", "user_id"),
+ ("forgot_password_tokens", "user_id"),
+ ("group_join_requests", "requester_id"),
+ ("jwt_refresh_tokens", "user_id"),
+ ("oauth2_tokens", "user_id"),
+ ("user_credentials", "user_id"),
+ ("user_roles", "user_id"),
+ ("user_verification_codes", "user_id"))
+ try:
+ for _table, _col in _dependent_tables:
+ cursor.execute(
+ f"DELETE FROM {_table} WHERE {_col} IN ({_paramstr})",
+ _delete)
+ except sqlite3.IntegrityError:
+ _non_deletable.update(__delete_users_individually__(
+ cursor, _delete, _dependent_tables))
+
+ _not_deleted = __fetch_non_deletable_users__(
+ cursor, _non_deletable)
+ _delete = tuple(# rebuild with those that failed.
+ _user_id for _user_id in _delete if _user_id not in
+ tuple(row["user"]["user_id"] for row in _not_deleted))
+ _paramstr = ", ".join(["?"] * len(_delete))
+ cursor.execute(
+ f"DELETE FROM users WHERE user_id IN ({_paramstr})",
+ _delete)
+ _deleted_rows = cursor.rowcount
+ return jsonify({
+ "total-requested": len(_user_ids),
+ "total-deleted": _deleted_rows,
+ "not-deleted": _not_deleted,
+ "deleted": _deleted_rows,
+ "message": (
+ f"Successfully deleted {_deleted_rows} users." +
+ (" Some users could not be deleted."
+ if len(_user_ids) - _deleted_rows > 0
+ else ""))
+ })
+
+ _not_deleted = __fetch_non_deletable_users__(cursor, _non_deletable)
+
+ return jsonify({
+ "total-requested": len(_user_ids),
+ "total-deleted": 0,
+ "not-deleted": _not_deleted,
+ "deleted": 0,
+ "error": "Zero users were deleted",
+ "error_description": (
+ "No users were selected for deletion."
+ if len(_user_ids) == 0
+ else ("The selected users are system administrators, group "
+ "members, or resource owners."))
+ }), 400