about summary refs log tree commit diff
"""Utilities for dealing with tokens."""
import uuid
from datetime import datetime, timedelta, timezone
from typing import Union
from urllib.parse import urljoin

from authlib.jose import jwt
from flask import current_app as app

from uploader import monadic_requests as mrequests

from . import jwks
from .client import (SCOPE, authserver_uri, oauth2_clientid)


def request_token(token_uri: str, user_id: Union[uuid.UUID, str], **kwargs):
    """Request token from the auth server.

    Builds an RS256-signed JWT assertion for `user_id` and POSTs it to
    `token_uri` as a `urn:ietf:params:oauth:grant-type:jwt-bearer` grant.

    Recognised keyword arguments:

    * `minutes_to_expiry`: lifetime of the assertion, in minutes (default 5).
    * `scope`: scope to request (defaults to `SCOPE`).
    * `extra_params`: mapping of additional fields for the request body. It
      is merged last, so its keys override the fields built here.

    Returns the result of `mrequests.post(...)` unchanged.
    """
    # Use an aware UTC datetime: adding a timedelta to a naive local datetime
    # and then calling `.timestamp()` can be off by an hour when the expiry
    # window crosses a DST transition.
    issued = datetime.now(tz=timezone.utc)
    expires = issued + timedelta(minutes=kwargs.get("minutes_to_expiry", 5))
    # Emit every time claim as an integer number of seconds so that `exp`
    # is consistent with `nbf` and `iat`.
    issued_at = int(issued.timestamp())
    expires_at = int(expires.timestamp())

    jwtkey = jwks.newest_jwk_with_rotation(
        jwks.jwks_directory(app, "UPLOADER_SECRETS"),
        int(app.config["JWKS_ROTATION_AGE_DAYS"]))
    client_id = oauth2_clientid()

    assertion = jwt.encode(
        header={
            "alg": "RS256",
            "typ": "JWT",
            # Tells the receiver which of our published keys signed this.
            "kid": jwtkey.as_dict()["kid"]
        },
        payload={
            "iss": str(client_id),
            "sub": str(user_id),
            "aud": urljoin(authserver_uri(), "auth/token"),
            "exp": expires_at,
            "nbf": issued_at,
            "iat": issued_at,
            # Fresh random identifier for every assertion.
            "jti": str(uuid.uuid4())
        },
        key=jwtkey).decode("utf8")

    return mrequests.post(
        token_uri,
        json={
            "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
            "scope": kwargs.get("scope", SCOPE),
            "assertion": assertion,
            "client_id": client_id,
            **kwargs.get("extra_params", {})
        }
    )