diff options
author | Alexander_Kabui | 2024-01-02 13:21:07 +0300 |
---|---|---|
committer | Alexander_Kabui | 2024-01-02 13:21:07 +0300 |
commit | 70c4201b332e0e2c0d958428086512f291469b87 (patch) | |
tree | aea4fac8782c110fc233c589c3f0f7bd34bada6c /gn2/wqflask/oauth2/checks.py | |
parent | 5092eb42f062b1695c4e39619f0bd74a876cfac2 (diff) | |
parent | 965ce5114d585624d5edb082c710b83d83a3be40 (diff) | |
download | genenetwork2-70c4201b332e0e2c0d958428086512f291469b87.tar.gz |
merge changes
Diffstat (limited to 'gn2/wqflask/oauth2/checks.py')
-rw-r--r-- | gn2/wqflask/oauth2/checks.py | 49 |
1 files changed, 49 insertions, 0 deletions
diff --git a/gn2/wqflask/oauth2/checks.py b/gn2/wqflask/oauth2/checks.py new file mode 100644 index 00000000..5d90f986 --- /dev/null +++ b/gn2/wqflask/oauth2/checks.py @@ -0,0 +1,49 @@ +"""Various checkers for OAuth2""" +from functools import wraps +from urllib.parse import urljoin + +from authlib.integrations.requests_client import OAuth2Session +from flask import ( + flash, request, url_for, redirect, current_app, session as flask_session) + +from . import session + +def user_logged_in(): + """Check whether the user has logged in.""" + suser = session.session_info()["user"] + if suser["logged_in"]: + if session.expired(): + session.clear_session_info() + return False + return suser["token"].is_right() + return False + +def require_oauth2(func): + """Decorator for ensuring user is logged in.""" + @wraps(func) + def __token_valid__(*args, **kwargs): + """Check that the user is logged in and their token is valid.""" + config = current_app.config + def __clear_session__(_no_token): + session.clear_session_info() + flask_session.pop("oauth2_token", None) + flask_session.pop("user_details", None) + flash("You need to be logged in.", "alert-warning") + return redirect("/") + + def __with_token__(token): + from gn2.utility.tools import ( + AUTH_SERVER_URL, OAUTH2_CLIENT_ID, OAUTH2_CLIENT_SECRET) + client = OAuth2Session( + OAUTH2_CLIENT_ID, OAUTH2_CLIENT_SECRET, token=token) + resp = client.get( + urljoin(AUTH_SERVER_URL, "auth/user/")) + user_details = resp.json() + if not user_details.get("error", False): + return func(*args, **kwargs) + + return clear_session_info(token) + + return session.user_token().either(__clear_session__, __with_token__) + + return __token_valid__ |