"""Utilities for dealing with tokens.""" import uuid from typing import Union from urllib.parse import urljoin from datetime import datetime, timedelta from authlib.jose import jwt from flask import current_app as app from uploader import monadic_requests as mrequests from . import jwks from .client import (SCOPE, authserver_uri, oauth2_clientid) def request_token(token_uri: str, user_id: Union[uuid.UUID, str], **kwargs): """Request token from the auth server.""" issued = datetime.now() jwtkey = jwks.newest_jwk_with_rotation( jwks.jwks_directory(app, "UPLOADER_SECRETS"), int(app.config["JWKS_ROTATION_AGE_DAYS"])) _mins2expiry = kwargs.get("minutes_to_expiry", 5) return mrequests.post( token_uri, json={ "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer", "scope": kwargs.get("scope", SCOPE), "assertion": jwt.encode( header={ "alg": "RS256", "typ": "JWT", "kid": jwtkey.as_dict()["kid"] }, payload={ "iss": str(oauth2_clientid()), "sub": str(user_id), "aud": urljoin(authserver_uri(), "auth/token"), "exp": (issued + timedelta(minutes=_mins2expiry)).timestamp(), "nbf": int(issued.timestamp()), "iat": int(issued.timestamp()), "jti": str(uuid.uuid4()) }, key=jwtkey).decode("utf8"), "client_id": oauth2_clientid(), **kwargs.get("extra_params", {}) } )