about summary refs log tree commit diff
path: root/gn_auth/auth/authorisation/users/models.py
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2025-06-24 12:52:48 -0500
committerFrederick Muriuki Muriithi2025-06-24 12:52:48 -0500
commit8ad7019b381e618eb7d5e2bb36b4fd7ed19759a5 (patch)
treec9e409855986c99fdcf060a1f608d64c7ba5fdec /gn_auth/auth/authorisation/users/models.py
parent23d483874c0ed4f8bc031c7a38e1f70f032e542c (diff)
downloadgn-auth-8ad7019b381e618eb7d5e2bb36b4fd7ed19759a5.tar.gz
Enable filtering of data, and limiting length.
Diffstat (limited to 'gn_auth/auth/authorisation/users/models.py')
-rw-r--r--gn_auth/auth/authorisation/users/models.py60
1 files changed, 58 insertions, 2 deletions
diff --git a/gn_auth/auth/authorisation/users/models.py b/gn_auth/auth/authorisation/users/models.py
index bde2e33..b18c5dc 100644
--- a/gn_auth/auth/authorisation/users/models.py
+++ b/gn_auth/auth/authorisation/users/models.py
@@ -1,6 +1,8 @@
 """Functions for acting on users."""
 import uuid
+from typing import Union
 from functools import reduce
+from datetime import datetime, timedelta
 
 from ..roles.models import Role
 from ..checks import authorised_p
@@ -9,14 +11,68 @@ from ..privileges import Privilege
 from ...db import sqlite3 as db
 from ...authentication.users import User
 
+
+def __process_age_clause__(age_desc: str) -> tuple[str, int]:
+    """Process the age clause and parameter for 'LIST USERS' query."""
+    _today = datetime.now()
+    _clause = "created"
+    _parts = age_desc.split(" ")
+    _multipliers = {
+        # Temporary hack before dateutil module can make it to our deployment.
+        "days": 1,
+        "months": 30,
+        "years": 365
+    }
+    assert len(_parts) in (3, 4), "Invalid age descriptor!"
+
+    _param = int((
+        _today - timedelta(**{"days": int(_parts[-2]) * _multipliers[_parts[-1]]})
+    ).timestamp())
+
+    match _parts[0]:
+        case "older":
+            return "created < :created", _param
+        case "younger":
+            return "created > :created", _param
+        case "exactly":
+            return "created = :created", _param
+        case _:
+            raise Exception("Invalid age descriptor.")
+
+
+def __list_user_clauses_and_params__(**kwargs) -> tuple[list[str], dict[str, Union[int, str]]]:
+    """Process the WHERE clauses, and params for the 'LIST USERS' query."""
+    clauses = []
+    params = {}
+    if bool(kwargs.get("email", "").strip()):
+        clauses = clauses + ["email LIKE :email"]
+        params["email"] = kwargs["email"].strip()
+
+    if bool(kwargs.get("verified", "").strip()):
+        clauses = clauses + ["verified=:verified"]
+        params["verified"] = 1 if kwargs["verified"].strip() == "yes" else "no"
+
+    if bool(kwargs.get("age", "").strip()):
+        _clause, _param = __process_age_clause__(kwargs["age"].strip())
+        clauses = clauses + [_clause]
+        params["created"] = _param
+
+    return clauses, params
+
+
 @authorised_p(
     ("system:user:list",),
     "You do not have the appropriate privileges to list users.",
     oauth2_scope="profile user")
-def list_users(conn: db.DbConnection) -> tuple[User, ...]:
+def list_users(conn: db.DbConnection, **kwargs) -> tuple[User, ...]:
     """List out all users."""
+    _query = "SELECT * FROM users"
+    _clauses, _params = __list_user_clauses_and_params__(**kwargs)
+    if len(_clauses) > 0:
+        _query = _query + " WHERE " + " AND ".join(_clauses)
+
     with db.cursor(conn) as cursor:
-        cursor.execute("SELECT * FROM users")
+        cursor.execute(_query, _params)
         return tuple(User.from_sqlite3_row(row) for row in cursor.fetchall())
 
 def __build_resource_roles__(rows):