aboutsummaryrefslogtreecommitdiff
path: root/gn_auth/auth/authentication/oauth2/server.py
diff options
context:
space:
mode:
Diffstat (limited to 'gn_auth/auth/authentication/oauth2/server.py')
-rw-r--r--gn_auth/auth/authentication/oauth2/server.py43
1 files changed, 33 insertions, 10 deletions
diff --git a/gn_auth/auth/authentication/oauth2/server.py b/gn_auth/auth/authentication/oauth2/server.py
index d845c60..d1aa69e 100644
--- a/gn_auth/auth/authentication/oauth2/server.py
+++ b/gn_auth/auth/authentication/oauth2/server.py
@@ -1,15 +1,20 @@
"""Initialise the OAuth2 Server"""
+import os
import uuid
-import datetime
+from pathlib import Path
from typing import Callable
+from datetime import datetime, timedelta
+from pymonad.either import Left
from flask import Flask, current_app
-from authlib.jose import jwk, jwt
from authlib.oauth2.rfc7523 import JWTBearerTokenValidator
+from authlib.jose import jwk, jwt, JsonWebKey
from authlib.oauth2.rfc6749.errors import InvalidClientError
from authlib.integrations.flask_oauth2 import AuthorizationServer
from gn_auth.auth.db import sqlite3 as db
+from gn_auth.auth.jwks import (
+ newest_jwk, jwks_directory, generate_and_save_private_key)
from .models.oauth2client import client as fetch_client
from .models.oauth2token import OAuth2Token, save_token
@@ -86,10 +91,25 @@ def create_save_token_func(token_model: type, jwtkey: jwk) -> Callable:
return __save_token__
+def newest_jwk_with_rotation(jwksdir: Path, keyage: int) -> JsonWebKey:
+ """
+ Retrieve the latests JWK, creating a new one if older than `keyage` days.
+ """
+ def newer_than_days(jwkey):
+ filestat = os.stat(Path(
+ jwksdir, f"{jwkey.as_dict()['kid']}.private.pem"))
+ oldesttimeallowed = (datetime.now() - timedelta(days=keyage))
+ if filestat.st_ctime < (oldesttimeallowed.timestamp()):
+ return Left("JWK is too old!")
+ return jwkey
+
+ return newest_jwk(jwksdir).then(newer_than_days).either(
+ lambda _errmsg: generate_and_save_private_key(jwksdir),
+ lambda key: key)
+
def make_jwt_token_generator(app):
"""Make token generator function."""
- _gen = JWTBearerTokenGenerator(app.config["SSL_PRIVATE_KEY"])
def __generator__(# pylint: disable=[too-many-arguments]
grant_type,
client,
@@ -98,13 +118,16 @@ def make_jwt_token_generator(app):
expires_in=None,# pylint: disable=[unused-argument]
include_refresh_token=True
):
- return _gen.__call__(
- grant_type,
- client,
- user,
- scope,
- JWTBearerTokenGenerator.DEFAULT_EXPIRES_IN,
- include_refresh_token)
+ return JWTBearerTokenGenerator(
+ newest_jwk_with_rotation(
+ jwks_directory(app),
+ int(app.config["JWKS_ROTATION_AGE_DAYS"]))).__call__(
+ grant_type,
+ client,
+ user,
+ scope,
+ JWTBearerTokenGenerator.DEFAULT_EXPIRES_IN,
+ include_refresh_token)
return __generator__