diff options
| -rw-r--r-- | gn_auth/wsgi.py | 91 |
1 files changed, 89 insertions, 2 deletions
diff --git a/gn_auth/wsgi.py b/gn_auth/wsgi.py index f2f17f1..4950995 100644 --- a/gn_auth/wsgi.py +++ b/gn_auth/wsgi.py @@ -1,4 +1,5 @@ """Main entry point for project""" +import os import sys import uuid import json @@ -14,8 +15,11 @@ from gn_auth import create_app from gn_auth.auth.db import sqlite3 as db from gn_auth.auth.errors import NotFoundError -from gn_auth.auth.authentication.users import user_by_id, hash_password -from gn_auth.auth.authorisation.users.admin.models import make_sys_admin +from gn_auth.auth.authentication.users import ( + user_by_id, hash_password, save_user, set_user_password) +from gn_auth.auth.authorisation.roles.models import assign_default_roles +from gn_auth.auth.authorisation.users.admin.models import ( + make_sys_admin, grant_sysadmin_role) from gn_auth.scripts import register_sys_admin as rsysadm# type: ignore[import] @@ -126,6 +130,89 @@ def register_admin(): """Register the administrator.""" rsysadm.register_admin(Path(app.config["AUTH_DB"])) + +_VALID_ROLES_ = ("system-admin", "none") + + +def __parse_user_spec__(spec: str) -> dict: + """Parse 'key=value,key=value,...' into a dict.""" + result = {} + for part in spec.split(","): + key, _, value = part.partition("=") + if key.strip(): + result[key.strip()] = value.strip() + return result + + +def __write_output__(data: dict, output_path) -> None: + """Write JSON data to a file with 0600 permissions, or stdout.""" + text = json.dumps(data, indent=2) + if output_path is None: + print(text) + return + fd = os.open(output_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) + with os.fdopen(fd, "w") as outfile: + outfile.write(text) + + +@app.cli.command() +@click.option("--user", "user_specs", multiple=True, + help='User spec: "name=...,email=...,password=...,role=..."') +@click.option("--output", "output_path", type=click.Path(), default=None, + help="Write credentials as JSON to this file (default: stdout)") +def create_users(user_specs, output_path): + """Create one or more users with specified credentials and roles. + + Each --user option takes a comma-separated key=value string with the + following keys: name, email, password, role. + + Valid roles: system-admin, none. + """ + if not user_specs: + print("No users specified.", file=sys.stderr) + sys.exit(1) + + records = [] + with db.connection(app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor: + for spec_str in user_specs: + spec = __parse_user_spec__(spec_str) + name = spec.get("name", "").strip() + email = spec.get("email", "").strip() + password = spec.get("password", "").strip() + role = spec.get("role", "none").strip() + + if not name: + print(f"Missing 'name' in user spec: {spec_str!r}", file=sys.stderr) + sys.exit(1) + if not email: + print(f"Missing 'email' in user spec: {spec_str!r}", file=sys.stderr) + sys.exit(1) + if not password: + print(f"Missing 'password' in user spec: {spec_str!r}", file=sys.stderr) + sys.exit(1) + if role not in _VALID_ROLES_: + print( + f"Invalid role {role!r} in spec: {spec_str!r}. " + f"Valid roles: {_VALID_ROLES_}", + file=sys.stderr) + sys.exit(1) + + user = save_user(cursor, email, name, verified=True) + set_user_password(cursor, user, password) + assign_default_roles(cursor, user) + if role == "system-admin": + grant_sysadmin_role(cursor, user) + + records.append({ + "user_id": str(user.user_id), + "name": user.name, + "email": user.email, + "password": password, + "role": role, + }) + + __write_output__({"users": records}, output_path) + ##### END: CLI Commands ##### if __name__ == '__main__': |
