about summary refs log tree commit diff
path: root/gn_auth/auth/authorisation/users/models.py
blob: ab7a980b91f03271e70deab1b547d701262cb554 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
"""Functions for acting on users."""
import uuid
import warnings
from functools import reduce
from datetime import datetime, timedelta

from ..roles.models import Role
from ..checks import authorised_p
from ..privileges import Privilege

from ...db import sqlite3 as db
from ...authentication.users import User


def __process_age_clause__(age_desc: str) -> tuple[str, int]:
    """Process the age clause and parameter for 'LIST USERS' query.

    The descriptor is a whitespace-separated phrase of three or four words:
    a comparison word ("older", "younger" or "exactly"), an optional filler
    word (e.g. "than") that is not inspected, an integer count, and a unit
    ("day(s)", "month(s)" or "year(s)"). Examples: "older than 3 months",
    "exactly 10 days".

    Returns a tuple of the SQL condition on the `created` column (using the
    `:created` named placeholder) and the cut-off as an integer UNIX
    timestamp, computed as "now minus the described age".

    Raises:
        ValueError: if the descriptor does not have three or four words, or
            the count is not an integer.
        KeyError: if the unit is not recognised.
        Exception: if the comparison word is not recognised.
    """
    _multipliers = {
        # Temporary hack before dateutil module can make it to our deployment.
        # Months and years are approximated as fixed numbers of days.
        "day": 1,
        "days": 1,
        "month": 30,
        "months": 30,
        "year": 365,
        "years": 365,
    }
    _operators = {
        "older": "<",
        "younger": ">",
        # NOTE(review): equality against a second-resolution timestamp looks
        # like it will rarely match any row -- confirm the intended meaning
        # of "exactly".
        "exactly": "=",
    }

    # Split on any run of whitespace so repeated spaces do not produce
    # empty "words".
    _parts = age_desc.split()
    # Raise explicitly rather than `assert`: assertions are stripped when
    # Python runs with -O, which would let malformed input through.
    if len(_parts) not in (3, 4):
        raise ValueError("Invalid age descriptor!")

    _days = int(_parts[-2]) * _multipliers[_parts[-1]]
    _param = int((datetime.now() - timedelta(days=_days)).timestamp())

    _operator = _operators.get(_parts[0])
    if _operator is None:
        raise Exception("Invalid age descriptor.")# pylint: disable=[broad-exception-raised]
    return f"created {_operator} :created", _param


def __list_user_clauses_and_params__(**kwargs) -> tuple[str, dict[str, str]]:
    """Build the WHERE conditions and bound values for the 'LIST USERS' query.

    Recognised keyword arguments (all optional; blank values are ignored):
      - email, name: substring searches. When both are given, a row matches
        if either one matches.
      - verified: "yes" selects verified users, any other non-blank value
        selects unverified ones.
      - age: an age descriptor handed to `__process_age_clause__`.

    Returns the conditions joined with " AND " (an empty string when there
    are none) together with the named parameters they refer to.
    """
    def _cleaned(key: str) -> str:
        return kwargs.get(key, "").strip()

    conditions = []
    params = {}

    email = _cleaned("email")
    name = _cleaned("name")
    if email and name:
        # Both search terms present: match on either column.
        conditions.append("(email LIKE :email OR name LIKE :name)")
        params = {"email": f"%{email}%", "name": f"%{name}%"}
    elif email:
        conditions.append("email LIKE :email")
        params["email"] = f"%{email}%"
    elif name:
        conditions.append("name LIKE :name")
        params["name"] = f"%{name}%"

    verified = _cleaned("verified")
    if verified:
        conditions.append("verified=:verified")
        params["verified"] = "1" if verified == "yes" else "0"

    age = _cleaned("age")
    if age:
        age_condition, cutoff = __process_age_clause__(age)
        conditions.append(age_condition)
        params["created"] = str(cutoff)

    return " AND ".join(conditions), params


@authorised_p(
    ("system:user:list",),
    "You do not have the appropriate privileges to list users.",
    oauth2_scope="profile user")
def list_users(conn: db.DbConnection, **kwargs) -> tuple[User, ...]:
    """Fetch users, optionally narrowed by the given filters.

    The keyword arguments are forwarded unchanged to
    `__list_user_clauses_and_params__`, which turns them into the WHERE
    conditions and bound parameters. With no effective filters every row of
    the `users` table is returned.
    """
    where, bindings = __list_user_clauses_and_params__(**kwargs)
    statement = "SELECT * FROM users"
    if where:
        statement = f"{statement} WHERE {where}"

    with db.cursor(conn) as cursor:
        cursor.execute(statement, bindings)
        rows = cursor.fetchall()
        return tuple(User.from_sqlite3_row(row) for row in rows)

def __build_resource_roles__(rows):
    """Group joined role/privilege rows into the roles held per resource.

    Each row is a mapping with the keys `resource_id`, `role_id`,
    `role_name`, `user_editable`, `privilege_id` and
    `privilege_description`. Rows sharing a resource and role are merged into
    a single `Role` whose privileges accumulate in row order.

    Returns a dict from resource UUID to a tuple of that resource's roles,
    with resources and roles in order of first appearance.
    """
    roles_by_resource = {}
    for row in rows:
        resource_id = uuid.UUID(row["resource_id"])
        role_id = uuid.UUID(row["role_id"])
        privilege = Privilege(row["privilege_id"], row["privilege_description"])
        # Starting point used the first time this role is seen on a resource.
        seed = Role(
            role_id, row["role_name"], bool(row["user_editable"]), tuple())

        resource_roles = roles_by_resource.setdefault(resource_id, {})
        current = resource_roles.get(role_id, seed)
        resource_roles[role_id] = Role(
            role_id,
            current.role_name,
            current.user_editable,
            current.privileges + (privilege,))

    return {
        resource_id: tuple(resource_roles.values())
        for resource_id, resource_roles in roles_by_resource.items()
    }

# @authorised_p(
#     ("",),
#     ("You do not have the appropriate privileges to view a user's roles on "
#      "resources."))
def user_resource_roles(conn: db.DbConnection, user: User) -> dict[uuid.UUID, tuple[Role, ...]]:
    """Fetch all the user's roles on resources.

    Joins the resources the user has roles on with those roles and each
    role's privileges, then groups the rows with `__build_resource_roles__`.

    Returns a dict mapping each resource's UUID to the tuple of roles the
    user holds on that resource.
    """
    with db.cursor(conn) as cursor:
        cursor.execute(
            # Each fragment ends with a space so the adjacent literals do
            # not run together when concatenated.
            "SELECT res.*, rls.*, p.* "
            "FROM resources AS res INNER JOIN "
            "user_roles AS ur "
            "ON res.resource_id=ur.resource_id "
            "LEFT JOIN roles AS rls "
            "ON ur.role_id=rls.role_id "
            "LEFT JOIN role_privileges AS rp "
            "ON rls.role_id=rp.role_id "
            "LEFT JOIN privileges AS p "
            "ON rp.privilege_id=p.privilege_id "
            "WHERE ur.user_id = ?",
            (str(user.user_id),))
        return __build_resource_roles__(
            (dict(row) for row in cursor.fetchall()))


def delete_users_by_id(
        conn: db.DbConnection,
        user_ids: tuple[uuid.UUID, ...]
) -> int:
    """Delete users unconditionally by ID, removing all dependent data.

    Unlike the HTTP endpoint, this bypasses all policy checks — users are
    deleted regardless of their roles or group memberships. Returns the
    number of users removed from the users table.

    A `RuntimeWarning` is emitted on every call, including calls with no
    IDs, and is attributed to the caller so the log shows where the deletion
    was requested from. When `user_ids` is empty nothing is executed and 0
    is returned.
    """
    warnings.warn(
        (f"Running dangerous function `{__name__}.delete_users_by_id`. "
         "Do ensure that is what you actually want."),
        category=RuntimeWarning,
        # Point the warning at the code that called this function.
        stacklevel=2)
    if not user_ids:
        return 0
    _ids = tuple(str(uid) for uid in user_ids)
    # One positional placeholder per ID; the IDs themselves are always bound
    # as parameters, never interpolated into the SQL text.
    _paramstr = ", ".join(["?"] * len(_ids))
    # (table, column holding the user's ID). Rows in these tables are removed
    # before the rows in `users`.
    # NOTE(review): no group-membership table is listed here although the
    # docstring mentions group memberships — confirm whether one needs
    # cleaning up as well.
    _dependent_tables = (
        ("authorisation_code", "user_id"),
        ("forgot_password_tokens", "user_id"),
        ("group_join_requests", "requester_id"),
        ("jwt_refresh_tokens", "user_id"),
        ("oauth2_tokens", "user_id"),
        ("user_credentials", "user_id"),
        ("user_roles", "user_id"),
        ("user_verification_codes", "user_id"),
    )
    with db.cursor(conn) as cursor:
        for table, col in _dependent_tables:
            # Table and column names come only from the fixed tuple above.
            cursor.execute(
                f"DELETE FROM {table} WHERE {col} IN ({_paramstr})", _ids)
        cursor.execute(
            f"DELETE FROM users WHERE user_id IN ({_paramstr})", _ids)
        # rowcount reflects the last statement, i.e. the delete on `users`.
        return cursor.rowcount