aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2023-05-31 17:31:35 +0300
committerFrederick Muriuki Muriithi2023-05-31 17:31:35 +0300
commit1a6ed93a8f1def9c9f22df2aa99ea9dbffa2a9ff (patch)
tree7dfe8731e00f38de9c2e71ab180e2f15f37f47af
parent8d18898bd059e968425e9275090d8f48320191f7 (diff)
downloadgenenetwork2-1a6ed93a8f1def9c9f22df2aa99ea9dbffa2a9ff.tar.gz
Use utility.tools to get configuration variables
Using flask.current_app.config for configurations does not give the appropriate configurations.
-rw-r--r--wqflask/wqflask/oauth2/checks.py7
-rw-r--r--wqflask/wqflask/oauth2/client.py32
-rw-r--r--wqflask/wqflask/oauth2/data.py18
-rw-r--r--wqflask/wqflask/oauth2/request_utils.py12
-rw-r--r--wqflask/wqflask/oauth2/users.py14
5 files changed, 47 insertions, 36 deletions
diff --git a/wqflask/wqflask/oauth2/checks.py b/wqflask/wqflask/oauth2/checks.py
index 3b6d2471..38561c6f 100644
--- a/wqflask/wqflask/oauth2/checks.py
+++ b/wqflask/wqflask/oauth2/checks.py
@@ -27,11 +27,12 @@ def require_oauth2(func):
return redirect("/")
def __with_token__(token):
+ from utility.tools import (
+ GN_SERVER_URL, OAUTH2_CLIENT_ID, OAUTH2_CLIENT_SECRET)
client = OAuth2Session(
- config["OAUTH2_CLIENT_ID"], config["OAUTH2_CLIENT_SECRET"],
- token=token)
+ OAUTH2_CLIENT_ID, OAUTH2_CLIENT_SECRET, token=token)
resp = client.get(
- urljoin(config["GN_SERVER_URL"], "oauth2/user"))
+ urljoin(GN_SERVER_URL, "oauth2/user"))
user_details = resp.json()
if not user_details.get("error", False):
return func(*args, **kwargs)
diff --git a/wqflask/wqflask/oauth2/client.py b/wqflask/wqflask/oauth2/client.py
index b992f0ad..2a06b156 100644
--- a/wqflask/wqflask/oauth2/client.py
+++ b/wqflask/wqflask/oauth2/client.py
@@ -16,10 +16,11 @@ SCOPE = ("profile group role resource register-client user masquerade "
"introspect migrate-data")
def oauth2_client():
- config = app.config
def __client__(token) -> OAuth2Session:
+ from utility.tools import (
+ GN_SERVER_URL, OAUTH2_CLIENT_ID, OAUTH2_CLIENT_SECRET)
return OAuth2Session(
- config["OAUTH2_CLIENT_ID"], config["OAUTH2_CLIENT_SECRET"],
+ OAUTH2_CLIENT_ID, OAUTH2_CLIENT_SECRET,
scope=SCOPE, token_endpoint_auth_method="client_secret_post",
token=token)
return session.user_token().either(
@@ -38,12 +39,13 @@ def __no_token__(_err) -> Left:
def oauth2_get(uri_path: str, data: dict = {}, **kwargs) -> Either:
def __get__(token) -> Either:
- config = app.config
+ from utility.tools import (
+ GN_SERVER_URL, OAUTH2_CLIENT_ID, OAUTH2_CLIENT_SECRET)
client = OAuth2Session(
- config["OAUTH2_CLIENT_ID"], config["OAUTH2_CLIENT_SECRET"],
+ OAUTH2_CLIENT_ID, OAUTH2_CLIENT_SECRET,
token=token, scope=SCOPE)
resp = client.get(
- urljoin(config["GN_SERVER_URL"], uri_path),
+ urljoin(GN_SERVER_URL, uri_path),
data=data,
**kwargs)
if resp.status_code == 200:
@@ -57,12 +59,13 @@ def oauth2_post(
uri_path: str, data: Optional[dict] = None, json: Optional[dict] = None,
**kwargs) -> Either:
def __post__(token) -> Either:
- config = app.config
+ from utility.tools import (
+ GN_SERVER_URL, OAUTH2_CLIENT_ID, OAUTH2_CLIENT_SECRET)
client = OAuth2Session(
- config["OAUTH2_CLIENT_ID"], config["OAUTH2_CLIENT_SECRET"],
+ OAUTH2_CLIENT_ID, OAUTH2_CLIENT_SECRET,
token=token, scope=SCOPE)
resp = client.post(
- urljoin(config["GN_SERVER_URL"], uri_path), data=data, json=json,
+ urljoin(GN_SERVER_URL, uri_path), data=data, json=json,
**kwargs)
if resp.status_code == 200:
return Right(resp.json())
@@ -72,21 +75,22 @@ def oauth2_post(
return session.user_token().either(__no_token__, __post__)
def no_token_get(uri_path: str, **kwargs) -> Either:
- config = app.config
- resp = requests.get(urljoin(config["GN_SERVER_URL"], uri_path), **kwargs)
+ from utility.tools import GN_SERVER_URL
+ resp = requests.get(urljoin(GN_SERVER_URL, uri_path), **kwargs)
if resp.status_code == 200:
return Right(resp.json())
return Left(resp)
def no_token_post(uri_path: str, **kwargs) -> Either:
- config = app.config
+ from utility.tools import (
+ GN_SERVER_URL, OAUTH2_CLIENT_ID, OAUTH2_CLIENT_SECRET)
data = kwargs.get("data", {})
the_json = kwargs.get("json", {})
request_data = {
**data,
**the_json,
- "client_id": config["OAUTH2_CLIENT_ID"],
- "client_secret": config["OAUTH2_CLIENT_SECRET"]
+ "client_id": OAUTH2_CLIENT_ID,
+ "client_secret": OAUTH2_CLIENT_SECRET
}
new_kwargs = {
**{
@@ -95,7 +99,7 @@ def no_token_post(uri_path: str, **kwargs) -> Either:
},
("data" if bool(data) else "json"): request_data
}
- resp = requests.post(urljoin(config["GN_SERVER_URL"], uri_path),
+ resp = requests.post(urljoin(GN_SERVER_URL, uri_path),
**new_kwargs)
if resp.status_code == 200:
return Right(resp.json())
diff --git a/wqflask/wqflask/oauth2/data.py b/wqflask/wqflask/oauth2/data.py
index 8b8f4f20..795e9ea3 100644
--- a/wqflask/wqflask/oauth2/data.py
+++ b/wqflask/wqflask/oauth2/data.py
@@ -18,8 +18,9 @@ from .client import oauth2_get, oauth2_post
data = Blueprint("data", __name__)
def __search_mrna__(query, template, **kwargs):
+ from utility.tools import GN_SERVER_URL
species_name = kwargs["species_name"]
- search_uri = urljoin(app.config["GN_SERVER_URL"], "oauth2/data/search")
+ search_uri = urljoin(GN_SERVER_URL, "oauth2/data/search")
datasets = oauth2_get(
"oauth2/data/search",
json = {
@@ -42,8 +43,9 @@ def __selected_datasets__():
request.form.get("selected", []))
def __search_genotypes__(query, template, **kwargs):
+ from utility.tools import GN_SERVER_URL
species_name = kwargs["species_name"]
- search_uri = urljoin(app.config["GN_SERVER_URL"], "oauth2/data/search")
+ search_uri = urljoin(GN_SERVER_URL, "oauth2/data/search")
datasets = oauth2_get(
"oauth2/data/search",
json = {
@@ -57,6 +59,7 @@ def __search_genotypes__(query, template, **kwargs):
return render_ui(template, search_uri=search_uri, **datasets, **kwargs)
def __search_phenotypes__(query, template, **kwargs):
+ from utility.tools import GN_SERVER_URL
page = int(request.args.get("page", 1))
per_page = int(request.args.get("per_page", 50))
selected_traits = request.form.getlist("selected_traits")
@@ -68,10 +71,10 @@ def __search_phenotypes__(query, template, **kwargs):
template, traits=[], per_page=per_page, query=query,
selected_traits=selected_traits, search_results=search_results,
search_endpoint=urljoin(
- app.config["GN_SERVER_URL"], "oauth2/data/search"),
- gn_server_url = app.config["GN_SERVER_URL"],
+ GN_SERVER_URL, "oauth2/data/search"),
+ gn_server_url = GN_SERVER_URL,
results_endpoint=urljoin(
- app.config["GN_SERVER_URL"],
+ GN_SERVER_URL,
f"oauth2/data/search/phenotype/{job_id}"),
**kwargs)
return oauth2_get("oauth2/data/search", json={
@@ -79,7 +82,7 @@ def __search_phenotypes__(query, template, **kwargs):
"species_name": kwargs["species_name"],
"per_page": per_page,
"page": page,
- "gn3_server_uri": app.config["GN_SERVER_URL"]
+ "gn3_server_uri": GN_SERVER_URL
}).either(
lambda err: __search_error__(process_error(err)),
__search_success__)
@@ -121,6 +124,7 @@ def json_search_mrna() -> Response:
@data.route("/phenotype/search", methods=["POST"])
def json_search_phenotypes() -> Response:
"""Search for phenotypes."""
+ from utility.tools import GN_SERVER_URL
form = request.json
def __handle_error__(err):
error = process_error(err)
@@ -134,7 +138,7 @@ def json_search_phenotypes() -> Response:
"query": form.get("query", ""),
"per_page": int(form.get("per_page", 50)),
"page": int(form.get("page", 1)),
- "gn3_server_uri": app.config["GN_SERVER_URL"],
+ "gn3_server_uri": GN_SERVER_URL,
"selected_traits": form.get("selected_traits", [])
}).either(__handle_error__, jsonify)
diff --git a/wqflask/wqflask/oauth2/request_utils.py b/wqflask/wqflask/oauth2/request_utils.py
index d994cd36..b9f2aa7d 100644
--- a/wqflask/wqflask/oauth2/request_utils.py
+++ b/wqflask/wqflask/oauth2/request_utils.py
@@ -10,12 +10,13 @@ from flask import (
from .client import SCOPE, oauth2_get
def authserver_authorise_uri():
+ from utility.tools import GN_SERVER_URL, OAUTH2_CLIENT_ID
req_baseurl = urlparse(request.base_url)
host_uri = f"{req_baseurl.scheme}://{req_baseurl.netloc}/"
return urljoin(
- app.config["GN_SERVER_URL"],
+ GN_SERVER_URL,
"oauth2/authorise?response_type=code"
- f"&client_id={app.config['OAUTH2_CLIENT_ID']}"
+ f"&client_id={OAUTH2_CLIENT_ID}"
f"&redirect_uri={urljoin(host_uri, 'oauth2/code')}")
def raise_unimplemented():
@@ -30,13 +31,14 @@ def process_error(error: Response,
message: str=("Requested endpoint was not found on the API "
"server.")
) -> dict:
- if error.status_code == 404:
+ if error.status_code in (401, 404):
try:
- msg = error.json()["error_description"]
+ err = error.json()
+ msg = err.get("error_description", f"{error.reason}")
except simplejson.errors.JSONDecodeError as _jde:
msg = message
return {
- "error": "NotFoundError",
+ "error": error.reason,
"error_message": msg,
"error_description": msg,
"status_code": error.status_code
diff --git a/wqflask/wqflask/oauth2/users.py b/wqflask/wqflask/oauth2/users.py
index 51a4ca4c..1ff23d17 100644
--- a/wqflask/wqflask/oauth2/users.py
+++ b/wqflask/wqflask/oauth2/users.py
@@ -65,19 +65,19 @@ def request_add_to_group() -> Response:
@users.route("/login", methods=["GET", "POST"])
def login():
"""Route to allow users to sign up."""
+ from utility.tools import GN_SERVER_URL
next_endpoint=request.args.get("next", False)
if request.method == "POST":
form = request.form
client = oauth2_client()
- config = app.config
try:
token = client.fetch_token(
- urljoin(config["GN_SERVER_URL"], "oauth2/token"),
+ urljoin(GN_SERVER_URL, "oauth2/token"),
username=form.get("email_address"),
password=form.get("password"),
grant_type="password")
- session.set_token(token)
+ session.set_user_token(token)
udets = user_details()
session.set_user_details({
"user_id": UUID(udets["user_id"]),
@@ -101,10 +101,10 @@ def login():
@users.route("/logout", methods=["GET", "POST"])
def logout():
+ from utility.tools import GN_SERVER_URL
if user_logged_in():
- config = app.config
resp = oauth2_client().revoke_token(
- urljoin(config["GN_SERVER_URL"], "oauth2/revoke"))
+ urljoin(GN_SERVER_URL, "oauth2/revoke"))
the_session = session.session_info()
if not bool(the_session["masquerading"]):
# Normal session - clear and go back.
@@ -124,6 +124,7 @@ def logout():
@users.route("/register", methods=["GET", "POST"])
def register_user():
+ from utility.tools import GN_SERVER_URL
if user_logged_in():
next_endpoint=request.args.get("next", "/")
flash(("You cannot register a new user while logged in. "
@@ -134,10 +135,9 @@ def register_user():
if request.method == "GET":
return render_ui("oauth2/register_user.html")
- config = app.config
form = request.form
response = requests.post(
- urljoin(config["GN_SERVER_URL"], "oauth2/user/register"),
+ urljoin(GN_SERVER_URL, "oauth2/user/register"),
data = {
"user_name": form.get("user_name"),
"email": form.get("email_address"),