aboutsummaryrefslogtreecommitdiff
path: root/uploader/oauth2/tokens.py
blob: eb650f6f46227c4dde70337680a24376cfe392f9 (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
"""Utilities for dealing with tokens."""
import uuid
from typing import Union
from urllib.parse import urljoin
from datetime import datetime, timedelta

from authlib.jose import jwt
from flask import current_app as app

from uploader import monadic_requests as mrequests

from . import jwks
from .client import (SCOPE, authserver_uri, oauth2_clientid)


def request_token(token_uri: str, user_id: Union[uuid.UUID, str], **kwargs):
    """Request token from the auth server.

    Builds an RS256-signed JWT assertion for `user_id` and POSTs it to
    `token_uri` using the `urn:ietf:params:oauth:grant-type:jwt-bearer`
    grant type.

    Recognised keyword arguments:

    * `minutes_to_expiry`: lifetime of the assertion in minutes (default 5).
    * `scope`: scope to request (defaults to the client's `SCOPE`).
    * `extra_params`: mapping merged last into the JSON body, so its keys
      take precedence over the ones set here (default: empty).

    Returns whatever `mrequests.post` returns for the request.
    """
    # Convert to epoch seconds once and do the expiry arithmetic on plain
    # numbers. Adding a timedelta to a naive local datetime resets `fold`,
    # so during the repeated hour of a DST fall-back the sum could resolve
    # to the earlier occurrence of that wall-clock time, giving an "exp"
    # roughly an hour before "iat" (i.e. an already-expired assertion).
    issued = int(datetime.now().timestamp())
    lifetime = int(timedelta(
        minutes=kwargs.get("minutes_to_expiry", 5)).total_seconds())
    jwtkey = jwks.newest_jwk_with_rotation(
        jwks.jwks_directory(app, "UPLOADER_SECRETS"),
        int(app.config["JWKS_ROTATION_AGE_DAYS"]))
    return mrequests.post(
        token_uri,
        json={
            "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
            "scope": kwargs.get("scope", SCOPE),
            "assertion": jwt.encode(
                header={
                    "alg": "RS256",
                    "typ": "JWT",
                    "kid": jwtkey.as_dict()["kid"]
                },
                payload={
                    "iss": str(oauth2_clientid()),
                    "sub": str(user_id),
                    "aud": urljoin(authserver_uri(), "auth/token"),
                    # All time claims are integer seconds since the epoch.
                    "exp": issued + lifetime,
                    "nbf": issued,
                    "iat": issued,
                    # Fresh identifier for every assertion.
                    "jti": str(uuid.uuid4())
                },
                key=jwtkey).decode("utf8"),
            "client_id": oauth2_clientid(),
            **kwargs.get("extra_params", {})
        }
    )