aboutsummaryrefslogtreecommitdiff
path: root/gn_auth/auth/authentication/oauth2/models
diff options
context:
space:
mode:
Diffstat (limited to 'gn_auth/auth/authentication/oauth2/models')
-rw-r--r--gn_auth/auth/authentication/oauth2/models/jwt_bearer_token.py15
-rw-r--r--gn_auth/auth/authentication/oauth2/models/oauth2client.py51
2 files changed, 57 insertions, 9 deletions
diff --git a/gn_auth/auth/authentication/oauth2/models/jwt_bearer_token.py b/gn_auth/auth/authentication/oauth2/models/jwt_bearer_token.py
index 2606ac6..cca75f4 100644
--- a/gn_auth/auth/authentication/oauth2/models/jwt_bearer_token.py
+++ b/gn_auth/auth/authentication/oauth2/models/jwt_bearer_token.py
@@ -5,11 +5,26 @@ from authlib.oauth2.rfc7523 import JWTBearerToken as _JWTBearerToken
from gn_auth.auth.db.sqlite3 import with_db_connection
from gn_auth.auth.authentication.users import user_by_id
+from gn_auth.auth.authentication.oauth2.models.oauth2client import (
+ client as fetch_client)
class JWTBearerToken(_JWTBearerToken):
"""Overrides default JWTBearerToken class."""
def __init__(self, payload, header, options=None, params=None):
+ """Initialise the bearer token."""
+ # TOD0: Maybe remove this init and make this a dataclass like the way
+ # OAuth2Client is a dataclass
super().__init__(payload, header, options, params)
self.user = with_db_connection(
lambda conn:user_by_id(conn, uuid.UUID(payload["sub"])))
+ self.client = with_db_connection(
+ lambda conn: fetch_client(
+ conn, uuid.UUID(payload["oauth2_client_id"])
+ )
+ ).maybe(None, lambda _client: _client)
+
+
+ def check_client(self, client):
+ """Check that the client is right."""
+ return self.client.get_client_id() == client.get_client_id()
diff --git a/gn_auth/auth/authentication/oauth2/models/oauth2client.py b/gn_auth/auth/authentication/oauth2/models/oauth2client.py
index 8fac648..615d0ee 100644
--- a/gn_auth/auth/authentication/oauth2/models/oauth2client.py
+++ b/gn_auth/auth/authentication/oauth2/models/oauth2client.py
@@ -3,9 +3,9 @@ import json
import logging
import datetime
from uuid import UUID
-from dataclasses import dataclass
from functools import cached_property
-from typing import Sequence, Optional
+from dataclasses import asdict, dataclass
+from typing import Any, Sequence, Optional
import requests
from requests.exceptions import JSONDecodeError
@@ -13,6 +13,7 @@ from authlib.jose import KeySet, JsonWebKey
from authlib.oauth2.rfc6749 import ClientMixin
from pymonad.maybe import Just, Maybe, Nothing
+from gn_auth.debug import __pk__, getLogger
from gn_auth.auth.db import sqlite3 as db
from gn_auth.auth.errors import NotFoundError
from gn_auth.auth.authentication.users import (User,
@@ -61,10 +62,19 @@ class OAuth2Client(ClientMixin):
def jwks(self) -> KeySet:
"""Return this client's KeySet."""
+ logger = getLogger(__name__)
jwksuri = self.client_metadata.get("public-jwks-uri")
- if not bool(jwksuri):
- logging.debug("No Public JWKs URI set for client!")
- return KeySet([])
+ ### ----- DEBUG: Remove this section ----- ###
+ import os
+ from pathlib import Path
+ ca_bundle = Path(os.environ.get("REQUESTS_CA_BUNDLE"))
+ __pk__(f"{ca_bundle} exists?", ca_bundle.exists())
+ ### ----- DEBUG: Remove this section ----- ###
+ if not bool(__pk__(
+ f"CLIENT'S ({self.client_id}) JWKs URI =======> ", jwksuri)):
+ logger.debug("No Public JWKs URI set for client!")
+ return __pk__("Return empty KeySet since URI is not set =====>",
+ KeySet([]))
try:
## IMPORTANT: This can cause a deadlock if the client is working in
## single-threaded mode, i.e. can only serve one request
@@ -72,15 +82,16 @@ class OAuth2Client(ClientMixin):
return KeySet([JsonWebKey.import_key(key)
for key in requests.get(jwksuri).json()["jwks"]])
except requests.ConnectionError as _connerr:
- logging.debug(
+ logger.debug(
"Could not connect to provided URI: %s", jwksuri, exc_info=True)
except JSONDecodeError as _jsonerr:
- logging.debug(
+ logger.debug(
"Could not convert response to JSON", exc_info=True)
except Exception as _exc:# pylint: disable=[broad-except]
- logging.debug(
+ logger.debug(
"Error retrieving the JWKs for the client.", exc_info=True)
- return KeySet([])
+ return __pk__("Return empty KeySet after failure =====>",
+ KeySet([]))
def check_endpoint_auth_method(self, method: str, endpoint: str) -> bool:
@@ -289,3 +300,25 @@ def delete_client(
cursor.execute("DELETE FROM oauth2_tokens WHERE client_id=?", params)
cursor.execute("DELETE FROM oauth2_clients WHERE client_id=?", params)
return the_client
+
+
+def update_client_attribute(
+ client: OAuth2Client,# pylint: disable=[redefined-outer-name]
+ attribute: str,
+ value: Any
+) -> OAuth2Client:
+ """Return a new OAuth2Client with the given attribute updated/changed."""
+ attrs = {
+ attr: type(value)
+ for attr, value in asdict(client).items()
+ if attr != "client_id"
+ }
+ assert (
+ attribute in attrs.keys() and isinstance(value, attrs[attribute])), (
+ "Invalid attribute/value provided!")
+ return OAuth2Client(
+ client_id=client.client_id,
+ **{
+ attr: (value if attr==attribute else getattr(client, attr))
+ for attr in attrs
+ })