1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
|
"""Main entry point for project"""
import os
import sys
import uuid
import json
from math import ceil
from pathlib import Path
from datetime import datetime
import click
from yoyo import get_backend, read_migrations
from gn_auth import migrations
from gn_auth import create_app
from gn_auth.auth.db import sqlite3 as db
from gn_auth.auth.errors import NotFoundError
from gn_auth.auth.authentication.users import (
user_by_id, hash_password, save_user, set_user_password)
from gn_auth.auth.authorisation.roles.models import assign_default_roles
from gn_auth.auth.authorisation.users.admin.models import (
make_sys_admin, grant_sysadmin_role)
from gn_auth.scripts import register_sys_admin as rsysadm# type: ignore[import]
# Application object built by gn_auth's factory. The CLI commands below are
# registered on `app.cli` and read their settings from `app.config`.
app = create_app()
##### BEGIN: CLI Commands #####
@app.cli.command()
def apply_migrations():
    """Apply the database migrations."""
    # The yoyo backend points at the SQLite file configured as AUTH_DB.
    backend = get_backend(f'sqlite:///{app.config["AUTH_DB"]}')
    # Migration scripts are read from the location configured as
    # AUTH_MIGRATIONS.
    migration_set = read_migrations(app.config["AUTH_MIGRATIONS"])
    migrations.apply_migrations(backend, migration_set)
def __init_dev_users__():
    """Initialise dev users. Gets used in more than one place.

    Inserts the hard-coded development user (updating name/verified when the
    email already exists) and stores the hash of its password.
    """
    upsert_user_sql = """
INSERT INTO users (user_id, email, name, verified)
VALUES (:user_id, :email, :name, 1)
ON CONFLICT(email) DO UPDATE SET
name=excluded.name,
verified=excluded.verified
"""
    upsert_credentials_sql = "INSERT OR REPLACE INTO user_credentials VALUES (:user_id, :hash)"
    accounts = (
        {
            "user_id": "0ad1917c-57da-46dc-b79e-c81c91e5b928",
            "email": "test@development.user",
            "name": "Test Development User",
            "password": "testpasswd",
        },
    )
    with db.connection(app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor:
        cursor.executemany(upsert_user_sql, accounts)
        # Each credentials row is the account dict plus the hash of its
        # plain-text password; the query only binds :user_id and :hash.
        credential_rows = (
            {**account, "hash": hash_password(account["password"])}
            for account in accounts)
        cursor.executemany(upsert_credentials_sql, credential_rows)
@app.cli.command()
def init_dev_users():
    """
    Initialise development users for OAuth2 sessions.

    **NOTE**: You really should not run this in production/staging
    """
    # Thin CLI wrapper: the inserts live in `__init_dev_users__` so that
    # `init_dev_clients` can run them as well.
    __init_dev_users__()
@app.cli.command()
@click.option('--client-uri', default="http://localhost:5033", type=str)
def init_dev_clients(client_uri):
    """
    Initialise a development client for OAuth2 sessions.

    **NOTE**: You really should not run this in production/staging
    """
    # The URI is used as a prefix for "/oauth2/..." paths below, so a trailing
    # slash would produce "//oauth2/..." in the registered redirect URIs.
    # `strip` removes slashes at both ends: leading ones were already being
    # removed, trailing ones are the ones that actually matter here.
    client_uri = client_uri.strip("/")
    # The client row references the development user, so make sure it exists.
    __init_dev_users__()
    dev_clients_query = """
INSERT INTO oauth2_clients VALUES (
:client_id, :client_secret, :client_id_issued_at,
:client_secret_expires_at, :client_metadata, :user_id
)
ON CONFLICT(client_id) DO UPDATE SET
client_secret=excluded.client_secret,
client_secret_expires_at=excluded.client_secret_expires_at,
client_metadata=excluded.client_metadata,
user_id=excluded.user_id
"""
    # The metadata is stored as a single JSON document.
    client_metadata = json.dumps({
        "client_name": "GN2 Dev Server",
        "token_endpoint_auth_method": [
            "client_secret_post", "client_secret_basic"],
        "client_type": "confidential",
        "grant_types": ["password", "authorization_code", "refresh_token",
                        "urn:ietf:params:oauth:grant-type:jwt-bearer"],
        "default_redirect_uri": f"{client_uri}/oauth2/code",
        "redirect_uris": [f"{client_uri}/oauth2/code",
                          f"{client_uri}/oauth2/token"],
        "public-jwks-uri": f"{client_uri}/oauth2/public-jwks",
        "response_type": ["code", "token"],
        "scope": ["profile", "group", "role", "resource", "register-client",
                  "user", "masquerade", "migrate-data", "introspect"]
    })
    dev_clients = ({
        "client_id": "0bbfca82-d73f-4bd4-a140-5ae7abb4a64d",
        "client_secret": "yadabadaboo",
        # Issue time is recorded as whole seconds since the epoch, rounded up.
        "client_id_issued_at": ceil(datetime.now().timestamp()),
        "client_secret_expires_at": 0,
        "client_metadata": client_metadata,
        # Same ID as the development user created by `__init_dev_users__`.
        "user_id": "0ad1917c-57da-46dc-b79e-c81c91e5b928"},)
    with db.connection(app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor:
        cursor.executemany(dev_clients_query, dev_clients)
@app.cli.command()
@click.argument("user_id", type=click.UUID)
def assign_system_admin(user_id: uuid.UUID):
    """Assign user with ID `user_id` administrator role."""
    try:
        with db.connection(app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor:
            # Look the user up first, then promote them.
            user = user_by_id(conn, user_id)
            make_sys_admin(cursor, user)
            return 0
    except NotFoundError as not_found:
        # Presumably raised by the lookup when no such user exists: report the
        # error on stderr and exit with a non-zero status.
        print(not_found, file=sys.stderr)
        sys.exit(1)
@app.cli.command()
def register_admin():
    """Register the administrator."""
    # The registration script takes the auth database location as a `Path`.
    auth_db_path = Path(app.config["AUTH_DB"])
    rsysadm.register_admin(auth_db_path)
# Values accepted for the `role` key of a `--user` spec in `create_users`;
# "system-admin" additionally grants the sysadmin role, "none" grants nothing
# beyond the default roles.
_VALID_ROLES_ = ("system-admin", "none")
def __parse_user_spec__(spec: str) -> dict:
    """Parse 'key=value,key=value,...' into a dict.

    The string is split on commas, and each piece is split on its first "="
    only, so values may themselves contain "=". Keys and values are stripped
    of surrounding whitespace; pieces whose key is empty are dropped, and a
    piece without "=" maps its key to the empty string. When a key is
    repeated, the last occurrence wins.
    """
    pieces = (chunk.partition("=") for chunk in spec.split(","))
    return {
        key.strip(): value.strip()
        for key, _, value in pieces
        if key.strip()
    }
def __write_output__(data: dict, output_path) -> None:
    """Write JSON data to a file with 0600 permissions, or stdout.

    `data` is serialised with a two-space indent. When `output_path` is
    `None` the JSON is printed to stdout; otherwise the file at `output_path`
    is created or truncated and left readable and writable by its owner only.
    """
    text = json.dumps(data, indent=2)
    if output_path is None:
        print(text)
        return
    fd = os.open(output_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as outfile:
        # The mode given to os.open() only takes effect when the file is
        # created. Force it as well, so that an already-existing file with
        # looser permissions is tightened *before* credentials are written.
        if hasattr(os, "fchmod"):
            os.fchmod(fd, 0o600)
        else:
            # Platforms without fchmod: fall back to a path-based chmod.
            os.chmod(output_path, 0o600)
        outfile.write(text)
def __validated_user_spec__(spec_str: str) -> dict:
    """Parse one `--user` spec and validate it.

    Returns a dict with the keys "name", "email", "password" and "role"
    ("role" defaults to "none"). Prints a message to stderr and exits with
    status 1 when a required key is missing/empty or the role is not one of
    `_VALID_ROLES_`.
    """
    spec = __parse_user_spec__(spec_str)
    required = ("name", "email", "password")
    fields = {key: spec.get(key, "").strip() for key in required}
    fields["role"] = spec.get("role", "none").strip()
    for key in required:
        if not fields[key]:
            print(f"Missing '{key}' in user spec: {spec_str!r}", file=sys.stderr)
            sys.exit(1)
    if fields["role"] not in _VALID_ROLES_:
        print(
            f"Invalid role {fields['role']!r} in spec: {spec_str!r}. "
            f"Valid roles: {_VALID_ROLES_}",
            file=sys.stderr)
        sys.exit(1)
    return fields


@app.cli.command()
@click.option("--user", "user_specs", multiple=True,
              help='User spec: "name=...,email=...,password=...,role=..."')
@click.option("--output", "output_path", type=click.Path(), default=None,
              help="Write credentials as JSON to this file (default: stdout)")
def create_users(user_specs, output_path):
    """Create one or more users with specified credentials and roles.

    Each --user option takes a comma-separated key=value string with the
    following keys: name, email, password, role.

    Valid roles: system-admin, none.
    """
    if not user_specs:
        print("No users specified.", file=sys.stderr)
        sys.exit(1)
    # Validate every spec before touching the database, so that a malformed
    # spec later in the list cannot abort the command after earlier users
    # have already been written.
    validated = [__validated_user_spec__(spec_str) for spec_str in user_specs]
    records = []
    with db.connection(app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor:
        for fields in validated:
            user = save_user(cursor, fields["email"], fields["name"], verified=True)
            set_user_password(cursor, user, fields["password"])
            assign_default_roles(cursor, user)
            if fields["role"] == "system-admin":
                grant_sysadmin_role(cursor, user)
            # The plain-text password is echoed back on purpose: the output
            # is the credentials listing for the users just created.
            records.append({
                "user_id": str(user.user_id),
                "name": user.name,
                "email": user.email,
                "password": fields["password"],
                "role": fields["role"],
            })
    __write_output__({"users": records}, output_path)
##### END: CLI Commands #####
if __name__ == "__main__":
    # Executed directly rather than imported: announce start-up, then run
    # the application.
    print("Starting app...")
    app.run()
|