about summary refs log tree commit diff
path: root/uploader/oauth2/tokens.py
diff options
context:
space:
mode:
Diffstat (limited to 'uploader/oauth2/tokens.py')
-rw-r--r--uploader/oauth2/tokens.py47
1 files changed, 47 insertions, 0 deletions
diff --git a/uploader/oauth2/tokens.py b/uploader/oauth2/tokens.py
new file mode 100644
index 0000000..eb650f6
--- /dev/null
+++ b/uploader/oauth2/tokens.py
@@ -0,0 +1,47 @@
+"""Utilities for dealing with tokens."""
+import uuid
+from typing import Union
+from urllib.parse import urljoin
+from datetime import datetime, timedelta
+
+from authlib.jose import jwt
+from flask import current_app as app
+
+from uploader import monadic_requests as mrequests
+
+from . import jwks
+from .client import (SCOPE, authserver_uri, oauth2_clientid)
+
+
+def request_token(token_uri: str, user_id: Union[uuid.UUID, str], **kwargs):
+    """Request token from the auth server."""
+    issued = datetime.now()
+    jwtkey = jwks.newest_jwk_with_rotation(
+        jwks.jwks_directory(app, "UPLOADER_SECRETS"),
+        int(app.config["JWKS_ROTATION_AGE_DAYS"]))
+    _mins2expiry = kwargs.get("minutes_to_expiry", 5)
+    return mrequests.post(
+        token_uri,
+        json={
+            "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
+            "scope": kwargs.get("scope", SCOPE),
+            "assertion": jwt.encode(
+                header={
+                    "alg": "RS256",
+                    "typ": "JWT",
+                    "kid": jwtkey.as_dict()["kid"]
+                },
+                payload={
+                    "iss": str(oauth2_clientid()),
+                    "sub": str(user_id),
+                    "aud": urljoin(authserver_uri(), "auth/token"),
+                    "exp": (issued + timedelta(minutes=_mins2expiry)).timestamp(),
+                    "nbf": int(issued.timestamp()),
+                    "iat": int(issued.timestamp()),
+                    "jti": str(uuid.uuid4())
+                },
+                key=jwtkey).decode("utf8"),
+            "client_id": oauth2_clientid(),
+            **kwargs.get("extra_params", {})
+        }
+    )