1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
"""Common oauth2 client utilities."""
import json
import requests
from typing import Any, Optional
from urllib.parse import urljoin
from flask import jsonify, current_app as app
from pymonad.maybe import Just, Maybe, Nothing
from pymonad.either import Left, Right, Either
from authlib.integrations.requests_client import OAuth2Session
from gn2.wqflask.oauth2 import session
from gn2.wqflask.oauth2.checks import user_logged_in
# Space-separated list of the OAuth2 scopes passed as `scope=` to every
# `OAuth2Session` built in this module.
SCOPE = " ".join((
    "profile",
    "group",
    "role",
    "resource",
    "register-client",
    "user",
    "masquerade",
    "introspect",
    "migrate-data",
))
def oauth2_client() -> OAuth2Session:
    """Build an OAuth2 session object for the current user.

    Returns:
        An ``OAuth2Session`` created with the application's client id and
        secret, the module-level ``SCOPE`` and the ``client_secret_post``
        token-endpoint auth method. The session carries the value produced
        by ``session.user_token()`` when there is one, and ``token=None``
        otherwise.
    """
    def __client__(token) -> OAuth2Session:
        # Only the client credentials are needed to build the session.
        from gn2.utility.tools import OAUTH2_CLIENT_ID, OAUTH2_CLIENT_SECRET
        return OAuth2Session(
            OAUTH2_CLIENT_ID, OAUTH2_CLIENT_SECRET,
            scope=SCOPE, token_endpoint_auth_method="client_secret_post",
            token=token)

    return session.user_token().either(
        # No token: whatever the failure value holds is ignored and the
        # session is built without a token.
        lambda _notok: __client__(None),
        __client__)
def __no_token__(_err) -> Left:
    """Produce the failure value used when a request is made without a token.

    The argument is ignored. The result is a ``Left`` wrapping a hand-built
    ``requests`` response with status code 400 and a UTF-8 encoded JSON body
    describing an authentication error.
    """
    payload = {
        "error": "AuthenticationError",
        "error-description": ("You need to authenticate to access requested "
                              "information.")}
    response = requests.models.Response()
    # The body is stored as bytes directly on the response's `_content`
    # attribute.
    response._content = json.dumps(payload).encode("utf-8")
    response.status_code = 400
    return Left(response)
def oauth2_get(
        uri_path: str, data: Optional[dict] = None, **kwargs) -> Either:
    """Do a GET request against the auth server using the user's token.

    Args:
        uri_path: Path joined onto ``AUTH_SERVER_URL`` to form the target URL.
        data: Optional mapping forwarded as the ``data=`` argument of the
            request. Omitting it (or passing ``None``) sends an empty dict.
        **kwargs: Extra keyword arguments forwarded unchanged to the
            session's ``get`` call.

    Returns:
        ``Right`` holding the decoded JSON body when the response status is
        200, ``Left`` holding the response object for any other status, or
        the ``Left`` built by ``__no_token__`` when no user token is
        available.
    """
    # Use a fresh dict per call instead of a mutable default argument, which
    # would be one object shared by every call of this function.
    payload = {} if data is None else data

    def __get__(token) -> Either:
        from gn2.utility.tools import (
            AUTH_SERVER_URL, OAUTH2_CLIENT_ID, OAUTH2_CLIENT_SECRET)
        client = OAuth2Session(
            OAUTH2_CLIENT_ID, OAUTH2_CLIENT_SECRET,
            token=token, scope=SCOPE)
        resp = client.get(
            urljoin(AUTH_SERVER_URL, uri_path),
            data=payload,
            **kwargs)
        if resp.status_code == 200:
            return Right(resp.json())
        return Left(resp)

    return session.user_token().either(__no_token__, __get__)
def oauth2_post(
        uri_path: str, data: Optional[dict] = None, json: Optional[dict] = None,
        **kwargs) -> Either:
    """Do a POST request against the auth server using the user's token.

    ``uri_path`` is joined onto ``AUTH_SERVER_URL``; ``data``, ``json`` and
    any extra keyword arguments are handed to the session's ``post`` call
    unchanged.

    The result is ``Right`` with the decoded JSON body for a 200 response,
    ``Left`` with the response object for any other status, or the ``Left``
    built by ``__no_token__`` when no user token is available.
    """
    def __post__(token) -> Either:
        from gn2.utility.tools import (
            AUTH_SERVER_URL, OAUTH2_CLIENT_ID, OAUTH2_CLIENT_SECRET)
        authed = OAuth2Session(
            OAUTH2_CLIENT_ID, OAUTH2_CLIENT_SECRET,
            token=token, scope=SCOPE)
        target = urljoin(AUTH_SERVER_URL, uri_path)
        response = authed.post(target, data=data, json=json, **kwargs)
        # Anything other than a plain 200 is reported as a failure.
        if response.status_code != 200:
            return Left(response)
        return Right(response.json())

    return session.user_token().either(__no_token__, __post__)
def no_token_get(uri_path: str, **kwargs) -> Either:
    """Do a plain GET request against the auth server, without any token.

    ``uri_path`` is joined onto ``AUTH_SERVER_URL`` and every keyword
    argument is forwarded to ``requests.get``. A 200 response yields
    ``Right`` with the decoded JSON body; any other status yields ``Left``
    with the response object.
    """
    from gn2.utility.tools import AUTH_SERVER_URL
    response = requests.get(urljoin(AUTH_SERVER_URL, uri_path), **kwargs)
    succeeded = response.status_code == 200
    return Right(response.json()) if succeeded else Left(response)
def no_token_post(uri_path: str, **kwargs) -> Either:
    """Do a POST request against the auth server without a user token.

    The ``data`` and ``json`` keyword arguments (when given) are merged into
    one payload, to which the application's ``client_id`` and
    ``client_secret`` are added. The payload is sent as ``data=`` when a
    non-empty ``data`` was supplied and as ``json=`` otherwise. All other
    keyword arguments are forwarded to ``requests.post`` unchanged.

    Args:
        uri_path: Path joined onto ``AUTH_SERVER_URL`` to form the target URL.
        **kwargs: Keyword arguments for ``requests.post``; ``data`` and
            ``json`` may be omitted or ``None``.

    Returns:
        ``Right`` holding the decoded JSON body when the response status is
        200, otherwise ``Left`` holding the response object.
    """
    from gn2.utility.tools import (
        AUTH_SERVER_URL, OAUTH2_CLIENT_ID, OAUTH2_CLIENT_SECRET)
    # `or {}` covers an explicit `data=None` / `json=None` as well as a
    # missing key; unpacking `None` with `**` would raise TypeError.
    data = kwargs.get("data") or {}
    the_json = kwargs.get("json") or {}
    request_data = {
        **data,
        **the_json,
        "client_id": OAUTH2_CLIENT_ID,
        "client_secret": OAUTH2_CLIENT_SECRET
    }
    new_kwargs = {
        **{
            key: value for key, value in kwargs.items()
            if key not in ("data", "json")
        },
        # The merged payload goes out under exactly one of the two keywords.
        ("data" if data else "json"): request_data
    }
    resp = requests.post(urljoin(AUTH_SERVER_URL, uri_path),
                         **new_kwargs)
    if resp.status_code == 200:
        return Right(resp.json())
    return Left(resp)
def post(uri_path: str, **kwargs) -> Either:
    """
    Do a POST request, dispatching on whether the user is logged in:
    `oauth2_post` is used for a logged-in user and `no_token_post` otherwise.
    All arguments are passed through to the selected function.
    """
    sender = oauth2_post if user_logged_in() else no_token_post
    return sender(uri_path, **kwargs)
def get(uri_path: str, **kwargs) -> Either:
    """
    Do a GET request, dispatching on whether the user is logged in:
    `oauth2_get` is used for a logged-in user and `no_token_get` otherwise.
    All arguments are passed through to the selected function.
    """
    fetcher = oauth2_get if user_logged_in() else no_token_get
    return fetcher(uri_path, **kwargs)
|