authorFrederick Muriuki Muriithi2024-07-19 10:23:28 -0500
committerFrederick Muriuki Muriithi2024-07-31 09:30:24 -0500
commit6b18e1f0b05222d84fd0b06a8e5c2780df6958d5 (patch)
tree4806d860eaf68e2624b70b1f08dbba3eaf91c0f4 /gn_auth/auth/authorisation
parentc56dc39193909062d25e3270d9ab0ec3467a47ee (diff)
Enable registration of a public-jwks-uri for every client
Diffstat (limited to 'gn_auth/auth/authorisation')
-rw-r--r--gn_auth/auth/authorisation/users/admin/views.py113
1 files changed, 7 insertions, 106 deletions
diff --git a/gn_auth/auth/authorisation/users/admin/views.py b/gn_auth/auth/authorisation/users/admin/views.py
index 0ab69e9..85aeb50 100644
--- a/gn_auth/auth/authorisation/users/admin/views.py
+++ b/gn_auth/auth/authorisation/users/admin/views.py
@@ -3,14 +3,12 @@ import uuid
import json
import random
import string
-from pathlib import Path
from typing import Optional
from functools import partial
from dataclasses import asdict
from urllib.parse import urlparse
from datetime import datetime, timezone, timedelta
-from authlib.jose import KeySet, JsonWebKey
from email_validator import validate_email, EmailNotValidError
from flask import (
flash,
@@ -178,6 +176,9 @@ def check_register_client_form(form):
"scope[]",
"You need to select at least one scope option."),)
+ if not uri_valid(form.get("client_jwk_uri", "")):
+ errors = errors + ("The provided client's public JWKs URI is invalid.",)
+
errors = tuple(item for item in errors if item is not None)
if bool(errors):
raise RegisterClientError(errors)
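Review bot: The new `uri_valid` check on the JWKs URI constrains shape only, as far as this hunk shows, not scheme, and the keys fetched from this URI are what the server will trust to authenticate the client, so a `http://` URI lets an on-path attacker substitute keys. `uri_valid` is defined outside this hunk; if it does not already require TLS, add after the check `if urlparse(form.get("client_jwk_uri", "")).scheme != "https": errors = errors + ("The client's public JWKs URI must use https.",)` — `urlparse` is already imported at the top of this file. It is also worth deciding whether an empty URI is acceptable: `form.get("client_jwk_uri", "")` passes `""` into `uri_valid`, and if that passes, a client is registered with no key source at all, which the commit title ("for every client") suggests is not intended. check_register_client_form starts before this hunk.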
@@ -225,7 +226,8 @@ def register_client():
"default_redirect_uri": default_redirect_uri,
"redirect_uris": [default_redirect_uri] + form.get("other_redirect_uri", "").split(),
"response_type": __response_types__(tuple(grant_types)),
- "scope": form.getlist("scope[]")
+ "scope": form.getlist("scope[]"),
+ "public-jwks-uri": form.get("client_jwk_uri", "")
},
user = with_db_connection(partial(
user_by_id, user_id=uuid.UUID(form["user"])))
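Review bot: The new metadata key `public-jwks-uri` breaks with the snake_case used by every other key in this dict (`default_redirect_uri`, `redirect_uris`, `response_type`, `scope`), and it is not the name RFC 7591 gives this client-metadata field, `jwks_uri`, which is also the spelling authlib's client-metadata helpers expect as far as I recall. Unless the consumer of this key (not visible here) already depends on the hyphenated spelling, I would store it as `"jwks_uri": form.get("client_jwk_uri", "").strip()` and use the same key in edit_client so both write paths agree. The `.strip()` also keeps a trailing newline or space pasted into the form field out of the stored URI, which would otherwise break the fetch later. register_client starts before this hunk.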
@@ -262,108 +264,6 @@ def view_client(client_id: uuid.UUID):
scope=app.config["OAUTH2_SCOPE"],
granttypes=_FORM_GRANT_TYPES_)
-@admin.route("/register-client-public-key", methods=["POST"])
-@is_admin
-def register_client_public_key():
- """Register a client's SSL key"""
- form = request.form
- admin_dashboard_uri = redirect(url_for("oauth2.admin.dashboard"))
- view_client_uri = redirect(url_for("oauth2.admin.view_client",
- client_id=form["client_id"]))
- if not bool(form.get("client_id")):
- flash("No client selected.", "alert-danger")
- return admin_dashboard_uri
-
- try:
- _client = with_db_connection(partial(
- oauth2_client, client_id=uuid.UUID(form["client_id"])))
- if _client.is_nothing():
- raise ValueError("No such client.")
- _client = _client.value
- except ValueError:
- flash("Invalid client ID provided.", "alert-danger")
- return admin_dashboard_uri
- try:
- _key = JsonWebKey.import_key(form["client_ssl_key"].strip())
- except ValueError:
- flash("Invalid key provided!", "alert-danger")
- return view_client_uri
-
- keypath = Path(app.config["CLIENTS_SSL_PUBLIC_KEYS_DIR"]).joinpath(
- f"{_key.thumbprint()}.pem")
- if not keypath.exists():
- with open(keypath, mode="w", encoding="utf8") as _kpth:
- _kpth.write(form["client_ssl_key"])
-
- with_db_connection(partial(save_client, the_client=OAuth2Client(
- client_id=_client.client_id,
- client_secret=_client.client_secret,
- client_id_issued_at=_client.client_id_issued_at,
- client_secret_expires_at=_client.client_secret_expires_at,
- client_metadata={
- **_client.client_metadata,
- "public_keys": list(set(
- _client.client_metadata.get("public_keys", []) +
- [str(keypath)]))},
- user=_client.user)))
- flash("Client key successfully registered.", "alert-success")
- return view_client_uri
-
-
-@admin.route("/delete-client-public-key", methods=["POST"])
-@is_admin
-def delete_client_public_key():
- """Delete a client's SSL key"""
- form = request.form
- admin_dashboard_uri = redirect(url_for("oauth2.admin.dashboard"))
- view_client_uri = redirect(url_for("oauth2.admin.view_client",
- client_id=form["client_id"]))
- if not bool(form.get("client_id")):
- flash("No client selected.", "alert-danger")
- return admin_dashboard_uri
-
- try:
- _client = with_db_connection(partial(
- oauth2_client, client_id=uuid.UUID(form["client_id"])))
- if _client.is_nothing():
- raise ValueError("No such client.")
- _client = _client.value
- except ValueError:
- flash("Invalid client ID provided.", "alert-danger")
- return admin_dashboard_uri
-
- if form.get("ssl_key", None) is None:
- flash("The key must be provided.", "alert-danger")
- return view_client_uri
-
- try:
- def find_by_kid(keyset: KeySet, kid: str) -> JsonWebKey:
- for key in keyset.keys:
- if key.thumbprint() == kid:
- return key
- raise ValueError('Invalid JSON Web Key Set')
- _key = find_by_kid(_client.jwks, form.get("ssl_key"))
- except ValueError:
- flash("Could not delete: No such public key.", "alert-danger")
- return view_client_uri
-
- _keys = (_key for _key in _client.jwks.keys
- if _key.thumbprint() != form["ssl_key"])
- _keysdir = Path(app.config["CLIENTS_SSL_PUBLIC_KEYS_DIR"])
- with_db_connection(partial(save_client, the_client=OAuth2Client(
- client_id=_client.client_id,
- client_secret=_client.client_secret,
- client_id_issued_at=_client.client_id_issued_at,
- client_secret_expires_at=_client.client_secret_expires_at,
- client_metadata={
- **_client.client_metadata,
- "public_keys": list(set(
- _keysdir.joinpath(f"{_key.thumbprint()}.pem")
- for _key in _keys))},
- user=_client.user)))
- flash("Key deleted.", "alert-success")
- return view_client_uri
-
@admin.route("/edit-client", methods=["POST"])
@is_admin
@@ -391,7 +291,8 @@ def edit_client():
[form["redirect_uri"]] +
form["other_redirect_uris"].split("\r\n"))),
"grant_types": form.getlist("grants[]"),
- "scope": form.getlist("scope[]")
+ "scope": form.getlist("scope[]"),
+ "public-jwks-uri": form.get("client_jwk_uri", "")
}
with_db_connection(partial(save_client, the_client=OAuth2Client(
the_client.client_id,
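Review bot: The edit path stores `form.get("client_jwk_uri", "")` straight into the client metadata without the `uri_valid` check that registration gains in check_register_client_form; edit_client starts before this hunk, so if no equivalent check sits there, an admin edit can replace a client's key source with an arbitrary string, and an edit form submitted without the field silently clears it to `""`. I would validate before the dict is built: `jwks_uri = form.get("client_jwk_uri", "").strip()`, then `if not uri_valid(jwks_uri): flash("The provided client's public JWKs URI is invalid.", "alert-danger"); return redirect(url_for("oauth2.admin.view_client", client_id=the_client.client_id))`, and store `"public-jwks-uri": jwks_uri`. Whatever scheme requirement registration enforces should be enforced here as well, since this is the same trust anchor being rewritten.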