1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
"""Common oauth2 client utilities."""
import json
import requests
from typing import Any, Optional
from urllib.parse import urljoin
from flask import jsonify, current_app as app
from pymonad.maybe import Just, Maybe, Nothing
from pymonad.either import Left, Right, Either
from authlib.integrations.requests_client import OAuth2Session
from wqflask.oauth2 import session
SCOPE = "profile group role resource register-client user introspect migrate-data"
def oauth2_client():
config = app.config
def __client__(token) -> OAuth2Session:
return OAuth2Session(
config["OAUTH2_CLIENT_ID"], config["OAUTH2_CLIENT_SECRET"],
scope=SCOPE, token_endpoint_auth_method="client_secret_post",
token=token)
return session.user_token().either(
lambda _notok: __client__(None),
lambda token: __client__(token))
def __no_token__(_err) -> Left:
"""Handle situation where request is attempted with no token."""
resp = requests.models.Response()
resp._content = json.dumps({
"error": "AuthenticationError",
"error-description": ("You need to authenticate to access requested "
"information.")}).encode("utf-8")
resp.status_code = 400
return Left(resp)
def oauth2_get(uri_path: str, data: dict = {}, **kwargs) -> Either:
def __get__(token) -> Either:
config = app.config
client = OAuth2Session(
config["OAUTH2_CLIENT_ID"], config["OAUTH2_CLIENT_SECRET"],
token=token, scope=SCOPE)
resp = client.get(
urljoin(config["GN_SERVER_URL"], uri_path),
data=data,
**kwargs)
if resp.status_code == 200:
return Right(resp.json())
return Left(resp)
return session.user_token().either(__no_token__, __get__)
def oauth2_post(
uri_path: str, data: Optional[dict] = None, json: Optional[dict] = None,
**kwargs) -> Either:
def __post__(token) -> Either:
config = app.config
client = OAuth2Session(
config["OAUTH2_CLIENT_ID"], config["OAUTH2_CLIENT_SECRET"],
token=token, scope=SCOPE)
resp = client.post(
urljoin(config["GN_SERVER_URL"], uri_path), data=data, json=json,
**kwargs)
if resp.status_code == 200:
return Right(resp.json())
return Left(resp)
return session.user_token().either(__no_token__, __post__)
def no_token_get(uri_path: str, **kwargs) -> Either:
config = app.config
resp = requests.get(urljoin(config["GN_SERVER_URL"], uri_path), **kwargs)
if resp.status_code == 200:
return Right(resp.json())
return Left(resp)
def no_token_post(uri_path: str, **kwargs) -> Either:
config = app.config
data = kwargs.get("data", {})
the_json = kwargs.get("json", {})
request_data = {
**data,
**the_json,
"client_id": config["OAUTH2_CLIENT_ID"],
"client_secret": config["OAUTH2_CLIENT_SECRET"]
}
new_kwargs = {
**{
key: value for key, value in kwargs.items()
if key not in ("data", "json")
},
("data" if bool(data) else "json"): request_data
}
resp = requests.post(urljoin(config["GN_SERVER_URL"], uri_path),
**new_kwargs)
if resp.status_code == 200:
return Right(resp.json())
return Left(resp)
|