about summary refs log tree commit diff
path: root/gn_auth/wsgi.py
diff options
context:
space:
mode:
Diffstat (limited to 'gn_auth/wsgi.py')
-rw-r--r--gn_auth/wsgi.py370
1 files changed, 365 insertions, 5 deletions
diff --git a/gn_auth/wsgi.py b/gn_auth/wsgi.py
index e05ef0d..2db44fe 100644
--- a/gn_auth/wsgi.py
+++ b/gn_auth/wsgi.py
@@ -1,10 +1,13 @@
 """Main entry point for project"""
+import os
+import re
+import secrets
 import sys
 import uuid
 import json
 from math import ceil
 from pathlib import Path
-from datetime import datetime
+from datetime import datetime, timezone
 
 import click
 from yoyo import get_backend, read_migrations
@@ -14,10 +17,16 @@ from gn_auth import create_app
 
 from gn_auth.auth.db import sqlite3 as db
 from gn_auth.auth.errors import NotFoundError
-from gn_auth.auth.authentication.users import user_by_id, hash_password
-from gn_auth.auth.authorisation.users.admin.models import make_sys_admin
-
-from scripts import register_sys_admin as rsysadm# type: ignore[import]
+from gn_auth.auth.authentication.users import (
+    user_by_id, hash_password, save_user, set_user_password)
+from gn_auth.auth.authorisation.roles.models import assign_default_roles
+from gn_auth.auth.authorisation.users.admin.models import (
+    make_sys_admin, grant_sysadmin_role)
+from gn_auth.auth.authorisation.users.models import delete_users_by_id
+from gn_auth.auth.authentication.oauth2.models.oauth2client import (
+    OAuth2Client, save_client, delete_client,
+    client as oauth2_client_by_id)
+from gn_auth.scripts import register_sys_admin as rsysadm# type: ignore[import]
 
 
 app = create_app()
@@ -127,6 +136,357 @@ def register_admin():
     """Register the administrator."""
     rsysadm.register_admin(Path(app.config["AUTH_DB"]))
 
+
+_VALID_ROLES_ = ("system-admin", "none")
+
+_TEST_EMAIL_DOMAIN_ = "regression-tests.genenetwork.org"
+
+
+def __normalise_name_for_email__(name: str) -> str:
+    """Lowercase and strip non-alphanumeric characters for use in an email."""
+    return re.sub(r"[^a-z0-9]", "", name.lower())
+
+
+def __create_one_user__(cursor, name: str, email: str, password: str, role: str) -> dict:
+    """Create a single user in the DB and return their credential record."""
+    user = save_user(cursor, email, name, verified=True)
+    set_user_password(cursor, user, password)
+    assign_default_roles(cursor, user)
+    if role == "system-admin":
+        grant_sysadmin_role(cursor, user)
+    return {
+        "user_id": str(user.user_id),
+        "name": user.name,
+        "email": user.email,
+        "password": password,
+        "role": role,
+    }
+
+
+def __parse_user_spec__(spec: str) -> dict:
+    """Parse 'key=value,key=value,...' into a dict."""
+    result = {}
+    for part in spec.split(","):
+        key, _, value = part.partition("=")
+        if key.strip():
+            result[key.strip()] = value.strip()
+    return result
+
+
+def __write_output__(data: dict, output_path) -> None:
+    """Write JSON data to a file with 0600 permissions, or stdout."""
+    text = json.dumps(data, indent=2)
+    if output_path is None:
+        print(text)
+        return
+    fd = os.open(output_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
+    with os.fdopen(fd, "w") as outfile:
+        outfile.write(text)
+
+
+@app.cli.command()
+@click.option("--user", "user_specs", multiple=True,
+              help='User spec: "name=...,email=...,password=...,role=..."')
+@click.option("--output", "output_path", type=click.Path(), default=None,
+              help="Write credentials as JSON to this file (default: stdout)")
+def create_users(user_specs, output_path):
+    """Create one or more users with specified credentials and roles.
+
+    Each --user option takes a comma-separated key=value string with the
+    following keys: name, email, password, role.
+
+    Valid roles: system-admin, none.
+    """
+    if not user_specs:
+        print("No users specified.", file=sys.stderr)
+        sys.exit(1)
+
+    records = []
+    with db.connection(app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor:
+        for spec_str in user_specs:
+            spec = __parse_user_spec__(spec_str)
+            name = spec.get("name", "").strip()
+            email = spec.get("email", "").strip()
+            password = spec.get("password", "").strip()
+            role = spec.get("role", "none").strip()
+
+            if not name:
+                print(f"Missing 'name' in user spec: {spec_str!r}", file=sys.stderr)
+                sys.exit(1)
+            if not email:
+                print(f"Missing 'email' in user spec: {spec_str!r}", file=sys.stderr)
+                sys.exit(1)
+            if not password:
+                print(f"Missing 'password' in user spec: {spec_str!r}", file=sys.stderr)
+                sys.exit(1)
+            if role not in _VALID_ROLES_:
+                print(
+                    f"Invalid role {role!r} in spec: {spec_str!r}. "
+                    f"Valid roles: {_VALID_ROLES_}",
+                    file=sys.stderr)
+                sys.exit(1)
+
+            records.append(
+                __create_one_user__(cursor, name, email, password, role))
+
+    __write_output__({"users": records}, output_path)
+
+
+@app.cli.command()
+@click.option("--user-id", "user_ids", multiple=True, type=click.UUID,
+              help="UUID of a user to delete (repeatable)")
+def delete_users(user_ids):
+    """Delete one or more users by ID, bypassing policy checks.
+
+    Removes users unconditionally regardless of their roles or group
+    memberships. Use with care — intended for test teardown and administration.
+    """
+    if not user_ids:
+        print("No user IDs specified.", file=sys.stderr)
+        sys.exit(1)
+
+    with db.connection(app.config["AUTH_DB"]) as conn:
+        deleted = delete_users_by_id(conn, tuple(user_ids))
+        print(f"Deleted {deleted} user(s).")
+
+
+@app.cli.command()
+@click.option("--session-timestamp", required=True,
+              help="Compact ISO 8601 UTC timestamp (e.g. 20260602T122700Z)")
+@click.option("--user", "user_specs", multiple=True,
+              help='User spec: "name=...,role=..."')
+@click.option("--output", "output_path", required=True, type=click.Path(),
+              help="Write credentials as JSON to this file (0600 permissions)")
+def create_test_users(session_timestamp, user_specs, output_path):
+    """Create ephemeral test users with auto-generated email and password.
+
+    Each --user option takes a comma-separated key=value string with the
+    following keys: name, role.
+
+    Email: <normalised-name><timestamp>@regression-tests.genenetwork.org
+    Password: randomly generated.
+
+    Output is written with 0600 permissions. Valid roles: system-admin, none.
+    """
+    if not user_specs:
+        print("No users specified.", file=sys.stderr)
+        sys.exit(1)
+
+    records = []
+    with db.connection(app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor:
+        for spec_str in user_specs:
+            spec = __parse_user_spec__(spec_str)
+            name = spec.get("name", "").strip()
+            role = spec.get("role", "none").strip()
+
+            if not name:
+                print(f"Missing 'name' in user spec: {spec_str!r}", file=sys.stderr)
+                sys.exit(1)
+            if role not in _VALID_ROLES_:
+                print(
+                    f"Invalid role {role!r} in spec: {spec_str!r}. "
+                    f"Valid roles: {_VALID_ROLES_}",
+                    file=sys.stderr)
+                sys.exit(1)
+
+            email = (f"{__normalise_name_for_email__(name)}"
+                     f"{session_timestamp}@{_TEST_EMAIL_DOMAIN_}")
+            password = secrets.token_urlsafe(32)
+
+            records.append(
+                __create_one_user__(cursor, name, email, password, role))
+
+    __write_output__(
+        {"session_timestamp": session_timestamp, "users": records},
+        output_path)
+
+
+_DEFAULT_GRANT_TYPES_ = (
+    "password",
+    "authorization_code",
+    "refresh_token",
+    "urn:ietf:params:oauth:grant-type:jwt-bearer",
+)
+
+_DEFAULT_SCOPES_ = (
+    "profile", "group", "role", "resource",
+    "register-client", "user", "masquerade",
+    "migrate-data", "introspect",
+)
+
+
+def __create_one_client__(
+        conn,
+        client_name: str,
+        owner_user,
+        redirect_uris: tuple,
+        scopes: tuple = _DEFAULT_SCOPES_,
+        grant_types: tuple = _DEFAULT_GRANT_TYPES_,
+        jwks_uri: str = "",
+) -> dict:
+    """Create a single OAuth2 client and return its credential record."""
+    raw_secret = secrets.token_urlsafe(32)
+    the_client = OAuth2Client(
+        client_id=uuid.uuid4(),
+        client_secret=hash_password(raw_secret),
+        client_id_issued_at=datetime.now(tz=timezone.utc),
+        client_secret_expires_at=datetime.fromtimestamp(0),
+        client_metadata={
+            "client_name": client_name,
+            "token_endpoint_auth_method": [
+                "client_secret_post", "client_secret_basic"],
+            "client_type": "confidential",
+            "grant_types": list(grant_types),
+            "default_redirect_uri": redirect_uris[0] if redirect_uris else "",
+            "redirect_uris": list(redirect_uris),
+            "response_type": ["code", "token"],
+            "scope": list(scopes),
+            "public-jwks-uri": jwks_uri,
+        },
+        user=owner_user)
+    save_client(conn, the_client)
+    return {
+        "client_id": str(the_client.client_id),
+        "client_secret": raw_secret,
+        "client_name": client_name,
+    }
+
+
+@app.cli.command()
+@click.option("--name", "client_name", required=True,
+              help="Human-readable name for the OAuth2 client")
+@click.option("--owner-id", required=True, type=click.UUID,
+              help="UUID of the user who owns this client")
+@click.option("--redirect-uri", "redirect_uris", multiple=True,
+              help="Allowed redirect URI (repeatable)")
+@click.option("--scope", "scopes", multiple=True,
+              default=_DEFAULT_SCOPES_, show_default=False,
+              help="OAuth2 scope (repeatable; defaults to full scope set)")
+@click.option("--grant-type", "grant_types", multiple=True,
+              default=_DEFAULT_GRANT_TYPES_, show_default=False,
+              help="Grant type (repeatable; defaults to all standard types)")
+@click.option("--jwks-uri", default="",
+              help="URI to the client's public JWKS (optional)")
+@click.option("--output", "output_path", type=click.Path(), default=None,
+              help="Write credentials as JSON to this file (default: stdout)")
+def create_oauth2_client(client_name, owner_id, redirect_uris, scopes,
+                         grant_types, jwks_uri, output_path):
+    """Create an OAuth2 client with specified parameters.
+
+    Scopes and grant types default to the full standard set if not provided.
+    """
+    with db.connection(app.config["AUTH_DB"]) as conn:
+        try:
+            owner = user_by_id(conn, owner_id)
+        except NotFoundError:
+            print(f"No user found with ID {owner_id}", file=sys.stderr)
+            sys.exit(1)
+        record = __create_one_client__(
+            conn, client_name, owner, redirect_uris, scopes, grant_types,
+            jwks_uri)
+
+    __write_output__({"client": record}, output_path)
+
+
+@app.cli.command()
+@click.option("--session-timestamp", required=True,
+              help="Compact ISO 8601 UTC timestamp (e.g. 20260602T122700Z)")
+@click.option("--users-file", required=True, type=click.Path(exists=True),
+              help="Credentials file produced by create-test-users")
+@click.option("--owner-role", default="system-admin", show_default=True,
+              help="Role of the user in users-file to assign as client owner")
+@click.option("--output", "output_path", required=True, type=click.Path(),
+              help="Write credentials as JSON to this file (0600 permissions)")
+def create_test_oauth2_client(session_timestamp, users_file, owner_role,
+                              output_path):
+    """Create an ephemeral OAuth2 client for a test session.
+
+    Reads the credentials file produced by create-test-users to find the
+    owner. Client name and secret are auto-generated using the session
+    timestamp. Output is written with 0600 permissions.
+    """
+    with open(users_file) as f:
+        users_data = json.load(f)
+
+    owner_record = next(
+        (u for u in users_data.get("users", []) if u["role"] == owner_role),
+        None)
+    if owner_record is None:
+        print(
+            f"No user with role {owner_role!r} found in {users_file}",
+            file=sys.stderr)
+        sys.exit(1)
+
+    client_name = f"gn-test-client-{session_timestamp}"
+
+    with db.connection(app.config["AUTH_DB"]) as conn:
+        try:
+            owner = user_by_id(conn, uuid.UUID(owner_record["user_id"]))
+        except NotFoundError:
+            print(
+                f"Owner user {owner_record['user_id']!r} not found in DB",
+                file=sys.stderr)
+            sys.exit(1)
+        record = __create_one_client__(conn, client_name, owner, tuple())
+
+    __write_output__(
+        {"session_timestamp": session_timestamp, "client": record},
+        output_path)
+
+
+@app.cli.command()
+@click.option("--credentials", "credentials_path", required=True,
+              type=click.Path(exists=True),
+              help="Credentials file produced by create-oauth2-client or "
+                   "create-test-oauth2-client")
+def delete_oauth2_client(credentials_path):
+    """Delete an OAuth2 client using a credentials file.
+
+    Reads the client_id from the given credentials file and removes the
+    client and all associated tokens from the database.
+    """
+    with open(credentials_path) as f:
+        data = json.load(f)
+
+    client_id_str = data.get("client", {}).get("client_id")
+    if not client_id_str:
+        print("No client_id found in credentials file.", file=sys.stderr)
+        sys.exit(1)
+
+    client_id = uuid.UUID(client_id_str)
+    with db.connection(app.config["AUTH_DB"]) as conn:
+        the_client = oauth2_client_by_id(conn, client_id)
+        if the_client.is_nothing():
+            print(f"No client found with ID {client_id}", file=sys.stderr)
+            sys.exit(1)
+        delete_client(conn, the_client.value)
+        print(f"Deleted OAuth2 client {client_id}.")
+
+
+@app.cli.command()
+@click.option("--credentials", "credentials_path", required=True,
+              type=click.Path(exists=True),
+              help="Credentials file produced by create-test-users")
+def delete_test_users(credentials_path):
+    """Delete ephemeral test users using a credentials file.
+
+    Reads the credentials file produced by create-test-users and deletes
+    all listed users unconditionally, bypassing policy checks. Intended
+    for CI test teardown.
+    """
+    with open(credentials_path) as f:
+        data = json.load(f)
+
+    user_ids = tuple(
+        uuid.UUID(u["user_id"]) for u in data.get("users", []))
+    if not user_ids:
+        print("No users found in credentials file.", file=sys.stderr)
+        sys.exit(1)
+
+    with db.connection(app.config["AUTH_DB"]) as conn:
+        deleted = delete_users_by_id(conn, user_ids)
+        print(f"Deleted {deleted} user(s).")
+
 ##### END: CLI Commands #####
 
 if __name__ == '__main__':