"""Main entry point for project"""
import os
import re
import secrets
import stat
import sys
import uuid
import json
from math import ceil
from pathlib import Path
from datetime import datetime, timezone

import click
from yoyo import get_backend, read_migrations

from gn_auth import migrations
from gn_auth import create_app
from gn_auth.auth.db import sqlite3 as db
from gn_auth.auth.errors import NotFoundError
from gn_auth.auth.authentication.users import (
    user_by_id, hash_password, save_user, set_user_password)
from gn_auth.auth.authorisation.roles.models import assign_default_roles
from gn_auth.auth.authorisation.users.admin.models import (
    make_sys_admin, grant_sysadmin_role)
from gn_auth.auth.authorisation.users.models import delete_users_by_id
from gn_auth.auth.authentication.oauth2.models.oauth2client import (
    OAuth2Client, save_client, delete_client, client as oauth2_client_by_id)

from gn_auth.scripts import register_sys_admin as rsysadm# type: ignore[import]

app = create_app()

# Roles accepted in the ``role=`` key of a ``--user`` spec.
_VALID_ROLES_ = ("system-admin", "none")
# Domain used for the auto-generated emails of ephemeral test users.
_TEST_EMAIL_DOMAIN_ = "regression-tests.genenetwork.org"

# Defaults shared by the dev client and by clients created from the CLI.
_DEFAULT_GRANT_TYPES_ = (
    "password",
    "authorization_code",
    "refresh_token",
    "urn:ietf:params:oauth:grant-type:jwt-bearer",
)

_DEFAULT_SCOPES_ = (
    "profile", "group", "role", "resource", "register-client",
    "user", "masquerade", "migrate-data", "introspect",
)


##### BEGIN: CLI Commands #####

@app.cli.command()
def apply_migrations():
    """Apply the database migrations."""
    migrations.apply_migrations(
        get_backend(f'sqlite:///{app.config["AUTH_DB"]}'),
        read_migrations(app.config["AUTH_MIGRATIONS"]))


def __init_dev_users__():
    """Initialise dev users. Gets used in more than one place"""
    dev_users_query = """
        INSERT INTO users (user_id, email, name, verified)
        VALUES (:user_id, :email, :name, 1)
        ON CONFLICT(email) DO UPDATE SET
            name=excluded.name,
            verified=excluded.verified
    """
    dev_users_passwd = "INSERT OR REPLACE INTO user_credentials VALUES (:user_id, :hash)"
    dev_users = ({
        "user_id": "0ad1917c-57da-46dc-b79e-c81c91e5b928",
        "email": "test@development.user",
        "name": "Test Development User",
        "password": "testpasswd"},)

    with db.connection(app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor:
        cursor.executemany(dev_users_query, dev_users)
        # Each credentials row carries the hashed (never the raw) password.
        cursor.executemany(dev_users_passwd, (
            {**usr, "hash": hash_password(usr["password"])}
            for usr in dev_users))


@app.cli.command()
def init_dev_users():
    """
    Initialise development users for OAuth2 sessions.

    **NOTE**: You really should not run this in production/staging
    """
    __init_dev_users__()


@app.cli.command()
@click.option('--client-uri', default= "http://localhost:5033", type=str)
def init_dev_clients(client_uri):
    """
    Initialise a development client for OAuth2 sessions.

    **NOTE**: You really should not run this in production/staging
    """
    # Drop any trailing slash so the URIs built below do not end up with a
    # double slash (e.g. "http://host//oauth2/code").
    client_uri = client_uri.rstrip("/")
    __init_dev_users__()
    dev_clients_query = """
        INSERT INTO oauth2_clients VALUES (
            :client_id, :client_secret, :client_id_issued_at,
            :client_secret_expires_at, :client_metadata, :user_id
        )
        ON CONFLICT(client_id) DO UPDATE SET
            client_secret=excluded.client_secret,
            client_secret_expires_at=excluded.client_secret_expires_at,
            client_metadata=excluded.client_metadata,
            user_id=excluded.user_id
    """
    dev_clients = ({
        "client_id": "0bbfca82-d73f-4bd4-a140-5ae7abb4a64d",
        "client_secret": "yadabadaboo",
        "client_id_issued_at": ceil(datetime.now().timestamp()),
        "client_secret_expires_at": 0,
        "client_metadata": json.dumps({
            "client_name": "GN2 Dev Server",
            "token_endpoint_auth_method": [
                "client_secret_post", "client_secret_basic"],
            "client_type": "confidential",
            "grant_types": list(_DEFAULT_GRANT_TYPES_),
            "default_redirect_uri": f"{client_uri}/oauth2/code",
            "redirect_uris": [f"{client_uri}/oauth2/code",
                              f"{client_uri}/oauth2/token"],
            "public-jwks-uri": f"{client_uri}/oauth2/public-jwks",
            "response_type": ["code", "token"],
            "scope": list(_DEFAULT_SCOPES_)
        }),
        "user_id": "0ad1917c-57da-46dc-b79e-c81c91e5b928"},)

    with db.connection(app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor:
        cursor.executemany(dev_clients_query, dev_clients)


@app.cli.command()
@click.argument("user_id", type=click.UUID)
def assign_system_admin(user_id: uuid.UUID):
    """Assign user with ID `user_id` administrator role."""
    try:
        dburi = app.config["AUTH_DB"]
        with db.connection(dburi) as conn, db.cursor(conn) as cursor:
            make_sys_admin(cursor, user_by_id(conn, user_id))
            return 0
    except NotFoundError as nfe:
        # Unknown user: report on stderr and exit with a failure status.
        print(nfe, file=sys.stderr)
        sys.exit(1)


@app.cli.command()
def register_admin():
    """Register the administrator."""
    rsysadm.register_admin(Path(app.config["AUTH_DB"]))


def __normalise_name_for_email__(name: str) -> str:
    """Lowercase and strip non-alphanumeric characters for use in an email."""
    return re.sub(r"[^a-z0-9]", "", name.lower())


def __create_one_user__(cursor, name: str, email: str, password: str, role: str) -> dict:
    """Create a single user in the DB and return their credential record.

    The user is saved as verified, given the password and the default roles,
    and additionally granted the sysadmin role when `role` is "system-admin".
    The returned record includes the plain-text password.
    """
    user = save_user(cursor, email, name, verified=True)
    set_user_password(cursor, user, password)
    assign_default_roles(cursor, user)
    if role == "system-admin":
        grant_sysadmin_role(cursor, user)
    return {
        "user_id": str(user.user_id),
        "name": user.name,
        "email": user.email,
        "password": password,
        "role": role,
    }


def __parse_user_spec__(spec: str) -> dict:
    """Parse 'key=value,key=value,...' into a dict.

    Keys and values are whitespace-stripped; parts with an empty key are
    ignored. Because parts are split on ',', a value cannot contain a comma.
    """
    result = {}
    for part in spec.split(","):
        key, _, value = part.partition("=")
        if key.strip():
            result[key.strip()] = value.strip()
    return result


def __validated_user_spec__(spec_str: str, required_keys: tuple) -> dict:
    """Parse one --user spec and check it, exiting with status 1 if invalid.

    Every key in `required_keys` must have a non-empty value, and the role
    (defaulting to "none") must be one of `_VALID_ROLES_`. Returns the parsed
    spec with the resolved "role" filled in.
    """
    spec = __parse_user_spec__(spec_str)
    for key in required_keys:
        if not spec.get(key, "").strip():
            print(f"Missing {key!r} in user spec: {spec_str!r}", file=sys.stderr)
            sys.exit(1)
    role = spec.get("role", "none").strip()
    if role not in _VALID_ROLES_:
        print(
            f"Invalid role {role!r} in spec: {spec_str!r}. "
            f"Valid roles: {_VALID_ROLES_}",
            file=sys.stderr)
        sys.exit(1)
    return {**spec, "role": role}


def __write_output__(data: dict, output_path) -> None:
    """Write JSON data to a file with 0600 permissions, or stdout.

    When `output_path` is None the JSON is printed to stdout instead.
    """
    text = json.dumps(data, indent=2)
    if output_path is None:
        print(text)
        return
    fd = os.open(output_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf8") as outfile:
        # The mode given to os.open() only applies when the file is created;
        # tighten a pre-existing regular file before any secret is written.
        if stat.S_ISREG(os.fstat(fd).st_mode):
            os.chmod(output_path, 0o600)
        outfile.write(text)


@app.cli.command()
@click.option("--user", "user_specs", multiple=True,
              help='User spec: "name=...,email=...,password=...,role=..."')
@click.option("--output", "output_path", type=click.Path(), default=None,
              help="Write credentials as JSON to this file (default: stdout)")
def create_users(user_specs, output_path):
    """Create one or more users with specified credentials and roles.

    Each --user option takes a comma-separated key=value string with
    the following keys: name, email, password, role.

    Valid roles: system-admin, none.
    """
    if not user_specs:
        print("No users specified.", file=sys.stderr)
        sys.exit(1)

    # Validate every spec before touching the database, so that a bad spec
    # cannot leave behind users whose credentials were never written out.
    specs = tuple(
        __validated_user_spec__(spec_str, ("name", "email", "password"))
        for spec_str in user_specs)

    with db.connection(app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor:
        records = [
            __create_one_user__(
                cursor, spec["name"], spec["email"], spec["password"],
                spec["role"])
            for spec in specs]

    __write_output__({"users": records}, output_path)


@app.cli.command()
@click.option("--user-id", "user_ids", multiple=True, type=click.UUID,
              help="UUID of a user to delete (repeatable)")
def delete_users(user_ids):
    """Delete one or more users by ID, bypassing policy checks.

    Removes users unconditionally regardless of their roles or group
    memberships. Use with care — intended for test teardown and administration.
    """
    if not user_ids:
        print("No user IDs specified.", file=sys.stderr)
        sys.exit(1)

    with db.connection(app.config["AUTH_DB"]) as conn:
        deleted = delete_users_by_id(conn, tuple(user_ids))

    print(f"Deleted {deleted} user(s).")


@app.cli.command()
@click.option("--session-timestamp", required=True,
              help="Compact ISO 8601 UTC timestamp (e.g. 20260602T122700Z)")
@click.option("--user", "user_specs", multiple=True,
              help='User spec: "name=...,role=..."')
@click.option("--output", "output_path", required=True, type=click.Path(),
              help="Write credentials as JSON to this file (0600 permissions)")
def create_test_users(session_timestamp, user_specs, output_path):
    """Create ephemeral test users with auto-generated email and password.

    Each --user option takes a comma-separated key=value string with
    the following keys: name, role.

    Email: the lowercased alphanumeric name followed by the session
    timestamp, at regression-tests.genenetwork.org.
    Password: randomly generated. Output is written with 0600 permissions.

    Valid roles: system-admin, none.
    """
    if not user_specs:
        print("No users specified.", file=sys.stderr)
        sys.exit(1)

    # Validate every spec before touching the database, so that a bad spec
    # cannot leave behind users whose random passwords were never recorded.
    specs = tuple(
        __validated_user_spec__(spec_str, ("name",))
        for spec_str in user_specs)

    records = []
    with db.connection(app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor:
        for spec in specs:
            name = spec["name"]
            email = (f"{__normalise_name_for_email__(name)}"
                     f"{session_timestamp}@{_TEST_EMAIL_DOMAIN_}")
            password = secrets.token_urlsafe(32)
            records.append(
                __create_one_user__(cursor, name, email, password, spec["role"]))

    __write_output__(
        {"session_timestamp": session_timestamp, "users": records},
        output_path)


def __create_one_client__(# pylint: disable=[too-many-arguments, too-many-positional-arguments]
        conn,
        client_name: str,
        owner_user,
        redirect_uris: tuple,
        scopes: tuple = _DEFAULT_SCOPES_,
        grant_types: tuple = _DEFAULT_GRANT_TYPES_,
        jwks_uri: str = "",
) -> dict:
    """Create a single OAuth2 client and return its credential record.

    A random secret is generated; only its hash is stored on the client,
    while the raw secret is returned in the record alongside the new client's
    ID and name. The first redirect URI (or "" when none is given) becomes
    the default redirect URI.
    """
    raw_secret = secrets.token_urlsafe(32)
    the_client = OAuth2Client(
        client_id=uuid.uuid4(),
        client_secret=hash_password(raw_secret),
        client_id_issued_at=datetime.now(tz=timezone.utc),
        # NOTE(review): this is a naive datetime while `client_id_issued_at`
        # above is timezone-aware — confirm the consumers accept the mix.
        client_secret_expires_at=datetime.fromtimestamp(0),
        client_metadata={
            "client_name": client_name,
            "token_endpoint_auth_method": [
                "client_secret_post", "client_secret_basic"],
            "client_type": "confidential",
            "grant_types": list(grant_types),
            "default_redirect_uri": redirect_uris[0] if redirect_uris else "",
            "redirect_uris": list(redirect_uris),
            "response_type": ["code", "token"],
            "scope": list(scopes),
            "public-jwks-uri": jwks_uri,
        },
        user=owner_user)
    save_client(conn, the_client)
    return {
        "client_id": str(the_client.client_id),
        "client_secret": raw_secret,
        "client_name": client_name,
    }


@app.cli.command()
@click.option("--name", "client_name", required=True,
              help="Human-readable name for the OAuth2 client")
@click.option("--owner-id", required=True, type=click.UUID,
              help="UUID of the user who owns this client")
@click.option("--redirect-uri", "redirect_uris", multiple=True,
              help="Allowed redirect URI (repeatable)")
@click.option("--scope", "scopes", multiple=True,
              default=_DEFAULT_SCOPES_, show_default=False,
              help="OAuth2 scope (repeatable; defaults to full scope set)")
@click.option("--grant-type", "grant_types", multiple=True,
              default=_DEFAULT_GRANT_TYPES_, show_default=False,
              help="Grant type (repeatable; defaults to all standard types)")
@click.option("--jwks-uri", default="",
              help="URI to the client's public JWKS (optional)")
@click.option("--output", "output_path", type=click.Path(), default=None,
              help="Write credentials as JSON to this file (default: stdout)")
def create_oauth2_client(# pylint: disable=[too-many-arguments, too-many-positional-arguments]
        client_name, owner_id, redirect_uris, scopes, grant_types,
        jwks_uri, output_path
):
    """Create an OAuth2 client with specified parameters.

    Scopes and grant types default to the full standard set if not provided.
    """
    with db.connection(app.config["AUTH_DB"]) as conn:
        try:
            owner = user_by_id(conn, owner_id)
        except NotFoundError:
            print(f"No user found with ID {owner_id}", file=sys.stderr)
            sys.exit(1)
        record = __create_one_client__(
            conn, client_name, owner,
            redirect_uris, scopes, grant_types, jwks_uri)

    __write_output__({"client": record}, output_path)


@app.cli.command()
@click.option("--session-timestamp", required=True,
              help="Compact ISO 8601 UTC timestamp (e.g. 20260602T122700Z)")
@click.option("--users-file", required=True, type=click.Path(exists=True),
              help="Credentials file produced by create-test-users")
@click.option("--owner-role", default="system-admin", show_default=True,
              help="Role of the user in users-file to assign as client owner")
@click.option("--output", "output_path", required=True, type=click.Path(),
              help="Write credentials as JSON to this file (0600 permissions)")
def create_test_oauth2_client(session_timestamp, users_file, owner_role, output_path):
    """Create an ephemeral OAuth2 client for a test session.

    Reads the credentials file produced by create-test-users to find the owner.
    The client name is derived from the session timestamp and the secret is
    randomly generated. Output is written with 0600 permissions.
    """
    with open(users_file, encoding="utf8") as f:
        users_data = json.load(f)

    # The first listed user holding the requested role becomes the owner;
    # records without a "role" key simply never match.
    owner_record = next(
        (u for u in users_data.get("users", [])
         if u.get("role") == owner_role),
        None)
    if owner_record is None:
        print(
            f"No user with role {owner_role!r} found in {users_file}",
            file=sys.stderr)
        sys.exit(1)

    client_name = f"gn-test-client-{session_timestamp}"

    with db.connection(app.config["AUTH_DB"]) as conn:
        try:
            owner = user_by_id(conn, uuid.UUID(owner_record["user_id"]))
        except NotFoundError:
            print(
                f"Owner user {owner_record['user_id']!r} not found in DB",
                file=sys.stderr)
            sys.exit(1)
        record = __create_one_client__(conn, client_name, owner, tuple())

    __write_output__(
        {"session_timestamp": session_timestamp, "client": record},
        output_path)


@app.cli.command()
@click.option("--credentials", "credentials_path", required=True,
              type=click.Path(exists=True),
              help="Credentials file produced by create-oauth2-client or "
                   "create-test-oauth2-client")
def delete_oauth2_client(credentials_path):
    """Delete an OAuth2 client using a credentials file.

    Reads the client_id from the given credentials file and removes the
    client and all associated tokens from the database.
    """
    with open(credentials_path, encoding="utf8") as f:
        data = json.load(f)

    client_id_str = data.get("client", {}).get("client_id")
    if not client_id_str:
        print("No client_id found in credentials file.", file=sys.stderr)
        sys.exit(1)

    client_id = uuid.UUID(client_id_str)
    with db.connection(app.config["AUTH_DB"]) as conn:
        the_client = oauth2_client_by_id(conn, client_id)
        if the_client.is_nothing():
            print(f"No client found with ID {client_id}", file=sys.stderr)
            sys.exit(1)
        delete_client(conn, the_client.value)

    print(f"Deleted OAuth2 client {client_id}.")


@app.cli.command()
@click.option("--credentials", "credentials_path", required=True,
              type=click.Path(exists=True),
              help="Credentials file produced by create-test-users")
def delete_test_users(credentials_path):
    """Delete ephemeral test users using a credentials file.

    Reads the credentials file produced by create-test-users and deletes
    all listed users unconditionally, bypassing policy checks.
    Intended for CI test teardown.
    """
    with open(credentials_path, encoding="utf8") as f:
        data = json.load(f)

    user_ids = tuple(
        uuid.UUID(u["user_id"]) for u in data.get("users", []))
    if not user_ids:
        print("No users found in credentials file.", file=sys.stderr)
        sys.exit(1)

    with db.connection(app.config["AUTH_DB"]) as conn:
        deleted = delete_users_by_id(conn, user_ids)

    print(f"Deleted {deleted} user(s).")

##### END: CLI Commands #####


if __name__ == '__main__':
    print("Starting app...")
    app.run()