"""Main entry point for project""" import sys import uuid import json from math import ceil from pathlib import Path from datetime import datetime import click from yoyo import get_backend, read_migrations from gn_auth import migrations from gn_auth import create_app from gn_auth.auth.db import sqlite3 as db from gn_auth.auth.errors import NotFoundError from gn_auth.auth.authentication.users import user_by_id, hash_password from gn_auth.auth.authorisation.users.admin.models import make_sys_admin from scripts import register_sys_admin as rsysadm# type: ignore[import] app = create_app() ##### BEGIN: CLI Commands ##### @app.cli.command() def apply_migrations(): """Apply the dabasase migrations.""" migrations.apply_migrations( get_backend(f'sqlite:///{app.config["AUTH_DB"]}'), read_migrations(app.config["AUTH_MIGRATIONS"])) def __init_dev_users__(): """Initialise dev users. Get's used in more than one place""" dev_users_query = """ INSERT INTO users (user_id, email, name, verified) VALUES (:user_id, :email, :name, 1) ON CONFLICT(email) DO UPDATE SET name=excluded.name, verified=excluded.verified """ dev_users_passwd = "INSERT OR REPLACE INTO user_credentials VALUES (:user_id, :hash)" dev_users = ({ "user_id": "0ad1917c-57da-46dc-b79e-c81c91e5b928", "email": "test@development.user", "name": "Test Development User", "password": "testpasswd"},) with db.connection(app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor: cursor.executemany(dev_users_query, dev_users) cursor.executemany(dev_users_passwd, ( {**usr, "hash": hash_password(usr["password"])} for usr in dev_users)) @app.cli.command() def init_dev_users(): """ Initialise development users for OAuth2 sessions. **NOTE**: You really should not run this in production/staging """ __init_dev_users__() @app.cli.command() @click.option('--client-uri', default= "http://localhost:5033", type=str) def init_dev_clients(client_uri): """ Initialise a development client for OAuth2 sessions. **NOTE**: You really should not run this in production/staging """ client_uri = client_uri.lstrip("/") __init_dev_users__() dev_clients_query = """ INSERT INTO oauth2_clients VALUES ( :client_id, :client_secret, :client_id_issued_at, :client_secret_expires_at, :client_metadata, :user_id ) ON CONFLICT(client_id) DO UPDATE SET client_secret=excluded.client_secret, client_secret_expires_at=excluded.client_secret_expires_at, client_metadata=excluded.client_metadata, user_id=excluded.user_id """ dev_clients = ({ "client_id": "0bbfca82-d73f-4bd4-a140-5ae7abb4a64d", "client_secret": "yadabadaboo", "client_id_issued_at": ceil(datetime.now().timestamp()), "client_secret_expires_at": 0, "client_metadata": json.dumps({ "client_name": "GN2 Dev Server", "token_endpoint_auth_method": [ "client_secret_post", "client_secret_basic"], "client_type": "confidential", "grant_types": ["password", "authorization_code", "refresh_token", "urn:ietf:params:oauth:grant-type:jwt-bearer"], "default_redirect_uri": f"{client_uri}/oauth2/code", "redirect_uris": [f"{client_uri}/oauth2/code", f"{client_uri}/oauth2/token"], "public-jwks-uri": f"{client_uri}/oauth2/public-jwks", "response_type": ["code", "token"], "scope": ["profile", "group", "role", "resource", "register-client", "user", "masquerade", "migrate-data", "introspect"] }), "user_id": "0ad1917c-57da-46dc-b79e-c81c91e5b928"},) with db.connection(app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor: cursor.executemany(dev_clients_query, dev_clients) @app.cli.command() @click.argument("user_id", type=click.UUID) def assign_system_admin(user_id: uuid.UUID): """Assign user with ID `user_id` administrator role.""" try: dburi = app.config["AUTH_DB"] with db.connection(dburi) as conn, db.cursor(conn) as cursor: make_sys_admin(cursor, user_by_id(conn, user_id)) return 0 except NotFoundError as nfe: print(nfe, file=sys.stderr) sys.exit(1) @app.cli.command() def register_admin(): """Register the administrator.""" rsysadm.register_admin(Path(app.config["AUTH_DB"])) ##### END: CLI Commands ##### if __name__ == '__main__': print("Starting app...") app.run()