aboutsummaryrefslogtreecommitdiff
path: root/wqflask/wqflask/oauth2/client.py
blob: 664387a9e25ce8e4be52afe3a1a86e446cc82d5f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
"""Common oauth2 client utilities."""
import json
import requests
from typing import Any, Optional
from urllib.parse import urljoin

from flask import jsonify, current_app as app
from pymonad.maybe import Just, Maybe, Nothing
from pymonad.either import Left, Right, Either
from authlib.integrations.requests_client import OAuth2Session

from wqflask.oauth2 import session

# Space-separated list of OAuth2 scopes; passed as the `scope` argument to
# every `OAuth2Session` constructed in this module.
SCOPE = "profile group role resource register-client user introspect migrate-data"

def oauth2_client():
    """Build an `OAuth2Session` using the application's client credentials.

    The session is created with the token held by `session.user_token()`
    when there is one, and with `token=None` otherwise.
    """
    settings = app.config

    def __build_session__(user_token) -> OAuth2Session:
        client_id = settings["OAUTH2_CLIENT_ID"]
        client_secret = settings["OAUTH2_CLIENT_SECRET"]
        return OAuth2Session(
            client_id, client_secret,
            scope=SCOPE,
            token_endpoint_auth_method="client_secret_post",
            token=user_token)

    def __without_token__(_notok) -> OAuth2Session:
        # The failure value is ignored: a token-less client is built instead.
        return __build_session__(None)

    return session.user_token().either(__without_token__, __build_session__)

def __no_token__(_err) -> Left:
    """Build the failure value for a request attempted without a token.

    Returns a `Left` wrapping a locally constructed `requests` response with
    status code 400 and a JSON-encoded body describing the error.
    """
    payload = {
        "error": "AuthenticationError",
        "error-description": ("You need to authenticate to access requested "
                              "information.")}
    response = requests.models.Response()
    response.status_code = 400
    # The body is set directly because this response is built here rather
    # than received from a server.
    response._content = json.dumps(payload).encode("utf-8")
    return Left(response)

def oauth2_get(uri_path: str, data: Optional[dict] = None, **kwargs) -> Either:
    """Make a GET request to the GN server with the current user's token.

    Args:
        uri_path: Path joined onto the configured `GN_SERVER_URL`.
        data: Optional payload forwarded as the `data` argument of the
            request. An empty dict is used when omitted.
        **kwargs: Extra keyword arguments forwarded to the client's `get`.

    Returns:
        `Right` holding the decoded JSON body when the response status is
        200, `Left` holding the response object otherwise. When
        `session.user_token()` holds no token, the result of `__no_token__`
        is returned instead and no request is made.
    """
    # A fresh dict per call avoids sharing one mutable default object across
    # every invocation of this function.
    request_data = {} if data is None else data

    def __get__(token) -> Either:
        config = app.config
        client = OAuth2Session(
            config["OAUTH2_CLIENT_ID"], config["OAUTH2_CLIENT_SECRET"],
            token=token, scope=SCOPE)
        resp = client.get(
            urljoin(config["GN_SERVER_URL"], uri_path),
            data=request_data,
            **kwargs)
        if resp.status_code == 200:
            return Right(resp.json())

        return Left(resp)

    return session.user_token().either(__no_token__, __get__)

def oauth2_post(
        uri_path: str, data: Optional[dict] = None, json: Optional[dict] = None,
        **kwargs) -> Either:
    """Make a POST request to the GN server with the current user's token.

    `data`, `json` and any extra keyword arguments are forwarded unchanged to
    the client's `post`. Returns `Right` with the decoded JSON body on a 200
    response and `Left` with the response object otherwise. When there is no
    user token, the result of `__no_token__` is returned instead.
    """
    def __send__(user_token) -> Either:
        settings = app.config
        oauth_session = OAuth2Session(
            settings["OAUTH2_CLIENT_ID"], settings["OAUTH2_CLIENT_SECRET"],
            token=user_token, scope=SCOPE)
        target_url = urljoin(settings["GN_SERVER_URL"], uri_path)
        response = oauth_session.post(
            target_url, data=data, json=json, **kwargs)
        if response.status_code != 200:
            return Left(response)
        return Right(response.json())

    return session.user_token().either(__no_token__, __send__)

def no_token_get(uri_path: str, **kwargs) -> Either:
    """Make a plain GET request to the GN server without any user token.

    Extra keyword arguments are forwarded to `requests.get`. Returns `Right`
    with the decoded JSON body on a 200 response and `Left` with the response
    object otherwise.
    """
    target_url = urljoin(app.config["GN_SERVER_URL"], uri_path)
    response = requests.get(target_url, **kwargs)
    if response.status_code != 200:
        return Left(response)
    return Right(response.json())

def no_token_post(uri_path: str, **kwargs) -> Either:
    """Make a POST request to the GN server without any user token.

    The `data` and `json` keyword arguments (if any) are merged into a single
    payload together with the configured client id and client secret. The
    payload is sent as `data` when a non-empty `data` argument was given and
    as `json` otherwise. All other keyword arguments are forwarded to
    `requests.post` unchanged.

    Returns:
        `Right` holding the decoded JSON body when the response status is
        200, `Left` holding the response object otherwise.
    """
    config = app.config
    # `or {}` also covers callers that pass `data=None` / `json=None`
    # explicitly, which would otherwise fail when unpacked with `**`.
    data = kwargs.get("data") or {}
    the_json = kwargs.get("json") or {}
    request_data = {
        **data,
        **the_json,
        "client_id": config["OAUTH2_CLIENT_ID"],
        "client_secret": config["OAUTH2_CLIENT_SECRET"],
    }
    passthrough = {
        key: value for key, value in kwargs.items()
        if key not in ("data", "json")
    }
    # Keep the encoding the caller asked for: form data if they supplied
    # `data`, JSON otherwise.
    body_key = "data" if data else "json"
    resp = requests.post(urljoin(config["GN_SERVER_URL"], uri_path),
                         **{**passthrough, body_key: request_data})
    if resp.status_code == 200:
        return Right(resp.json())
    return Left(resp)