aboutsummaryrefslogtreecommitdiff
path: root/gn_auth/auth/authentication/oauth2/models
diff options
context:
space:
mode:
Diffstat (limited to 'gn_auth/auth/authentication/oauth2/models')
-rw-r--r--gn_auth/auth/authentication/oauth2/models/jwt_bearer_token.py20
-rw-r--r--gn_auth/auth/authentication/oauth2/models/oauth2client.py16
2 files changed, 30 insertions, 6 deletions
diff --git a/gn_auth/auth/authentication/oauth2/models/jwt_bearer_token.py b/gn_auth/auth/authentication/oauth2/models/jwt_bearer_token.py
index cca75f4..71769e1 100644
--- a/gn_auth/auth/authentication/oauth2/models/jwt_bearer_token.py
+++ b/gn_auth/auth/authentication/oauth2/models/jwt_bearer_token.py
@@ -1,5 +1,7 @@
"""Implement model for JWTBearerToken"""
import uuid
+import time
+from typing import Optional
from authlib.oauth2.rfc7523 import JWTBearerToken as _JWTBearerToken
@@ -28,3 +30,21 @@ class JWTBearerToken(_JWTBearerToken):
def check_client(self, client):
"""Check that the client is right."""
return self.client.get_client_id() == client.get_client_id()
+
+
+ def get_expires_in(self) -> Optional[int]:
+ """Return the number of seconds the token is valid for since issue.
+
+ If `None`, the token never expires."""
+ if "exp" in self:
+ return self['exp'] - self['iat']
+ return None
+
+
+ def is_expired(self):
+ """Check whether the token is expired.
+
+ If there is no 'exp' member, assume this token will never expire."""
+ if "exp" in self:
+ return self["exp"] < time.time()
+ return False
diff --git a/gn_auth/auth/authentication/oauth2/models/oauth2client.py b/gn_auth/auth/authentication/oauth2/models/oauth2client.py
index df5d564..c7e1c90 100644
--- a/gn_auth/auth/authentication/oauth2/models/oauth2client.py
+++ b/gn_auth/auth/authentication/oauth2/models/oauth2client.py
@@ -1,6 +1,5 @@
"""OAuth2 Client model."""
import json
-import logging
import datetime
from uuid import UUID
from functools import cached_property
@@ -8,11 +7,13 @@ from dataclasses import asdict, dataclass
from typing import Any, Sequence, Optional
import requests
+from flask import current_app as app
from requests.exceptions import JSONDecodeError
from authlib.jose import KeySet, JsonWebKey
from authlib.oauth2.rfc6749 import ClientMixin
from pymonad.maybe import Just, Maybe, Nothing
+from gn_auth.debug import __pk__
from gn_auth.auth.db import sqlite3 as db
from gn_auth.auth.errors import NotFoundError
from gn_auth.auth.authentication.users import (User,
@@ -62,23 +63,26 @@ class OAuth2Client(ClientMixin):
def jwks(self) -> KeySet:
"""Return this client's KeySet."""
jwksuri = self.client_metadata.get("public-jwks-uri")
+ __pk__(f"PUBLIC JWKs link for client {self.client_id}", jwksuri)
if not bool(jwksuri):
- logging.debug("No Public JWKs URI set for client!")
+ app.logger.debug("No Public JWKs URI set for client!")
return KeySet([])
try:
## IMPORTANT: This can cause a deadlock if the client is working in
## single-threaded mode, i.e. can only serve one request
## at a time.
return KeySet([JsonWebKey.import_key(key)
- for key in requests.get(jwksuri).json()["jwks"]])
+ for key in requests.get(
+ jwksuri,
+ allow_redirects=True).json()["jwks"]])
except requests.ConnectionError as _connerr:
- logging.debug(
+ app.logger.debug(
"Could not connect to provided URI: %s", jwksuri, exc_info=True)
except JSONDecodeError as _jsonerr:
- logging.debug(
+ app.logger.debug(
"Could not convert response to JSON", exc_info=True)
except Exception as _exc:# pylint: disable=[broad-except]
- logging.debug(
+ app.logger.debug(
"Error retrieving the JWKs for the client.", exc_info=True)
return KeySet([])