diff options
Diffstat (limited to 'gn_auth/wsgi.py')
| -rw-r--r-- | gn_auth/wsgi.py | 370 |
1 files changed, 365 insertions, 5 deletions
diff --git a/gn_auth/wsgi.py b/gn_auth/wsgi.py index e05ef0d..2db44fe 100644 --- a/gn_auth/wsgi.py +++ b/gn_auth/wsgi.py @@ -1,10 +1,13 @@ """Main entry point for project""" +import os +import re +import secrets import sys import uuid import json from math import ceil from pathlib import Path -from datetime import datetime +from datetime import datetime, timezone import click from yoyo import get_backend, read_migrations @@ -14,10 +17,16 @@ from gn_auth import create_app from gn_auth.auth.db import sqlite3 as db from gn_auth.auth.errors import NotFoundError -from gn_auth.auth.authentication.users import user_by_id, hash_password -from gn_auth.auth.authorisation.users.admin.models import make_sys_admin - -from scripts import register_sys_admin as rsysadm# type: ignore[import] +from gn_auth.auth.authentication.users import ( + user_by_id, hash_password, save_user, set_user_password) +from gn_auth.auth.authorisation.roles.models import assign_default_roles +from gn_auth.auth.authorisation.users.admin.models import ( + make_sys_admin, grant_sysadmin_role) +from gn_auth.auth.authorisation.users.models import delete_users_by_id +from gn_auth.auth.authentication.oauth2.models.oauth2client import ( + OAuth2Client, save_client, delete_client, + client as oauth2_client_by_id) +from gn_auth.scripts import register_sys_admin as rsysadm# type: ignore[import] app = create_app() @@ -127,6 +136,357 @@ def register_admin(): """Register the administrator.""" rsysadm.register_admin(Path(app.config["AUTH_DB"])) + +_VALID_ROLES_ = ("system-admin", "none") + +_TEST_EMAIL_DOMAIN_ = "regression-tests.genenetwork.org" + + +def __normalise_name_for_email__(name: str) -> str: + """Lowercase and strip non-alphanumeric characters for use in an email.""" + return re.sub(r"[^a-z0-9]", "", name.lower()) + + +def __create_one_user__(cursor, name: str, email: str, password: str, role: str) -> dict: + """Create a single user in the DB and return their credential record.""" + user = save_user(cursor, email, name, verified=True) + set_user_password(cursor, user, password) + assign_default_roles(cursor, user) + if role == "system-admin": + grant_sysadmin_role(cursor, user) + return { + "user_id": str(user.user_id), + "name": user.name, + "email": user.email, + "password": password, + "role": role, + } + + +def __parse_user_spec__(spec: str) -> dict: + """Parse 'key=value,key=value,...' into a dict.""" + result = {} + for part in spec.split(","): + key, _, value = part.partition("=") + if key.strip(): + result[key.strip()] = value.strip() + return result + + +def __write_output__(data: dict, output_path) -> None: + """Write JSON data to a file with 0600 permissions, or stdout.""" + text = json.dumps(data, indent=2) + if output_path is None: + print(text) + return + fd = os.open(output_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) + with os.fdopen(fd, "w") as outfile: + outfile.write(text) + + +@app.cli.command() +@click.option("--user", "user_specs", multiple=True, + help='User spec: "name=...,email=...,password=...,role=..."') +@click.option("--output", "output_path", type=click.Path(), default=None, + help="Write credentials as JSON to this file (default: stdout)") +def create_users(user_specs, output_path): + """Create one or more users with specified credentials and roles. + + Each --user option takes a comma-separated key=value string with the + following keys: name, email, password, role. + + Valid roles: system-admin, none. + """ + if not user_specs: + print("No users specified.", file=sys.stderr) + sys.exit(1) + + records = [] + with db.connection(app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor: + for spec_str in user_specs: + spec = __parse_user_spec__(spec_str) + name = spec.get("name", "").strip() + email = spec.get("email", "").strip() + password = spec.get("password", "").strip() + role = spec.get("role", "none").strip() + + if not name: + print(f"Missing 'name' in user spec: {spec_str!r}", file=sys.stderr) + sys.exit(1) + if not email: + print(f"Missing 'email' in user spec: {spec_str!r}", file=sys.stderr) + sys.exit(1) + if not password: + print(f"Missing 'password' in user spec: {spec_str!r}", file=sys.stderr) + sys.exit(1) + if role not in _VALID_ROLES_: + print( + f"Invalid role {role!r} in spec: {spec_str!r}. " + f"Valid roles: {_VALID_ROLES_}", + file=sys.stderr) + sys.exit(1) + + records.append( + __create_one_user__(cursor, name, email, password, role)) + + __write_output__({"users": records}, output_path) + + +@app.cli.command() +@click.option("--user-id", "user_ids", multiple=True, type=click.UUID, + help="UUID of a user to delete (repeatable)") +def delete_users(user_ids): + """Delete one or more users by ID, bypassing policy checks. + + Removes users unconditionally regardless of their roles or group + memberships. Use with care — intended for test teardown and administration. + """ + if not user_ids: + print("No user IDs specified.", file=sys.stderr) + sys.exit(1) + + with db.connection(app.config["AUTH_DB"]) as conn: + deleted = delete_users_by_id(conn, tuple(user_ids)) + print(f"Deleted {deleted} user(s).") + + +@app.cli.command() +@click.option("--session-timestamp", required=True, + help="Compact ISO 8601 UTC timestamp (e.g. 20260602T122700Z)") +@click.option("--user", "user_specs", multiple=True, + help='User spec: "name=...,role=..."') +@click.option("--output", "output_path", required=True, type=click.Path(), + help="Write credentials as JSON to this file (0600 permissions)") +def create_test_users(session_timestamp, user_specs, output_path): + """Create ephemeral test users with auto-generated email and password. + + Each --user option takes a comma-separated key=value string with the + following keys: name, role. + + Email: <normalised-name><timestamp>@regression-tests.genenetwork.org + Password: randomly generated. + + Output is written with 0600 permissions. Valid roles: system-admin, none. + """ + if not user_specs: + print("No users specified.", file=sys.stderr) + sys.exit(1) + + records = [] + with db.connection(app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor: + for spec_str in user_specs: + spec = __parse_user_spec__(spec_str) + name = spec.get("name", "").strip() + role = spec.get("role", "none").strip() + + if not name: + print(f"Missing 'name' in user spec: {spec_str!r}", file=sys.stderr) + sys.exit(1) + if role not in _VALID_ROLES_: + print( + f"Invalid role {role!r} in spec: {spec_str!r}. " + f"Valid roles: {_VALID_ROLES_}", + file=sys.stderr) + sys.exit(1) + + email = (f"{__normalise_name_for_email__(name)}" + f"{session_timestamp}@{_TEST_EMAIL_DOMAIN_}") + password = secrets.token_urlsafe(32) + + records.append( + __create_one_user__(cursor, name, email, password, role)) + + __write_output__( + {"session_timestamp": session_timestamp, "users": records}, + output_path) + + +_DEFAULT_GRANT_TYPES_ = ( + "password", + "authorization_code", + "refresh_token", + "urn:ietf:params:oauth:grant-type:jwt-bearer", +) + +_DEFAULT_SCOPES_ = ( + "profile", "group", "role", "resource", + "register-client", "user", "masquerade", + "migrate-data", "introspect", +) + + +def __create_one_client__( + conn, + client_name: str, + owner_user, + redirect_uris: tuple, + scopes: tuple = _DEFAULT_SCOPES_, + grant_types: tuple = _DEFAULT_GRANT_TYPES_, + jwks_uri: str = "", +) -> dict: + """Create a single OAuth2 client and return its credential record.""" + raw_secret = secrets.token_urlsafe(32) + the_client = OAuth2Client( + client_id=uuid.uuid4(), + client_secret=hash_password(raw_secret), + client_id_issued_at=datetime.now(tz=timezone.utc), + client_secret_expires_at=datetime.fromtimestamp(0), + client_metadata={ + "client_name": client_name, + "token_endpoint_auth_method": [ + "client_secret_post", "client_secret_basic"], + "client_type": "confidential", + "grant_types": list(grant_types), + "default_redirect_uri": redirect_uris[0] if redirect_uris else "", + "redirect_uris": list(redirect_uris), + "response_type": ["code", "token"], + "scope": list(scopes), + "public-jwks-uri": jwks_uri, + }, + user=owner_user) + save_client(conn, the_client) + return { + "client_id": str(the_client.client_id), + "client_secret": raw_secret, + "client_name": client_name, + } + + +@app.cli.command() +@click.option("--name", "client_name", required=True, + help="Human-readable name for the OAuth2 client") +@click.option("--owner-id", required=True, type=click.UUID, + help="UUID of the user who owns this client") +@click.option("--redirect-uri", "redirect_uris", multiple=True, + help="Allowed redirect URI (repeatable)") +@click.option("--scope", "scopes", multiple=True, + default=_DEFAULT_SCOPES_, show_default=False, + help="OAuth2 scope (repeatable; defaults to full scope set)") +@click.option("--grant-type", "grant_types", multiple=True, + default=_DEFAULT_GRANT_TYPES_, show_default=False, + help="Grant type (repeatable; defaults to all standard types)") +@click.option("--jwks-uri", default="", + help="URI to the client's public JWKS (optional)") +@click.option("--output", "output_path", type=click.Path(), default=None, + help="Write credentials as JSON to this file (default: stdout)") +def create_oauth2_client(client_name, owner_id, redirect_uris, scopes, + grant_types, jwks_uri, output_path): + """Create an OAuth2 client with specified parameters. + + Scopes and grant types default to the full standard set if not provided. + """ + with db.connection(app.config["AUTH_DB"]) as conn: + try: + owner = user_by_id(conn, owner_id) + except NotFoundError: + print(f"No user found with ID {owner_id}", file=sys.stderr) + sys.exit(1) + record = __create_one_client__( + conn, client_name, owner, redirect_uris, scopes, grant_types, + jwks_uri) + + __write_output__({"client": record}, output_path) + + +@app.cli.command() +@click.option("--session-timestamp", required=True, + help="Compact ISO 8601 UTC timestamp (e.g. 20260602T122700Z)") +@click.option("--users-file", required=True, type=click.Path(exists=True), + help="Credentials file produced by create-test-users") +@click.option("--owner-role", default="system-admin", show_default=True, + help="Role of the user in users-file to assign as client owner") +@click.option("--output", "output_path", required=True, type=click.Path(), + help="Write credentials as JSON to this file (0600 permissions)") +def create_test_oauth2_client(session_timestamp, users_file, owner_role, + output_path): + """Create an ephemeral OAuth2 client for a test session. + + Reads the credentials file produced by create-test-users to find the + owner. Client name and secret are auto-generated using the session + timestamp. Output is written with 0600 permissions. + """ + with open(users_file) as f: + users_data = json.load(f) + + owner_record = next( + (u for u in users_data.get("users", []) if u["role"] == owner_role), + None) + if owner_record is None: + print( + f"No user with role {owner_role!r} found in {users_file}", + file=sys.stderr) + sys.exit(1) + + client_name = f"gn-test-client-{session_timestamp}" + + with db.connection(app.config["AUTH_DB"]) as conn: + try: + owner = user_by_id(conn, uuid.UUID(owner_record["user_id"])) + except NotFoundError: + print( + f"Owner user {owner_record['user_id']!r} not found in DB", + file=sys.stderr) + sys.exit(1) + record = __create_one_client__(conn, client_name, owner, tuple()) + + __write_output__( + {"session_timestamp": session_timestamp, "client": record}, + output_path) + + +@app.cli.command() +@click.option("--credentials", "credentials_path", required=True, + type=click.Path(exists=True), + help="Credentials file produced by create-oauth2-client or " + "create-test-oauth2-client") +def delete_oauth2_client(credentials_path): + """Delete an OAuth2 client using a credentials file. + + Reads the client_id from the given credentials file and removes the + client and all associated tokens from the database. + """ + with open(credentials_path) as f: + data = json.load(f) + + client_id_str = data.get("client", {}).get("client_id") + if not client_id_str: + print("No client_id found in credentials file.", file=sys.stderr) + sys.exit(1) + + client_id = uuid.UUID(client_id_str) + with db.connection(app.config["AUTH_DB"]) as conn: + the_client = oauth2_client_by_id(conn, client_id) + if the_client.is_nothing(): + print(f"No client found with ID {client_id}", file=sys.stderr) + sys.exit(1) + delete_client(conn, the_client.value) + print(f"Deleted OAuth2 client {client_id}.") + + +@app.cli.command() +@click.option("--credentials", "credentials_path", required=True, + type=click.Path(exists=True), + help="Credentials file produced by create-test-users") +def delete_test_users(credentials_path): + """Delete ephemeral test users using a credentials file. + + Reads the credentials file produced by create-test-users and deletes + all listed users unconditionally, bypassing policy checks. Intended + for CI test teardown. + """ + with open(credentials_path) as f: + data = json.load(f) + + user_ids = tuple( + uuid.UUID(u["user_id"]) for u in data.get("users", [])) + if not user_ids: + print("No users found in credentials file.", file=sys.stderr) + sys.exit(1) + + with db.connection(app.config["AUTH_DB"]) as conn: + deleted = delete_users_by_id(conn, user_ids) + print(f"Deleted {deleted} user(s).") + ##### END: CLI Commands ##### if __name__ == '__main__': |
