From f50c9220b4baf2f02b078eb8be0de427243377d3 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Fri, 26 Jul 2024 16:39:39 -0500 Subject: Add module to handle the JSON Web Keys. --- uploader/default_settings.py | 3 ++ uploader/oauth2/jwks.py | 86 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+) create mode 100644 uploader/oauth2/jwks.py diff --git a/uploader/default_settings.py b/uploader/default_settings.py index f7ca48c..26fe665 100644 --- a/uploader/default_settings.py +++ b/uploader/default_settings.py @@ -18,3 +18,6 @@ GN2_SERVER_URL = "https://genenetwork.org/" SESSION_TYPE = "redis" SESSION_PERMANENT = True SESSION_USE_SIGNER = True + +JWKS_ROTATION_AGE_DAYS = 7 # Days (from creation) to keep a JWK in use. +JWKS_DELETION_AGE_DAYS = 14 # Days (from creation) to keep a JWK around before deleting it. diff --git a/uploader/oauth2/jwks.py b/uploader/oauth2/jwks.py new file mode 100644 index 0000000..efd0499 --- /dev/null +++ b/uploader/oauth2/jwks.py @@ -0,0 +1,86 @@ +"""Utilities dealing with JSON Web Keys (JWK)""" +import os +from pathlib import Path +from typing import Any, Union +from datetime import datetime, timedelta + +from flask import Flask +from authlib.jose import JsonWebKey +from pymonad.either import Left, Right, Either + +def jwks_directory(app: Flask, configname: str) -> Path: + """Compute the directory where the JWKs are stored.""" + appsecretsdir = Path(app.config[configname]).parent + if appsecretsdir.exists() and appsecretsdir.is_dir(): + jwksdir = Path(appsecretsdir, "jwks/") + if not jwksdir.exists(): + jwksdir.mkdir() + return jwksdir + raise ValueError( + "The `appsecretsdir` value should be a directory that actually exists.") + + +def generate_and_save_private_key( + storagedir: Path, + kty: str = "RSA", + crv_or_size: Union[str, int] = 2048, + options: tuple[tuple[str, Any]] = (("iat", datetime.now().timestamp()),) +) -> JsonWebKey: + """Generate a private key and save to `storagedir`.""" + privatejwk = JsonWebKey.generate_key( + kty, crv_or_size, dict(options), is_private=True) + keyname = f"{privatejwk.thumbprint()}.private.pem" + with open(Path(storagedir, keyname), "wb") as pemfile: + pemfile.write(privatejwk.as_pem(is_private=True)) + + return privatejwk + + +def pem_to_jwk(filepath: Path) -> JsonWebKey: + """Parse a PEM file into a JWK object.""" + with open(filepath, "rb") as pemfile: + return JsonWebKey.import_key(pemfile.read()) + + +def __sorted_jwks_paths__(storagedir: Path) -> tuple[tuple[float, Path], ...]: + """A sorted list of the JWK file paths with their creation timestamps.""" + return tuple(sorted(((os.stat(keypath).st_ctime, keypath) + for keypath in (Path(storagedir, keyfile) + for keyfile in os.listdir(storagedir) + if keyfile.endswith(".pem"))), + key=lambda tpl: tpl[0])) + + +def list_jwks(storagedir: Path) -> tuple[JsonWebKey, ...]: + """ + List all the JWKs in a particular directory in the order they were created. + """ + return tuple(pem_to_jwk(keypath) for ctime,keypath in + __sorted_jwks_paths__(storagedir)) + + +def newest_jwk(storagedir: Path) -> Either: + """ + Return an Either monad with the newest JWK or a message if none exists. + """ + existingkeys = __sorted_jwks_paths__(storagedir) + if len(existingkeys) > 0: + return Right(pem_to_jwk(existingkeys[-1][1])) + return Left("No JWKs exist") + + +def newest_jwk_with_rotation(jwksdir: Path, keyage: int) -> JsonWebKey: + """ + Retrieve the latests JWK, creating a new one if older than `keyage` days. + """ + def newer_than_days(jwkey): + filestat = os.stat(Path( + jwksdir, f"{jwkey.as_dict()['kid']}.private.pem")) + oldesttimeallowed = (datetime.now() - timedelta(days=keyage)) + if filestat.st_ctime < (oldesttimeallowed.timestamp()): + return Left("JWK is too old!") + return jwkey + + return newest_jwk(jwksdir).then(newer_than_days).either( + lambda _errmsg: generate_and_save_private_key(jwksdir), + lambda key: key) -- cgit v1.2.3