aboutsummaryrefslogtreecommitdiff
path: root/uploader/oauth2
diff options
context:
space:
mode:
Diffstat (limited to 'uploader/oauth2')
-rw-r--r--uploader/oauth2/client.py23
-rw-r--r--uploader/oauth2/views.py20
2 files changed, 28 insertions, 15 deletions
diff --git a/uploader/oauth2/client.py b/uploader/oauth2/client.py
index 70a32ff..12fbf80 100644
--- a/uploader/oauth2/client.py
+++ b/uploader/oauth2/client.py
@@ -1,6 +1,7 @@
"""OAuth2 client utilities."""
import json
import time
+import uuid
import random
from datetime import datetime, timedelta
from urllib.parse import urljoin, urlparse
@@ -112,7 +113,8 @@ def oauth2_client():
try:
jwt = JsonWebToken(["RS256"]).decode(
token["access_token"], key=jwk)
- return datetime.now().timestamp() > jwt["exp"]
+ if bool(jwt.get("exp")):
+ return datetime.now().timestamp() > jwt["exp"]
except BadSignatureError as _bse:
pass
@@ -145,9 +147,24 @@ def oauth2_client():
__client__)
+def fetch_user_details() -> Either:
+ """Retrieve user details from the auth server"""
+ suser = session.session_info()["user"]
+ if suser["email"] == "anon@ymous.user":
+ udets = oauth2_get("auth/user/").then(
+ lambda usrdets: session.set_user_details({
+ "user_id": uuid.UUID(usrdets["user_id"]),
+ "name": usrdets["name"],
+ "email": usrdets["email"],
+ "token": session.user_token()}))
+ return udets
+ return Right(suser)
+
+
def user_logged_in():
"""Check whether the user has logged in."""
suser = session.session_info()["user"]
+ fetch_user_details()
return suser["logged_in"] and suser["token"].is_right()
@@ -191,7 +208,7 @@ def oauth2_get(url, **kwargs) -> Either:
return Right(resp.json())
return Left(resp)
except Exception as exc:#pylint: disable=[broad-except]
- app.logger.error("Error retriving data from auth server: (GET %s)",
+ app.logger.error("Error retrieving data from auth server: (GET %s)",
_uri,
exc_info=True)
return Left(exc)
@@ -223,7 +240,7 @@ def oauth2_post(url, data=None, json=None, **kwargs):#pylint: disable=[redefined
return Right(resp.json())
return Left(resp)
except Exception as exc:#pylint: disable=[broad-except]
- app.logger.error("Error retriving data from auth server: (POST %s)",
+ app.logger.error("Error retrieving data from auth server: (POST %s)",
_uri,
exc_info=True)
return Left(exc)
diff --git a/uploader/oauth2/views.py b/uploader/oauth2/views.py
index 61037f3..db4ef61 100644
--- a/uploader/oauth2/views.py
+++ b/uploader/oauth2/views.py
@@ -24,22 +24,24 @@ from .client import (
user_logged_in,
authserver_uri,
oauth2_clientid,
+ fetch_user_details,
oauth2_clientsecret)
oauth2 = Blueprint("oauth2", __name__)
+
@oauth2.route("/code")
def authorisation_code():
"""Receive authorisation code from auth server and use it to get token."""
def __process_error__(resp_or_exception):
app.logger.debug("ERROR: (%s)", resp_or_exception)
flash("There was an error retrieving the authorisation token.",
- "alert-danger")
+ "alert alert-danger")
return redirect("/")
def __fail_set_user_details__(_failure):
app.logger.debug("Fetching user details fails: %s", _failure)
- flash("Could not retrieve the user details", "alert-danger")
+ flash("Could not retrieve the user details", "alert alert-danger")
return redirect("/")
def __success_set_user_details__(_success):
@@ -48,19 +50,13 @@ def authorisation_code():
def __success__(token):
session.set_user_token(token)
- return oauth2_get("auth/user/").then(
- lambda usrdets: session.set_user_details({
- "user_id": uuid.UUID(usrdets["user_id"]),
- "name": usrdets["name"],
- "email": usrdets["email"],
- "token": session.user_token(),
- "logged_in": True})).either(
+ return fetch_user_details().either(
__fail_set_user_details__,
__success_set_user_details__)
code = request.args.get("code", "").strip()
if not bool(code):
- flash("AuthorisationError: No code was provided.", "alert-danger")
+ flash("AuthorisationError: No code was provided.", "alert alert-danger")
return redirect("/")
baseurl = urlparse(request.base_url, scheme=request.scheme)
@@ -116,7 +112,7 @@ def logout():
_user = session_info["user"]
_user_str = f"{_user['name']} ({_user['email']})"
session.clear_session_info()
- flash("Successfully logged out.", "alert-success")
+ flash("Successfully signed out.", "alert alert-success")
return redirect("/")
if user_logged_in():
@@ -134,5 +130,5 @@ def logout():
cleanup_thunk=lambda: __unset_session__(
session.session_info())),
lambda res: __unset_session__(session.session_info()))
- flash("There is no user that is currently logged in.", "alert-info")
+ flash("There is no user that is currently logged in.", "alert alert-info")
return redirect("/")