about summary refs log tree commit diff
path: root/gn_auth/auth/authorisation/users/models.py
diff options
context:
space:
mode:
Diffstat (limited to 'gn_auth/auth/authorisation/users/models.py')
-rw-r--r--gn_auth/auth/authorisation/users/models.py70
1 files changed, 68 insertions, 2 deletions
diff --git a/gn_auth/auth/authorisation/users/models.py b/gn_auth/auth/authorisation/users/models.py
index bde2e33..d30bfd0 100644
--- a/gn_auth/auth/authorisation/users/models.py
+++ b/gn_auth/auth/authorisation/users/models.py
@@ -1,6 +1,7 @@
 """Functions for acting on users."""
 import uuid
 from functools import reduce
+from datetime import datetime, timedelta
 
 from ..roles.models import Role
 from ..checks import authorised_p
@@ -9,14 +10,79 @@ from ..privileges import Privilege
 from ...db import sqlite3 as db
 from ...authentication.users import User
 
+
+def __process_age_clause__(age_desc: str) -> tuple[str, int]:
+    """Process the age clause and parameter for 'LIST USERS' query."""
+    _today = datetime.now()
+    _clause = "created"
+    _parts = age_desc.split(" ")
+    _multipliers = {
+        # Temporary hack before dateutil module can make it to our deployment.
+        "days": 1,
+        "months": 30,
+        "years": 365
+    }
+    assert len(_parts) in (3, 4), "Invalid age descriptor!"
+
+    _param = int((
+        _today - timedelta(**{"days": int(_parts[-2]) * _multipliers[_parts[-1]]})
+    ).timestamp())
+
+    match _parts[0]:
+        case "older":
+            return "created < :created", _param
+        case "younger":
+            return "created > :created", _param
+        case "exactly":
+            return "created = :created", _param
+        case _:
+            raise Exception("Invalid age descriptor.")# pylint: disable=[broad-exception-raised]
+
+
+def __list_user_clauses_and_params__(**kwargs) -> tuple[str, dict[str, str]]:
+    """Process the WHERE clauses, and params for the 'LIST USERS' query."""
+    clauses = ""
+    params = {}
+    if bool(kwargs.get("email", "").strip()) and bool(kwargs.get("name", "").strip()):
+        clauses = "(email LIKE :email OR name LIKE :name)"
+        params = {
+            "email": f'%{kwargs["email"].strip()}%',
+            "name": f'%{kwargs["name"].strip()}%'
+        }
+    elif bool(kwargs.get("email", "").strip()):
+        clauses = "email LIKE :email"
+        params["email"] = f'%{kwargs["email"].strip()}%'
+    elif bool(kwargs.get("name", "").strip()):
+        clauses = "name LIKE :name"
+        params["name"] = f'%{kwargs["name"].strip()}%'
+    else:
+        clauses = ""
+
+    if bool(kwargs.get("verified", "").strip()):
+        clauses = clauses + (" AND " if len(clauses) > 0 else "") + "verified=:verified"
+        params["verified"] = "1" if kwargs["verified"].strip() == "yes" else "0"
+
+    if bool(kwargs.get("age", "").strip()):
+        _clause, _param = __process_age_clause__(kwargs["age"].strip())
+        clauses = clauses + (" AND " if len(clauses) > 0 else "") + _clause
+        params["created"] = str(_param)
+
+    return clauses, params
+
+
 @authorised_p(
     ("system:user:list",),
     "You do not have the appropriate privileges to list users.",
     oauth2_scope="profile user")
-def list_users(conn: db.DbConnection) -> tuple[User, ...]:
+def list_users(conn: db.DbConnection, **kwargs) -> tuple[User, ...]:
     """List out all users."""
+    _query = "SELECT * FROM users"
+    _clauses, _params = __list_user_clauses_and_params__(**kwargs)
+    if len(_clauses) > 0:
+        _query = _query + " WHERE " + _clauses
+
     with db.cursor(conn) as cursor:
-        cursor.execute("SELECT * FROM users")
+        cursor.execute(_query, _params)
         return tuple(User.from_sqlite3_row(row) for row in cursor.fetchall())
 
 def __build_resource_roles__(rows):