diff options
Diffstat (limited to 'gn_auth/auth/authorisation/users/admin/views.py')
-rw-r--r-- | gn_auth/auth/authorisation/users/admin/views.py | 160 |
1 files changed, 49 insertions, 111 deletions
diff --git a/gn_auth/auth/authorisation/users/admin/views.py b/gn_auth/auth/authorisation/users/admin/views.py index 8ca1e51..9bc1c36 100644 --- a/gn_auth/auth/authorisation/users/admin/views.py +++ b/gn_auth/auth/authorisation/users/admin/views.py @@ -3,14 +3,12 @@ import uuid import json import random import string -from pathlib import Path from typing import Optional from functools import partial from dataclasses import asdict from urllib.parse import urlparse from datetime import datetime, timezone, timedelta -from authlib.jose import KeySet, JsonWebKey from email_validator import validate_email, EmailNotValidError from flask import ( flash, @@ -32,6 +30,7 @@ from ....authentication.oauth2.models.oauth2client import ( save_client, OAuth2Client, oauth2_clients, + update_client_attribute, client as oauth2_client, delete_client as _delete_client) from ....authentication.users import ( @@ -62,7 +61,8 @@ _FORM_GRANT_TYPES_ = ({ @admin.before_request def update_expires(): """Update session expiration.""" - if session.session_info() and not session.update_expiry(): + if (session.session_info() and not session.update_expiry( + int(app.config.get("SESSION_EXPIRY_MINUTES", 10)))): flash("Session has expired. Logging out...", "alert-warning") session.clear_session_info() return redirect(url_for("oauth2.admin.login")) @@ -96,8 +96,9 @@ def login(): session.update_session_info( user=asdict(user), expires=( - datetime.now(tz=timezone.utc) + timedelta(minutes=10))) - return redirect(url_for(next_uri)) + datetime.now(tz=timezone.utc) + timedelta(minutes=int( + app.config.get("SESSION_EXPIRY_MINUTES", 10))))) + return redirect(url_for(next_uri, **dict(request.args))) raise NotFoundError(error_message) except NotFoundError as _nfe: flash(error_message, "alert-danger") @@ -176,6 +177,9 @@ def check_register_client_form(form): "scope[]", "You need to select at least one scope option."),) + if not uri_valid(form.get("client_jwk_uri", "")): + errors = errors + ("The provided client's public JWKs URI is invalid.",) + errors = tuple(item for item in errors if item is not None) if bool(errors): raise RegisterClientError(errors) @@ -193,7 +197,7 @@ def register_client(): if request.method == "GET": return render_template( "admin/register-client.html", - scope=app.config["OAUTH2_SCOPE"], + scope=app.config["OAUTH2_SCOPES_SUPPORTED"], users=with_db_connection(__list_users__), granttypes=_FORM_GRANT_TYPES_, current_user=session.session_user()) @@ -223,7 +227,8 @@ def register_client(): "default_redirect_uri": default_redirect_uri, "redirect_uris": [default_redirect_uri] + form.get("other_redirect_uri", "").split(), "response_type": __response_types__(tuple(grant_types)), - "scope": form.getlist("scope[]") + "scope": form.getlist("scope[]"), + "public-jwks-uri": form.get("client_jwk_uri", "") }, user = with_db_connection(partial( user_by_id, user_id=uuid.UUID(form["user"]))) @@ -257,111 +262,9 @@ def view_client(client_id: uuid.UUID): return render_template( "admin/view-oauth2-client.html", client=with_db_connection(partial(oauth2_client, client_id=client_id)), - scope=app.config["OAUTH2_SCOPE"], + scope=app.config["OAUTH2_SCOPES_SUPPORTED"], granttypes=_FORM_GRANT_TYPES_) -@admin.route("/register-client-public-key", methods=["POST"]) -@is_admin -def register_client_public_key(): - """Register a client's SSL key""" - form = request.form - admin_dashboard_uri = redirect(url_for("oauth2.admin.dashboard")) - view_client_uri = redirect(url_for("oauth2.admin.view_client", - client_id=form["client_id"])) - if not bool(form.get("client_id")): - flash("No client selected.", "alert-danger") - return admin_dashboard_uri - - try: - _client = with_db_connection(partial( - oauth2_client, client_id=uuid.UUID(form["client_id"]))) - if _client.is_nothing(): - raise ValueError("No such client.") - _client = _client.value - except ValueError: - flash("Invalid client ID provided.", "alert-danger") - return admin_dashboard_uri - try: - _key = JsonWebKey.import_key(form["client_ssl_key"].strip()) - except ValueError: - flash("Invalid key provided!", "alert-danger") - return view_client_uri - - keypath = Path(app.config["CLIENTS_SSL_PUBLIC_KEYS_DIR"]).joinpath( - f"{_key.thumbprint()}.pem") - if not keypath.exists(): - with open(keypath, mode="w", encoding="utf8") as _kpth: - _kpth.write(form["client_ssl_key"]) - - with_db_connection(partial(save_client, the_client=OAuth2Client( - client_id=_client.client_id, - client_secret=_client.client_secret, - client_id_issued_at=_client.client_id_issued_at, - client_secret_expires_at=_client.client_secret_expires_at, - client_metadata={ - **_client.client_metadata, - "public_keys": list(set( - _client.client_metadata.get("public_keys", []) + - [str(keypath)]))}, - user=_client.user))) - flash("Client key successfully registered.", "alert-success") - return view_client_uri - - -@admin.route("/delete-client-public-key", methods=["POST"]) -@is_admin -def delete_client_public_key(): - """Delete a client's SSL key""" - form = request.form - admin_dashboard_uri = redirect(url_for("oauth2.admin.dashboard")) - view_client_uri = redirect(url_for("oauth2.admin.view_client", - client_id=form["client_id"])) - if not bool(form.get("client_id")): - flash("No client selected.", "alert-danger") - return admin_dashboard_uri - - try: - _client = with_db_connection(partial( - oauth2_client, client_id=uuid.UUID(form["client_id"]))) - if _client.is_nothing(): - raise ValueError("No such client.") - _client = _client.value - except ValueError: - flash("Invalid client ID provided.", "alert-danger") - return admin_dashboard_uri - - if form.get("ssl_key", None) is None: - flash("The key must be provided.", "alert-danger") - return view_client_uri - - try: - def find_by_kid(keyset: KeySet, kid: str) -> JsonWebKey: - for key in keyset.keys: - if key.thumbprint() == kid: - return key - raise ValueError('Invalid JSON Web Key Set') - _key = find_by_kid(_client.jwks, form.get("ssl_key")) - except ValueError: - flash("Could not delete: No such public key.", "alert-danger") - return view_client_uri - - _keys = (_key for _key in _client.jwks.keys - if _key.thumbprint() != form["ssl_key"]) - _keysdir = Path(app.config["CLIENTS_SSL_PUBLIC_KEYS_DIR"]) - with_db_connection(partial(save_client, the_client=OAuth2Client( - client_id=_client.client_id, - client_secret=_client.client_secret, - client_id_issued_at=_client.client_id_issued_at, - client_secret_expires_at=_client.client_secret_expires_at, - client_metadata={ - **_client.client_metadata, - "public_keys": list(set( - _keysdir.joinpath(f"{_key.thumbprint()}.pem") - for _key in _keys))}, - user=_client.user))) - flash("Key deleted.", "alert-success") - return view_client_uri - @admin.route("/edit-client", methods=["POST"]) @is_admin @@ -389,7 +292,8 @@ def edit_client(): [form["redirect_uri"]] + form["other_redirect_uris"].split("\r\n"))), "grant_types": form.getlist("grants[]"), - "scope": form.getlist("scope[]") + "scope": form.getlist("scope[]"), + "public-jwks-uri": form.get("client_jwk_uri", "") } with_db_connection(partial(save_client, the_client=OAuth2Client( the_client.client_id, @@ -418,3 +322,37 @@ def delete_client(): "successfully."), "alert-success") return redirect(url_for("oauth2.admin.list_clients")) + + +@admin.route("/clients/<uuid:client_id>/change-secret", methods=["GET", "POST"]) +@is_admin +def change_client_secret(client_id: uuid.UUID): + """Enable changing of a client's secret.""" + def __no_client__(): + # Calling the function causes the flash to be evaluated + # flash("No such client was found!", "alert-danger") + return redirect(url_for("oauth2.admin.list_clients")) + + with db.connection(app.config["AUTH_DB"]) as conn: + if request.method == "GET": + return oauth2_client( + conn, client_id=client_id + ).maybe(__no_client__(), lambda _client: render_template( + "admin/confirm-change-client-secret.html", + client=_client + )) + + _raw = random_string() + return oauth2_client( + conn, client_id=client_id + ).then( + lambda _client: save_client( + conn, + update_client_attribute( + _client, "client_secret", hash_password(_raw))) + ).then( + lambda _client: render_template( + "admin/registered-client.html", + client=_client, + client_secret=_raw) + ).maybe(__no_client__(), lambda resp: resp) |