about summary refs log tree commit diff
path: root/gn_auth/wsgi.py
blob: a5af37ea84f75612b046d2bf2849c31bd73db7c8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
"""Main entry point for project"""
import os
import re
import secrets
import sys
import uuid
import json
from math import ceil
from pathlib import Path
from datetime import datetime, timezone

import click
from yoyo import get_backend, read_migrations

from gn_auth import migrations
from gn_auth import create_app

from gn_auth.auth.db import sqlite3 as db
from gn_auth.auth.errors import NotFoundError
from gn_auth.auth.authentication.users import (
    user_by_id, hash_password, save_user, set_user_password)
from gn_auth.auth.authorisation.roles.models import assign_default_roles
from gn_auth.auth.authorisation.users.admin.models import (
    make_sys_admin, grant_sysadmin_role)
from gn_auth.auth.authorisation.users.models import delete_users_by_id
from gn_auth.auth.authentication.oauth2.models.oauth2client import (
    OAuth2Client, save_client, delete_client,
    client as oauth2_client_by_id)
from gn_auth.scripts import register_sys_admin as rsysadm# type: ignore[import]


# Module-level application object. It is created at import time because the
# `@app.cli.command()` decorators below register their commands on it.
app = create_app()

##### BEGIN: CLI Commands #####

@app.cli.command()
def apply_migrations():
    """Apply the database migrations."""
    # The backend points at the SQLite auth database named in the app config.
    backend = get_backend(f'sqlite:///{app.config["AUTH_DB"]}')
    migration_set = read_migrations(app.config["AUTH_MIGRATIONS"])
    migrations.apply_migrations(backend, migration_set)

def __init_dev_users__():
    """Insert (or refresh) the hard-coded development user and its password.

    Shared by the `init_dev_users` and `init_dev_clients` commands.
    """
    # Upsert keyed on the e-mail address: an existing row only has its name
    # and verified flag refreshed.
    upsert_user_sql = """
    INSERT INTO users (user_id, email, name, verified)
        VALUES (:user_id, :email, :name, 1)
        ON CONFLICT(email) DO UPDATE SET
            name=excluded.name,
            verified=excluded.verified
    """
    store_password_sql = "INSERT OR REPLACE INTO user_credentials VALUES (:user_id, :hash)"
    seed_users = ({
        "user_id": "0ad1917c-57da-46dc-b79e-c81c91e5b928",
        "email": "test@development.user",
        "name": "Test Development User",
        "password": "testpasswd"},)

    with db.connection(app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor:
        cursor.executemany(upsert_user_sql, seed_users)
        # Each credential row is the user dict plus the hash of its
        # plain-text password; the statement only reads the named
        # parameters it mentions.
        credential_rows = (
            {**usr, "hash": hash_password(usr["password"])}
            for usr in seed_users)
        cursor.executemany(store_password_sql, credential_rows)

@app.cli.command()
def init_dev_users():
    """
    Initialise development users for OAuth2 sessions.

    **NOTE**: You really should not run this in production/staging
    """
    # The work lives in a shared helper because `init_dev_clients` needs the
    # same users to exist before it registers its client.
    __init_dev_users__()

@app.cli.command()
@click.option('--client-uri', default="http://localhost:5033", type=str)
def init_dev_clients(client_uri):
    """
    Initialise a development client for OAuth2 sessions.

    The client's redirect and JWKS URIs are built from --client-uri; any
    trailing slashes on that value are ignored.

    **NOTE**: You really should not run this in production/staging
    """
    # `client_uri` is used as a prefix below (f"{client_uri}/oauth2/..."), so
    # it is the *trailing* slashes that must go: otherwise a value such as
    # "http://localhost:5033/" produces "http://localhost:5033//oauth2/code".
    client_uri = client_uri.rstrip("/")
    # The client row references the development user, so make sure it exists.
    __init_dev_users__()
    dev_clients_query = """
        INSERT INTO oauth2_clients VALUES (
        :client_id, :client_secret, :client_id_issued_at,
        :client_secret_expires_at, :client_metadata, :user_id
        )
        ON CONFLICT(client_id) DO UPDATE SET
            client_secret=excluded.client_secret,
            client_secret_expires_at=excluded.client_secret_expires_at,
            client_metadata=excluded.client_metadata,
            user_id=excluded.user_id
        """
    dev_clients = ({
        "client_id": "0bbfca82-d73f-4bd4-a140-5ae7abb4a64d",
        "client_secret": "yadabadaboo",
        "client_id_issued_at": ceil(datetime.now().timestamp()),
        "client_secret_expires_at": 0,
        "client_metadata": json.dumps({
            "client_name": "GN2 Dev Server",
            "token_endpoint_auth_method": [
                "client_secret_post", "client_secret_basic"],
            "client_type": "confidential",
            "grant_types": ["password", "authorization_code", "refresh_token",
                            "urn:ietf:params:oauth:grant-type:jwt-bearer"],
            "default_redirect_uri": f"{client_uri}/oauth2/code",
            "redirect_uris": [f"{client_uri}/oauth2/code",
                              f"{client_uri}/oauth2/token"],
            "public-jwks-uri": f"{client_uri}/oauth2/public-jwks",
            "response_type": ["code", "token"],
            "scope": ["profile", "group", "role", "resource", "register-client",
                      "user", "masquerade", "migrate-data", "introspect"]
        }),
        "user_id": "0ad1917c-57da-46dc-b79e-c81c91e5b928"},)

    with db.connection(app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor:
        cursor.executemany(dev_clients_query, dev_clients)


@app.cli.command()
@click.argument("user_id", type=click.UUID)
def assign_system_admin(user_id: uuid.UUID):
    """Assign user with ID `user_id` administrator role."""
    auth_db = app.config["AUTH_DB"]
    try:
        with db.connection(auth_db) as conn, db.cursor(conn) as cursor:
            target_user = user_by_id(conn, user_id)
            make_sys_admin(cursor, target_user)
            return 0
    except NotFoundError as not_found:
        # Report the lookup failure on stderr and signal it via exit status.
        print(not_found, file=sys.stderr)
        sys.exit(1)

@app.cli.command()
def register_admin():
    """Register the administrator."""
    # Delegates to the `register_sys_admin` script module, handing it the
    # configured auth database location as a `Path`.
    rsysadm.register_admin(Path(app.config["AUTH_DB"]))


# Role names accepted in the `role=` field of a `--user` spec.
_VALID_ROLES_ = ("system-admin", "none")

# Domain part of the auto-generated e-mail addresses of test users.
_TEST_EMAIL_DOMAIN_ = "regression-tests.genenetwork.org"


def __normalise_name_for_email__(name: str) -> str:
    """Lowercase and strip non-alphanumeric characters for use in an email."""
    return re.sub(r"[^a-z0-9]", "", name.lower())


def __create_one_user__(cursor, name: str, email: str, password: str, role: str) -> dict:
    """Save one verified user with a password and roles; return a record.

    The user always gets the default roles; the sysadmin role is granted in
    addition only when `role` is "system-admin". The returned dict carries
    the new user's ID, name and e-mail together with the plain-text
    `password` and the requested `role`.
    """
    new_user = save_user(cursor, email, name, verified=True)
    set_user_password(cursor, new_user, password)
    assign_default_roles(cursor, new_user)

    wants_sysadmin = role == "system-admin"
    if wants_sysadmin:
        grant_sysadmin_role(cursor, new_user)

    # Name and e-mail are read back from the saved user object rather than
    # taken from the arguments.
    credential_record = {
        "user_id": str(new_user.user_id),
        "name": new_user.name,
        "email": new_user.email,
        "password": password,
        "role": role,
    }
    return credential_record


def __parse_user_spec__(spec: str) -> dict:
    """Parse 'key=value,key=value,...' into a dict."""
    result = {}
    for part in spec.split(","):
        key, _, value = part.partition("=")
        if key.strip():
            result[key.strip()] = value.strip()
    return result


def __write_output__(data: dict, output_path) -> None:
    """Write JSON data to a file with 0600 permissions, or stdout."""
    text = json.dumps(data, indent=2)
    if output_path is None:
        print(text)
        return
    fd = os.open(output_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as outfile:
        outfile.write(text)


@app.cli.command()
@click.option("--user", "user_specs", multiple=True,
              help='User spec: "name=...,email=...,password=...,role=..."')
@click.option("--output", "output_path", type=click.Path(), default=None,
              help="Write credentials as JSON to this file (default: stdout)")
def create_users(user_specs, output_path):
    """Create one or more users with specified credentials and roles.

    Each --user option takes a comma-separated key=value string with the
    following keys: name, email, password, role.

    Valid roles: system-admin, none.

    All specs are validated before any user is created. On the first
    invalid spec an error is printed to stderr and the command exits with
    status 1.
    """
    if not user_specs:
        print("No users specified.", file=sys.stderr)
        sys.exit(1)

    # Validate every spec before touching the database. Exiting part-way
    # through the creation loop would abandon the credentials of any user
    # created so far, since the output is only written at the very end.
    validated = []
    for spec_str in user_specs:
        spec = __parse_user_spec__(spec_str)
        fields = {
            key: spec.get(key, "").strip()
            for key in ("name", "email", "password")}
        role = spec.get("role", "none").strip()

        missing = next((key for key, value in fields.items() if not value), None)
        if missing is not None:
            print(f"Missing '{missing}' in user spec: {spec_str!r}", file=sys.stderr)
            sys.exit(1)
        if role not in _VALID_ROLES_:
            print(
                f"Invalid role {role!r} in spec: {spec_str!r}. "
                f"Valid roles: {_VALID_ROLES_}",
                file=sys.stderr)
            sys.exit(1)

        validated.append(
            (fields["name"], fields["email"], fields["password"], role))

    with db.connection(app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor:
        records = [
            __create_one_user__(cursor, name, email, password, role)
            for name, email, password, role in validated]

    __write_output__({"users": records}, output_path)


@app.cli.command()
@click.option("--user-id", "user_ids", multiple=True, type=click.UUID,
              help="UUID of a user to delete (repeatable)")
def delete_users(user_ids):
    """Delete one or more users by ID, bypassing policy checks.

    The users are removed unconditionally, whatever roles or group
    memberships they hold. Use with care: this is intended for test
    teardown and administration.
    """
    if not user_ids:
        print("No user IDs specified.", file=sys.stderr)
        sys.exit(1)

    auth_db = app.config["AUTH_DB"]
    ids_to_delete = tuple(user_ids)
    with db.connection(auth_db) as conn:
        removed_count = delete_users_by_id(conn, ids_to_delete)
        print(f"Deleted {removed_count} user(s).")


@app.cli.command()
@click.option("--session-timestamp", required=True,
              help="Compact ISO 8601 UTC timestamp (e.g. 20260602T122700Z)")
@click.option("--user", "user_specs", multiple=True,
              help='User spec: "name=...,role=..."')
@click.option("--output", "output_path", required=True, type=click.Path(),
              help="Write credentials as JSON to this file (0600 permissions)")
def create_test_users(session_timestamp, user_specs, output_path):
    """Create ephemeral test users with auto-generated email and password.

    Each --user option takes a comma-separated key=value string with the
    following keys: name, role.

    Email: <normalised-name><timestamp>@regression-tests.genenetwork.org
    Password: randomly generated.

    Output is written with 0600 permissions. Valid roles: system-admin, none.

    All specs are validated before any user is created. On the first
    invalid spec an error is printed to stderr and the command exits with
    status 1.
    """
    if not user_specs:
        print("No users specified.", file=sys.stderr)
        sys.exit(1)

    # Validate every spec before touching the database. The generated
    # passwords exist only in the output written at the very end, so exiting
    # part-way through the creation loop would lose them for any user
    # created so far.
    planned = []
    for spec_str in user_specs:
        spec = __parse_user_spec__(spec_str)
        name = spec.get("name", "").strip()
        role = spec.get("role", "none").strip()

        if not name:
            print(f"Missing 'name' in user spec: {spec_str!r}", file=sys.stderr)
            sys.exit(1)
        if role not in _VALID_ROLES_:
            print(
                f"Invalid role {role!r} in spec: {spec_str!r}. "
                f"Valid roles: {_VALID_ROLES_}",
                file=sys.stderr)
            sys.exit(1)

        # NOTE(review): two names that normalise to the same string produce
        # the same address; how the user store reacts is not visible here.
        email = (f"{__normalise_name_for_email__(name)}"
                 f"{session_timestamp}@{_TEST_EMAIL_DOMAIN_}")
        planned.append((name, email, role))

    with db.connection(app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor:
        records = [
            __create_one_user__(
                cursor, name, email, secrets.token_urlsafe(32), role)
            for name, email, role in planned]

    __write_output__(
        {"session_timestamp": session_timestamp, "users": records},
        output_path)


# Grant types given to a new OAuth2 client when the caller supplies none.
_DEFAULT_GRANT_TYPES_ = (
    "password",
    "authorization_code",
    "refresh_token",
    "urn:ietf:params:oauth:grant-type:jwt-bearer",
)

# Scopes given to a new OAuth2 client when the caller supplies none.
_DEFAULT_SCOPES_ = (
    "profile", "group", "role", "resource",
    "register-client", "user", "masquerade",
    "migrate-data", "introspect",
)


def __create_one_client__(# pylint: disable=[too-many-arguments, too-many-positional-arguments]
        conn,
        client_name: str,
        owner_user,
        redirect_uris: tuple,
        scopes: tuple = _DEFAULT_SCOPES_,
        grant_types: tuple = _DEFAULT_GRANT_TYPES_,
        jwks_uri: str = "",
) -> dict:
    """Save a new OAuth2 client owned by `owner_user`; return its credentials.

    A random secret is generated and only its hash is placed on the saved
    client. The returned dict holds the client ID, the plain-text secret
    and the client name.
    """
    plain_secret = secrets.token_urlsafe(32)

    # The first redirect URI doubles as the default; none at all means "".
    if redirect_uris:
        default_redirect = redirect_uris[0]
    else:
        default_redirect = ""

    metadata = {
        "client_name": client_name,
        "token_endpoint_auth_method": [
            "client_secret_post", "client_secret_basic"],
        "client_type": "confidential",
        "grant_types": list(grant_types),
        "default_redirect_uri": default_redirect,
        "redirect_uris": list(redirect_uris),
        "response_type": ["code", "token"],
        "scope": list(scopes),
        "public-jwks-uri": jwks_uri,
    }

    new_client = OAuth2Client(
        client_id=uuid.uuid4(),
        client_secret=hash_password(plain_secret),
        client_id_issued_at=datetime.now(tz=timezone.utc),
        # NOTE(review): this is a naive datetime for epoch 0, whereas the
        # issue time above is timezone-aware -- confirm the mix is intended.
        client_secret_expires_at=datetime.fromtimestamp(0),
        client_metadata=metadata,
        user=owner_user)
    save_client(conn, new_client)

    return {
        "client_id": str(new_client.client_id),
        "client_secret": plain_secret,
        "client_name": client_name,
    }


@app.cli.command()
@click.option("--name", "client_name", required=True,
              help="Human-readable name for the OAuth2 client")
@click.option("--owner-id", required=True, type=click.UUID,
              help="UUID of the user who owns this client")
@click.option("--redirect-uri", "redirect_uris", multiple=True,
              help="Allowed redirect URI (repeatable)")
@click.option("--scope", "scopes", multiple=True,
              default=_DEFAULT_SCOPES_, show_default=False,
              help="OAuth2 scope (repeatable; defaults to full scope set)")
@click.option("--grant-type", "grant_types", multiple=True,
              default=_DEFAULT_GRANT_TYPES_, show_default=False,
              help="Grant type (repeatable; defaults to all standard types)")
@click.option("--jwks-uri", default="",
              help="URI to the client's public JWKS (optional)")
@click.option("--output", "output_path", type=click.Path(), default=None,
              help="Write credentials as JSON to this file (default: stdout)")
def create_oauth2_client(# pylint: disable=[too-many-arguments, too-many-positional-arguments]
        client_name,
        owner_id,
        redirect_uris,
        scopes,
        grant_types,
        jwks_uri,
        output_path
):
    """Create an OAuth2 client with specified parameters.

    Scopes and grant types default to the full standard set if not provided.
    The command exits with status 1 when no user has the given owner ID.
    """
    auth_db = app.config["AUTH_DB"]
    with db.connection(auth_db) as conn:
        try:
            client_owner = user_by_id(conn, owner_id)
        except NotFoundError:
            print(f"No user found with ID {owner_id}", file=sys.stderr)
            sys.exit(1)
        credentials = __create_one_client__(
            conn,
            client_name,
            client_owner,
            redirect_uris,
            scopes=scopes,
            grant_types=grant_types,
            jwks_uri=jwks_uri)

    __write_output__({"client": credentials}, output_path)


@app.cli.command()
@click.option("--session-timestamp", required=True,
              help="Compact ISO 8601 UTC timestamp (e.g. 20260602T122700Z)")
@click.option("--users-file", required=True, type=click.Path(exists=True),
              help="Credentials file produced by create-test-users")
@click.option("--owner-role", default="system-admin", show_default=True,
              help="Role of the user in users-file to assign as client owner")
@click.option("--output", "output_path", required=True, type=click.Path(),
              help="Write credentials as JSON to this file (0600 permissions)")
def create_test_oauth2_client(session_timestamp, users_file, owner_role,
                              output_path):
    """Create an ephemeral OAuth2 client for a test session.

    The owner is the first user in the create-test-users credentials file
    whose role matches --owner-role. The client name is derived from the
    session timestamp and the secret is randomly generated. Output is
    written with 0600 permissions.
    """
    with open(users_file, encoding="utf8") as creds_file:
        users_data = json.load(creds_file)

    # Pick the first listed user that carries the requested role.
    owner_record = None
    for candidate in users_data.get("users", []):
        if candidate["role"] == owner_role:
            owner_record = candidate
            break

    if owner_record is None:
        print(
            f"No user with role {owner_role!r} found in {users_file}",
            file=sys.stderr)
        sys.exit(1)

    client_name = f"gn-test-client-{session_timestamp}"

    with db.connection(app.config["AUTH_DB"]) as conn:
        try:
            owner_id = uuid.UUID(owner_record["user_id"])
            owner = user_by_id(conn, owner_id)
        except NotFoundError:
            print(
                f"Owner user {owner_record['user_id']!r} not found in DB",
                file=sys.stderr)
            sys.exit(1)
        # Test clients are registered without any redirect URIs.
        record = __create_one_client__(conn, client_name, owner, tuple())

    payload = {"session_timestamp": session_timestamp, "client": record}
    __write_output__(payload, output_path)


@app.cli.command()
@click.option("--credentials", "credentials_path", required=True,
              type=click.Path(exists=True),
              help="Credentials file produced by create-oauth2-client or "
                   "create-test-oauth2-client")
def delete_oauth2_client(credentials_path):
    """Delete an OAuth2 client using a credentials file.

    The client_id is read from the "client" entry of the given credentials
    file and that client is deleted. The command exits with status 1 when
    the file holds no client_id or no such client exists.
    """
    with open(credentials_path, encoding="utf8") as creds_file:
        credentials = json.load(creds_file)

    client_section = credentials.get("client", {})
    raw_client_id = client_section.get("client_id")
    if not raw_client_id:
        print("No client_id found in credentials file.", file=sys.stderr)
        sys.exit(1)

    client_id = uuid.UUID(raw_client_id)
    with db.connection(app.config["AUTH_DB"]) as conn:
        # The lookup returns a wrapper that may be empty, not a bare client.
        maybe_client = oauth2_client_by_id(conn, client_id)
        if maybe_client.is_nothing():
            print(f"No client found with ID {client_id}", file=sys.stderr)
            sys.exit(1)
        delete_client(conn, maybe_client.value)
        print(f"Deleted OAuth2 client {client_id}.")


@app.cli.command()
@click.option("--credentials", "credentials_path", required=True,
              type=click.Path(exists=True),
              help="Credentials file produced by create-test-users")
def delete_test_users(credentials_path):
    """Delete ephemeral test users using a credentials file.

    Every user listed in the create-test-users credentials file is deleted
    unconditionally, bypassing policy checks. Intended for CI test
    teardown.
    """
    with open(credentials_path, encoding="utf8") as creds_file:
        credentials = json.load(creds_file)

    listed_users = credentials.get("users", [])
    user_ids = tuple([uuid.UUID(entry["user_id"]) for entry in listed_users])
    if not user_ids:
        print("No users found in credentials file.", file=sys.stderr)
        sys.exit(1)

    auth_db = app.config["AUTH_DB"]
    with db.connection(auth_db) as conn:
        removed_count = delete_users_by_id(conn, user_ids)
        print(f"Deleted {removed_count} user(s).")

##### END: CLI Commands #####

# Only runs when this module is executed directly as a script, not when it
# is imported.
if __name__ == '__main__':
    print("Starting app...")
    app.run()