aboutsummaryrefslogtreecommitdiff
path: root/gn_auth/auth/authorisation/users/models.py
blob: b18c5dc94d7f079d8bd3c8537a41739b640eb570 (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""Functions for acting on users."""
import uuid
from typing import Union
from functools import reduce
from datetime import datetime, timedelta

from ..roles.models import Role
from ..checks import authorised_p
from ..privileges import Privilege

from ...db import sqlite3 as db
from ...authentication.users import User


def __process_age_clause__(age_desc: str) -> tuple[str, int]:
    """Process the age clause and parameter for 'LIST USERS' query.

    `age_desc` is a whitespace-separated descriptor made up of three or four
    words: a comparator (`older`, `younger` or `exactly`), an optional filler
    word, a whole-number count, and a unit (`day(s)`, `month(s)` or
    `year(s)`), e.g. "older than 2 days" or "exactly 3 months".

    Returns a tuple of:
      * the SQL clause comparing `created` against the `:created` placeholder,
      * the POSIX timestamp (whole seconds) to bind to `:created`, computed as
        the current time minus the described age.

    Raises `ValueError` if the descriptor is malformed in any way.
    """
    _multipliers = {
        # Temporary hack before dateutil module can make it to our deployment.
        # Months and years are approximated as 30 and 365 days respectively.
        "day": 1,
        "days": 1,
        "month": 30,
        "months": 30,
        "year": 365,
        "years": 365
    }
    _operators = {"older": "<", "younger": ">", "exactly": "="}

    # Split on any run of whitespace so that repeated spaces do not produce
    # empty parts.
    _parts = age_desc.split()
    # Validate explicitly rather than with `assert`, which is removed when
    # Python runs with optimisations enabled.
    if len(_parts) not in (3, 4):
        raise ValueError("Invalid age descriptor!")

    _operator = _operators.get(_parts[0])
    if _operator is None:
        raise ValueError("Invalid age descriptor.")

    _multiplier = _multipliers.get(_parts[-1])
    if _multiplier is None:
        raise ValueError(
            f"Invalid age descriptor: unknown unit '{_parts[-1]}'.")

    try:
        _count = int(_parts[-2])
    except ValueError as _exc:
        raise ValueError(
            f"Invalid age descriptor: '{_parts[-2]}' is not a whole number."
        ) from _exc

    try:
        _param = int((
            datetime.now() - timedelta(days=_count * _multiplier)
        ).timestamp())
    except OverflowError as _exc:
        # The count is too large to be represented as a date.
        raise ValueError("Invalid age descriptor: age out of range.") from _exc

    return f"created {_operator} :created", _param


def __list_user_clauses_and_params__(**kwargs) -> tuple[list[str], dict[str, Union[int, str]]]:
    """Process the WHERE clauses, and params for the 'LIST USERS' query.

    Recognised keyword arguments (all optional; missing, `None`, empty or
    whitespace-only values are ignored):
      * `email`: bound to an `email LIKE :email` clause after stripping.
      * `verified`: "yes" selects `verified = 1`, any other non-blank value
        selects `verified = 0`.
      * `age`: an age descriptor handed to `__process_age_clause__`, whose
        clause and `created` parameter are included as returned.

    Returns a tuple of the list of clauses (to be joined by the caller) and
    the dict of named parameters those clauses refer to.
    """
    clauses: list[str] = []
    params: dict[str, Union[int, str]] = {}

    # `or ""` lets an explicit `None` behave like an absent filter.
    email = (kwargs.get("email") or "").strip()
    if email:
        clauses.append("email LIKE :email")
        params["email"] = email

    verified = (kwargs.get("verified") or "").strip()
    if verified:
        clauses.append("verified=:verified")
        # Bind an integer in both branches so the comparison can match a
        # numeric flag (assumes `verified` is stored as 0/1 -- confirm against
        # the users table schema).
        params["verified"] = 1 if verified == "yes" else 0

    age = (kwargs.get("age") or "").strip()
    if age:
        _clause, _param = __process_age_clause__(age)
        clauses.append(_clause)
        params["created"] = _param

    return clauses, params


@authorised_p(
    ("system:user:list",),
    "You do not have the appropriate privileges to list users.",
    oauth2_scope="profile user")
def list_users(conn: db.DbConnection, **kwargs) -> tuple[User, ...]:
    """Fetch the users, optionally narrowed down by filters.

    The keyword arguments are handed to `__list_user_clauses_and_params__`;
    any clauses it produces are AND-ed together into a WHERE condition. With
    no clauses, every row of the `users` table is selected.
    """
    where_clauses, bindings = __list_user_clauses_and_params__(**kwargs)
    filter_sql = ""
    if where_clauses:
        filter_sql = " WHERE " + " AND ".join(where_clauses)

    with db.cursor(conn) as cursor:
        cursor.execute("SELECT * FROM users" + filter_sql, bindings)
        rows = cursor.fetchall()
        return tuple(map(User.from_sqlite3_row, rows))

def __build_resource_roles__(rows):
    """Group joined role/privilege rows into roles per resource.

    `rows` is an iterable of mappings, each providing the keys `resource_id`,
    `role_id`, `role_name`, `user_editable`, `privilege_id` and
    `privilege_description`. Every row contributes one privilege to the role
    identified by `role_id` on the resource identified by `resource_id`.

    Returns a dict mapping each resource's UUID to a tuple of its `Role`
    objects. Resources, roles and privileges keep the order in which they are
    first seen in `rows`.
    """
    # NOTE(review): no row is skipped, so a row with NULL privilege columns
    # would still add a Privilege built from those NULLs -- confirm whether
    # callers can supply such rows.
    resources = {}
    for row in rows:
        resource_id = uuid.UUID(row["resource_id"])
        role_id = uuid.UUID(row["role_id"])
        priv = Privilege(row["privilege_id"], row["privilege_description"])

        # Update the nested dicts in place rather than copying the whole
        # accumulator for every row.
        roles = resources.setdefault(resource_id, {})
        role = roles.get(role_id)
        if role is None:
            role = Role(
                role_id, row["role_name"], bool(row["user_editable"]), tuple())
        roles[role_id] = Role(
            role_id, role.role_name, role.user_editable,
            role.privileges + (priv,))

    return {
        resource_id: tuple(roles.values())
        for resource_id, roles in resources.items()
    }

# @authorised_p(
#     ("",),
#     ("You do not have the appropriate privileges to view a user's roles on "
#      "resources."))
def user_resource_roles(conn: db.DbConnection, user: User) -> dict[uuid.UUID, tuple[Role, ...]]:
    """Fetch all the user's roles on resources.

    Selects the resources on which `user` has an entry in `user_roles`,
    joined with the corresponding roles and each role's privileges, and hands
    the rows (converted to dicts) to `__build_resource_roles__`.

    Returns a dict mapping each resource's UUID to the tuple of roles the
    user holds on that resource.
    """
    with db.cursor(conn) as cursor:
        cursor.execute(
            # Adjacent string literals are joined verbatim, so every fragment
            # but the last must end with a space to keep the keywords apart.
            "SELECT res.*, rls.*, p.* "
            "FROM resources AS res INNER JOIN "
            "user_roles AS ur "
            "ON res.resource_id=ur.resource_id "
            "LEFT JOIN roles AS rls "
            "ON ur.role_id=rls.role_id "
            "LEFT JOIN role_privileges AS rp "
            "ON rls.role_id=rp.role_id "
            "LEFT JOIN privileges AS p "
            "ON rp.privilege_id=p.privilege_id "
            "WHERE ur.user_id = ?",
            (str(user.user_id),))
        return __build_resource_roles__(
            (dict(row) for row in cursor.fetchall()))