"""Functions for acting on users."""
import uuid
from functools import reduce
from datetime import datetime, timedelta

from ..roles.models import Role
from ..checks import authorised_p
from ..privileges import Privilege

from ...db import sqlite3 as db
from ...authentication.users import User


def __process_age_clause__(age_desc: str) -> tuple[str, int]:
    """Process the age clause and parameter for 'LIST USERS' query.

    The descriptor is split on whitespace and must have 3 or 4 parts:

    * the first part is the direction: "older", "younger" or "exactly",
    * the second-to-last part is an integer count,
    * the last part is the unit: "days", "months" or "years".

    With 4 parts, the second part is ignored (presumably a filler word such
    as "than", e.g. "older than 5 days" -- confirm against the callers).

    Returns:
        A `(clause, param)` pair, where `clause` compares the `created`
        column against the `:created` placeholder, and `param` is the UNIX
        timestamp (whole seconds) of "now minus the given age".

    Raises:
        AssertionError: if the descriptor does not have 3 or 4 parts.
        ValueError: if the count is not an integer.
        KeyError: if the unit is not one of the known units.
        Exception: if the direction is not one of the known directions.
    """
    parts = age_desc.split()
    multipliers = {
        # Temporary hack before dateutil module can make it to our deployment.
        "days": 1,
        "months": 30,
        "years": 365
    }
    operators = {"older": "<", "younger": ">", "exactly": "="}

    # Raised explicitly (rather than via `assert`) so that the check is not
    # stripped out when Python runs with `-O`. The exception type is kept as
    # AssertionError so existing callers see the same error.
    if len(parts) not in (3, 4):
        raise AssertionError("Invalid age descriptor!")

    age_in_days = int(parts[-2]) * multipliers[parts[-1]]
    param = int((datetime.now() - timedelta(days=age_in_days)).timestamp())

    # NOTE(review): "exactly" compares against a to-the-second timestamp, so
    # it will only match users created at that very second. A day-wide range
    # is probably what is wanted, but that needs a second query parameter.
    operator = operators.get(parts[0])
    if operator is None:
        raise Exception("Invalid age descriptor.")# pylint: disable=[broad-exception-raised]
    return f"created {operator} :created", param


def __list_user_clauses_and_params__(**kwargs) -> tuple[str, dict[str, str]]:
    """Process the WHERE clauses, and params for the 'LIST USERS' query.

    Recognised keyword arguments (all optional; missing, `None` and
    blank values are ignored):

    * email: substring to search for in the `email` column.
    * name: substring to search for in the `name` column.
    * verified: "yes" selects verified users, anything else selects
      unverified users.
    * age: an age descriptor, see `__process_age_clause__`.

    When both `email` and `name` are given, a user matches if either one
    matches. All other filters are combined with AND.

    Returns:
        A `(clauses, params)` pair: the text to put after `WHERE` (empty
        string if there are no filters) and the named query parameters.
    """
    def __value__(key: str) -> str:
        # `or ""` guards against callers passing an explicit `None`.
        return (kwargs.get(key) or "").strip()

    clauses: list[str] = []
    params: dict[str, str] = {}

    searched = tuple(
        (field, __value__(field)) for field in ("email", "name")
        if __value__(field))
    if searched:
        for field, value in searched:
            params[field] = f"%{value}%"
        search_clause = " OR ".join(
            f"{field} LIKE :{field}" for field, _value in searched)
        # Parenthesise the OR so it does not swallow the AND-ed filters.
        clauses.append(
            f"({search_clause})" if len(searched) > 1 else search_clause)

    verified = __value__("verified")
    if verified:
        clauses.append("verified=:verified")
        params["verified"] = "1" if verified == "yes" else "0"

    age = __value__("age")
    if age:
        age_clause, age_param = __process_age_clause__(age)
        clauses.append(age_clause)
        params["created"] = str(age_param)

    return " AND ".join(clauses), params


@authorised_p(
    ("system:user:list",),
    "You do not have the appropriate privileges to list users.",
    oauth2_scope="profile user")
def list_users(conn: db.DbConnection, **kwargs) -> tuple[User, ...]:
    """List out all users.

    The keyword arguments are the optional filters understood by
    `__list_user_clauses_and_params__`; with no filters every row of the
    `users` table is returned.
    """
    _query = "SELECT * FROM users"
    _clauses, _params = __list_user_clauses_and_params__(**kwargs)
    if _clauses:
        _query = _query + " WHERE " + _clauses

    with db.cursor(conn) as cursor:
        cursor.execute(_query, _params)
        return tuple(User.from_sqlite3_row(row) for row in cursor.fetchall())


def __build_resource_roles__(rows):
    """Group joined (resource, role, privilege) rows into roles by resource.

    Each row is a mapping with the keys `resource_id`, `role_id`,
    `role_name`, `user_editable`, `privilege_id` and
    `privilege_description`.

    Returns:
        A dict mapping each resource's UUID to a tuple of `Role` objects,
        each carrying the privileges collected from its rows.
    """
    def __build_roles__(roles, row):
        role_id = uuid.UUID(row["role_id"])
        role = roles.get(role_id, Role(
            role_id, row["role_name"], bool(row["user_editable"]), tuple()))
        # The rows come from a LEFT JOIN on privileges: a role that has no
        # privileges gives a row with NULL privilege columns. Such a row must
        # add the role, but no (bogus) privilege.
        if row["privilege_id"] is None:
            privileges = role.privileges
        else:
            privileges = role.privileges + (
                Privilege(row["privilege_id"], row["privilege_description"]),)
        return {
            **roles,
            role_id: Role(
                role_id, role.role_name, role.user_editable, privileges)
        }

    def __build__(acc, row):
        resource_id = uuid.UUID(row["resource_id"])
        return {
            **acc,
            resource_id: __build_roles__(acc.get(resource_id, {}), row)
        }

    return {
        resource_id: tuple(roles.values())
        for resource_id, roles in reduce(__build__, rows, {}).items()
    }


# @authorised_p(
#     ("",),
#     ("You do not have the appropriate privileges to view a user's roles on "
#      "resources."))
def user_resource_roles(conn: db.DbConnection, user: User) -> dict[uuid.UUID, tuple[Role, ...]]:
    """Fetch all the user's roles on resources.

    Returns:
        A dict mapping the UUID of each resource the user has a role on, to
        the tuple of roles the user has on that resource.
    """
    with db.cursor(conn) as cursor:
        cursor.execute(
            "SELECT res.*, rls.*, p.* "
            "FROM resources AS res INNER JOIN "
            "user_roles AS ur "
            "ON res.resource_id=ur.resource_id "
            "LEFT JOIN roles AS rls "
            "ON ur.role_id=rls.role_id "
            "LEFT JOIN role_privileges AS rp "
            "ON rls.role_id=rp.role_id "
            "LEFT JOIN privileges AS p "
            "ON rp.privilege_id=p.privilege_id "
            "WHERE ur.user_id = ?",
            (str(user.user_id),))
        return __build_resource_roles__(
            (dict(row) for row in cursor.fetchall()))