aboutsummaryrefslogtreecommitdiff
path: root/gn2/wqflask/oauth2/checks.py
diff options
context:
space:
mode:
authorAlexander_Kabui2024-01-02 13:21:07 +0300
committerAlexander_Kabui2024-01-02 13:21:07 +0300
commit70c4201b332e0e2c0d958428086512f291469b87 (patch)
treeaea4fac8782c110fc233c589c3f0f7bd34bada6c /gn2/wqflask/oauth2/checks.py
parent5092eb42f062b1695c4e39619f0bd74a876cfac2 (diff)
parent965ce5114d585624d5edb082c710b83d83a3be40 (diff)
downloadgenenetwork2-70c4201b332e0e2c0d958428086512f291469b87.tar.gz
merge changes
Diffstat (limited to 'gn2/wqflask/oauth2/checks.py')
-rw-r--r--gn2/wqflask/oauth2/checks.py49
1 files changed, 49 insertions, 0 deletions
diff --git a/gn2/wqflask/oauth2/checks.py b/gn2/wqflask/oauth2/checks.py
new file mode 100644
index 00000000..5d90f986
--- /dev/null
+++ b/gn2/wqflask/oauth2/checks.py
@@ -0,0 +1,49 @@
+"""Various checkers for OAuth2"""
+from functools import wraps
+from urllib.parse import urljoin
+
+from authlib.integrations.requests_client import OAuth2Session
+from flask import (
+ flash, request, url_for, redirect, current_app, session as flask_session)
+
+from . import session
+
+def user_logged_in():
+ """Check whether the user has logged in."""
+ suser = session.session_info()["user"]
+ if suser["logged_in"]:
+ if session.expired():
+ session.clear_session_info()
+ return False
+ return suser["token"].is_right()
+ return False
+
+def require_oauth2(func):
+ """Decorator for ensuring user is logged in."""
+ @wraps(func)
+ def __token_valid__(*args, **kwargs):
+ """Check that the user is logged in and their token is valid."""
+ config = current_app.config
+ def __clear_session__(_no_token):
+ session.clear_session_info()
+ flask_session.pop("oauth2_token", None)
+ flask_session.pop("user_details", None)
+ flash("You need to be logged in.", "alert-warning")
+ return redirect("/")
+
+ def __with_token__(token):
+ from gn2.utility.tools import (
+ AUTH_SERVER_URL, OAUTH2_CLIENT_ID, OAUTH2_CLIENT_SECRET)
+ client = OAuth2Session(
+ OAUTH2_CLIENT_ID, OAUTH2_CLIENT_SECRET, token=token)
+ resp = client.get(
+ urljoin(AUTH_SERVER_URL, "auth/user/"))
+ user_details = resp.json()
+ if not user_details.get("error", False):
+ return func(*args, **kwargs)
+
+ return clear_session_info(token)
+
+ return session.user_token().either(__clear_session__, __with_token__)
+
+ return __token_valid__