aboutsummaryrefslogtreecommitdiff
"""Functions for acting on users."""
import uuid
from functools import reduce
from datetime import datetime, timedelta

from ..roles.models import Role
from ..checks import authorised_p
from ..privileges import Privilege

from ...db import sqlite3 as db
from ...authentication.users import User


def __process_age_clause__(age_desc: str) -> tuple[str, int]:
    """Process the age clause and parameter for 'LIST USERS' query.

    The descriptor is a whitespace-separated phrase of three or four words.
    The first word selects the comparison (``older``, ``younger`` or
    ``exactly``); the last two words are an integer count followed by a unit
    (``days``, ``months`` or ``years``), e.g. ``"older than 2 days"``.

    Returns:
        A tuple of the SQL clause comparing ``created`` against the
        ``:created`` placeholder, and the cut-off moment (now minus the
        requested age) as an integer UNIX timestamp.

    Raises:
        AssertionError: the descriptor does not have 3 or 4 words.
        ValueError: the count is not an integer.
        KeyError: the unit is not one of the recognised units.
        Exception: the first word is not a recognised comparison.
    """
    _multipliers = {
        # Temporary hack before dateutil module can make it to our deployment.
        "days": 1,
        "months": 30,
        "years": 365
    }
    _operators = {"older": "<", "younger": ">", "exactly": "="}

    # Split on any run of whitespace so repeated spaces do not produce empty
    # words that would make an otherwise valid descriptor look malformed.
    _parts = age_desc.split()
    if len(_parts) not in (3, 4):
        # Raised explicitly (rather than via `assert`) so the check is not
        # stripped when Python runs with -O; the exception type is kept.
        raise AssertionError("Invalid age descriptor!")

    _days = int(_parts[-2]) * _multipliers[_parts[-1]]
    _param = int((datetime.now() - timedelta(days=_days)).timestamp())

    if _parts[0] not in _operators:
        raise Exception("Invalid age descriptor.")# pylint: disable=[broad-exception-raised]
    return f"created {_operators[_parts[0]]} :created", _param


def __list_user_clauses_and_params__(**kwargs) -> tuple[str, dict[str, str]]:
    """Build the WHERE conditions and bound parameters for 'LIST USERS'.

    Recognised keyword arguments (all strings, surrounding whitespace is
    ignored, blank values are treated as absent):

    - ``email`` / ``name``: substring matches; when both are given a row
      matches if either one does.
    - ``verified``: ``"yes"`` selects verified users, any other non-blank
      value selects unverified ones.
    - ``age``: an age descriptor handed to ``__process_age_clause__``.

    Returns the conditions joined with ``AND`` (an empty string when there
    are none) together with the named parameters they refer to.
    """
    email = kwargs.get("email", "").strip()
    name = kwargs.get("name", "").strip()
    verified = kwargs.get("verified", "").strip()
    age = kwargs.get("age", "").strip()

    conditions: list[str] = []
    params: dict[str, str] = {}

    if email and name:
        conditions.append("(email LIKE :email OR name LIKE :name)")
        params["email"] = f"%{email}%"
        params["name"] = f"%{name}%"
    elif email:
        conditions.append("email LIKE :email")
        params["email"] = f"%{email}%"
    elif name:
        conditions.append("name LIKE :name")
        params["name"] = f"%{name}%"

    if verified:
        conditions.append("verified=:verified")
        params["verified"] = "1" if verified == "yes" else "0"

    if age:
        age_clause, age_param = __process_age_clause__(age)
        conditions.append(age_clause)
        params["created"] = str(age_param)

    return " AND ".join(conditions), params


@authorised_p(
    ("system:user:list",),
    "You do not have the appropriate privileges to list users.",
    oauth2_scope="profile user")
def list_users(conn: db.DbConnection, **kwargs) -> tuple[User, ...]:
    """Fetch users from the database, optionally narrowed by filters.

    The keyword arguments are forwarded to
    ``__list_user_clauses_and_params__`` which turns them into the WHERE
    conditions and their bound parameters. When no condition results, every
    row of the ``users`` table is selected.
    """
    where_clause, query_params = __list_user_clauses_and_params__(**kwargs)
    query = "SELECT * FROM users"
    if where_clause:
        query = f"{query} WHERE {where_clause}"

    with db.cursor(conn) as cursor:
        cursor.execute(query, query_params)
        return tuple(map(User.from_sqlite3_row, cursor.fetchall()))

def __build_resource_roles__(rows):
    """Group joined resource/role/privilege rows into roles per resource.

    Each row is a mapping with the keys ``resource_id``, ``role_id``,
    ``role_name``, ``user_editable``, ``privilege_id`` and
    ``privilege_description``. Rows sharing a resource and role are folded
    into a single ``Role`` whose privileges accumulate in row order.

    Returns a dict mapping each resource's UUID to a tuple of its roles.
    Resources, and roles within a resource, keep first-seen order.
    """
    resources = {}
    for row in rows:
        resource_id = uuid.UUID(row["resource_id"])
        role_id = uuid.UUID(row["role_id"])
        # Update the per-resource mapping in place rather than copying the
        # whole accumulator for every row.
        roles = resources.setdefault(resource_id, {})
        role = roles.get(role_id)
        if role is None:
            role = Role(
                role_id, row["role_name"], bool(row["user_editable"]), tuple())

        privileges = role.privileges
        # Callers such as `user_resource_roles` LEFT JOIN the privileges, so a
        # role without any privilege arrives with NULL privilege columns;
        # such a row must not add a placeholder privilege to the role.
        if row["privilege_id"] is not None:
            privileges = privileges + (
                Privilege(row["privilege_id"], row["privilege_description"]),)

        roles[role_id] = Role(
            role_id, role.role_name, role.user_editable, privileges)

    return {
        resource_id: tuple(roles.values())
        for resource_id, roles in resources.items()
    }

# @authorised_p(
#     ("",),
#     ("You do not have the appropriate privileges to view a user's roles on "
#      "resources."))
def user_resource_roles(conn: db.DbConnection, user: User) -> dict[uuid.UUID, tuple[Role, ...]]:
    """Fetch all the user's roles on resources.

    Selects every resource the user has an entry for in ``user_roles``,
    joined with the corresponding role and that role's privileges, and
    returns the rows grouped by ``__build_resource_roles__``: a mapping from
    resource ID to the roles the user holds on that resource.
    """
    with db.cursor(conn) as cursor:
        cursor.execute(
            # Adjacent string literals are concatenated verbatim, so every
            # fragment ends with a space to keep the SQL tokens separated.
            "SELECT res.*, rls.*, p.* "
            "FROM resources AS res INNER JOIN "
            "user_roles AS ur "
            "ON res.resource_id=ur.resource_id "
            "LEFT JOIN roles AS rls "
            "ON ur.role_id=rls.role_id "
            "LEFT JOIN role_privileges AS rp "
            "ON rls.role_id=rp.role_id "
            "LEFT JOIN privileges AS p "
            "ON rp.privilege_id=p.privilege_id "
            "WHERE ur.user_id = ?",
            (str(user.user_id),))
        return __build_resource_roles__(
            (dict(row) for row in cursor.fetchall()))