aboutsummaryrefslogtreecommitdiff
path: root/gn_auth/wsgi.py
blob: e05ef0d1a52b2214de5a9d9107bdbb70cfeda3fd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
"""Main entry point for project"""
import sys
import uuid
import json
from math import ceil
from pathlib import Path
from datetime import datetime

import click
from yoyo import get_backend, read_migrations

from gn_auth import migrations
from gn_auth import create_app

from gn_auth.auth.db import sqlite3 as db
from gn_auth.auth.errors import NotFoundError
from gn_auth.auth.authentication.users import user_by_id, hash_password
from gn_auth.auth.authorisation.users.admin.models import make_sys_admin

from scripts import register_sys_admin as rsysadm# type: ignore[import]


# Module-level application object: the CLI commands below register themselves
# on it through `app.cli`, and it is what gets run when this file is executed
# directly as a script.
app = create_app()

##### BEGIN: CLI Commands #####

@app.cli.command()
def apply_migrations():
    """Apply the database migrations.

    The backend is the SQLite database named by the `AUTH_DB` setting and the
    migrations are read from the location named by `AUTH_MIGRATIONS`.
    """
    backend = get_backend(f'sqlite:///{app.config["AUTH_DB"]}')
    migration_set = read_migrations(app.config["AUTH_MIGRATIONS"])
    migrations.apply_migrations(backend, migration_set)

def __init_dev_users__():
    """Create or refresh the hard-coded development user(s).

    Shared by the `init_dev_users` and `init_dev_clients` commands. Each user
    row is inserted into `users` (when the email already exists, its name and
    verified flag are overwritten instead), after which the hash of the
    user's password is written to `user_credentials`.
    """
    seed_users = ({
        "user_id": "0ad1917c-57da-46dc-b79e-c81c91e5b928",
        "email": "test@development.user",
        "name": "Test Development User",
        "password": "testpasswd"},)
    upsert_user_sql = """
    INSERT INTO users (user_id, email, name, verified)
        VALUES (:user_id, :email, :name, 1)
        ON CONFLICT(email) DO UPDATE SET
            name=excluded.name,
            verified=excluded.verified
    """
    store_hash_sql = "INSERT OR REPLACE INTO user_credentials VALUES (:user_id, :hash)"

    with db.connection(app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor:
        cursor.executemany(upsert_user_sql, seed_users)
        # Each credentials row is the user record plus a `hash` entry; the
        # statement only binds `:user_id` and `:hash` from it.
        credentials = [
            dict(user, hash=hash_password(user["password"]))
            for user in seed_users]
        cursor.executemany(store_hash_sql, credentials)

@app.cli.command()
def init_dev_users():
    """
    Initialise development users for OAuth2 sessions.

    **NOTE**: You really should not run this in production/staging
    """
    # Thin CLI wrapper: the actual inserts live in `__init_dev_users__` so
    # that `init_dev_clients` can reuse them.
    __init_dev_users__()

@app.cli.command()
@click.option('--client-uri', default="http://localhost:5033", type=str)
def init_dev_clients(client_uri):
    """
    Initialise a development client for OAuth2 sessions.

    The `--client-uri` option is the base URI of the client application; the
    redirect and JWKS URIs stored for the client are derived from it.

    **NOTE**: You really should not run this in production/staging
    """
    # Drop surrounding slashes before building the endpoint URIs.  The
    # trailing one matters: "http://host/" would otherwise be registered as
    # "http://host//oauth2/code".  Leading slashes are still removed, as they
    # were before.
    client_uri = client_uri.strip("/")
    # The client row below references the development user, so make sure that
    # user exists first.
    __init_dev_users__()
    dev_clients_query = """
        INSERT INTO oauth2_clients VALUES (
        :client_id, :client_secret, :client_id_issued_at,
        :client_secret_expires_at, :client_metadata, :user_id
        )
        ON CONFLICT(client_id) DO UPDATE SET
            client_secret=excluded.client_secret,
            client_secret_expires_at=excluded.client_secret_expires_at,
            client_metadata=excluded.client_metadata,
            user_id=excluded.user_id
        """
    dev_clients = ({
        "client_id": "0bbfca82-d73f-4bd4-a140-5ae7abb4a64d",
        "client_secret": "yadabadaboo",
        # Issue time is stored as whole seconds since the epoch.
        "client_id_issued_at": ceil(datetime.now().timestamp()),
        # NOTE(review): 0 presumably means "never expires" -- confirm against
        # the OAuth2 client model.
        "client_secret_expires_at": 0,
        # The metadata is stored as a single JSON document.
        "client_metadata": json.dumps({
            "client_name": "GN2 Dev Server",
            "token_endpoint_auth_method": [
                "client_secret_post", "client_secret_basic"],
            "client_type": "confidential",
            "grant_types": ["password", "authorization_code", "refresh_token",
                            "urn:ietf:params:oauth:grant-type:jwt-bearer"],
            "default_redirect_uri": f"{client_uri}/oauth2/code",
            "redirect_uris": [f"{client_uri}/oauth2/code",
                              f"{client_uri}/oauth2/token"],
            "public-jwks-uri": f"{client_uri}/oauth2/public-jwks",
            "response_type": ["code", "token"],
            "scope": ["profile", "group", "role", "resource", "register-client",
                      "user", "masquerade", "migrate-data", "introspect"]
        }),
        # Same ID as the development user created by `__init_dev_users__`.
        "user_id": "0ad1917c-57da-46dc-b79e-c81c91e5b928"},)

    with db.connection(app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor:
        cursor.executemany(dev_clients_query, dev_clients)


@app.cli.command()
@click.argument("user_id", type=click.UUID)
def assign_system_admin(user_id: uuid.UUID):
    """Make the user with ID `user_id` a system administrator.

    If looking the user up raises `NotFoundError`, the error is printed to
    stderr and the process exits with status 1.
    """
    try:
        auth_db = app.config["AUTH_DB"]
        with db.connection(auth_db) as conn, db.cursor(conn) as cursor:
            user = user_by_id(conn, user_id)
            make_sys_admin(cursor, user)
            return 0
    except NotFoundError as not_found:
        print(not_found, file=sys.stderr)
        raise SystemExit(1)

@app.cli.command()
def register_admin():
    """Register the administrator.

    Delegates to the `register_sys_admin` script, passing it the path of the
    database named by the `AUTH_DB` setting.
    """
    auth_db_path = Path(app.config["AUTH_DB"])
    rsysadm.register_admin(auth_db_path)

##### END: CLI Commands #####

# Only when executed as a script (not when imported): announce start-up on
# stdout, then hand control to `app.run()`.
if __name__ == '__main__':
    print("Starting app...")
    app.run()