about summary refs log tree commit diff
diff options
context:
space:
mode:
authorClaude Sonnet 4.62026-06-02 00:00:00 +0000
committerFrederick Muriuki Muriithi2026-06-02 14:18:37 -0500
commit2691578edfc84817d9a9aafb35e3e29d04d2613e (patch)
treec2f190d2a3de81a7c5ef39dfe5e01f1955e3d89d
parent0a85369b7e423cfd1eee0ab52f2e456bd42b68b8 (diff)
downloadgn-auth-2691578edfc84817d9a9aafb35e3e29d04d2613e.tar.gz
wsgi: add create-users CLI command
Add a general-purpose `create-users` command that creates one or more
users with explicitly specified name, email, password and role.

Supported roles: system-admin (assigns default roles plus
grant_sysadmin_role), none (assigns default roles only).

Output is written as JSON to a file (with 0600 permissions) or stdout.
Helper functions __parse_user_spec__ and __write_output__ are factored
out for reuse by the forthcoming create-test-users command.
-rw-r--r--gn_auth/wsgi.py91
1 files changed, 89 insertions, 2 deletions
diff --git a/gn_auth/wsgi.py b/gn_auth/wsgi.py
index f2f17f1..4950995 100644
--- a/gn_auth/wsgi.py
+++ b/gn_auth/wsgi.py
@@ -1,4 +1,5 @@
 """Main entry point for project"""
+import os
 import sys
 import uuid
 import json
@@ -14,8 +15,11 @@ from gn_auth import create_app
 
 from gn_auth.auth.db import sqlite3 as db
 from gn_auth.auth.errors import NotFoundError
-from gn_auth.auth.authentication.users import user_by_id, hash_password
-from gn_auth.auth.authorisation.users.admin.models import make_sys_admin
+from gn_auth.auth.authentication.users import (
+    user_by_id, hash_password, save_user, set_user_password)
+from gn_auth.auth.authorisation.roles.models import assign_default_roles
+from gn_auth.auth.authorisation.users.admin.models import (
+    make_sys_admin, grant_sysadmin_role)
 from gn_auth.scripts import register_sys_admin as rsysadm# type: ignore[import]
 
 
@@ -126,6 +130,89 @@ def register_admin():
     """Register the administrator."""
     rsysadm.register_admin(Path(app.config["AUTH_DB"]))
 
+
+_VALID_ROLES_ = ("system-admin", "none")
+
+
+def __parse_user_spec__(spec: str) -> dict:
+    """Parse 'key=value,key=value,...' into a dict."""
+    result = {}
+    for part in spec.split(","):
+        key, _, value = part.partition("=")
+        if key.strip():
+            result[key.strip()] = value.strip()
+    return result
+
+
+def __write_output__(data: dict, output_path) -> None:
+    """Write JSON data to a file with 0600 permissions, or stdout."""
+    text = json.dumps(data, indent=2)
+    if output_path is None:
+        print(text)
+        return
+    fd = os.open(output_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
+    with os.fdopen(fd, "w") as outfile:
+        outfile.write(text)
+
+
+@app.cli.command()
+@click.option("--user", "user_specs", multiple=True,
+              help='User spec: "name=...,email=...,password=...,role=..."')
+@click.option("--output", "output_path", type=click.Path(), default=None,
+              help="Write credentials as JSON to this file (default: stdout)")
+def create_users(user_specs, output_path):
+    """Create one or more users with specified credentials and roles.
+
+    Each --user option takes a comma-separated key=value string with the
+    following keys: name, email, password, role.
+
+    Valid roles: system-admin, none.
+    """
+    if not user_specs:
+        print("No users specified.", file=sys.stderr)
+        sys.exit(1)
+
+    records = []
+    with db.connection(app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor:
+        for spec_str in user_specs:
+            spec = __parse_user_spec__(spec_str)
+            name = spec.get("name", "").strip()
+            email = spec.get("email", "").strip()
+            password = spec.get("password", "").strip()
+            role = spec.get("role", "none").strip()
+
+            if not name:
+                print(f"Missing 'name' in user spec: {spec_str!r}", file=sys.stderr)
+                sys.exit(1)
+            if not email:
+                print(f"Missing 'email' in user spec: {spec_str!r}", file=sys.stderr)
+                sys.exit(1)
+            if not password:
+                print(f"Missing 'password' in user spec: {spec_str!r}", file=sys.stderr)
+                sys.exit(1)
+            if role not in _VALID_ROLES_:
+                print(
+                    f"Invalid role {role!r} in spec: {spec_str!r}. "
+                    f"Valid roles: {_VALID_ROLES_}",
+                    file=sys.stderr)
+                sys.exit(1)
+
+            user = save_user(cursor, email, name, verified=True)
+            set_user_password(cursor, user, password)
+            assign_default_roles(cursor, user)
+            if role == "system-admin":
+                grant_sysadmin_role(cursor, user)
+
+            records.append({
+                "user_id": str(user.user_id),
+                "name": user.name,
+                "email": user.email,
+                "password": password,
+                "role": role,
+            })
+
+    __write_output__({"users": records}, output_path)
+
 ##### END: CLI Commands #####
 
 if __name__ == '__main__':