about summary refs log tree commit diff
path: root/wqflask/wqflask/oauth2/routes.py
diff options
context:
space:
mode:
Diffstat (limited to 'wqflask/wqflask/oauth2/routes.py')
-rw-r--r--wqflask/wqflask/oauth2/routes.py270
1 files changed, 12 insertions, 258 deletions
diff --git a/wqflask/wqflask/oauth2/routes.py b/wqflask/wqflask/oauth2/routes.py
index 6fed4064..93219bce 100644
--- a/wqflask/wqflask/oauth2/routes.py
+++ b/wqflask/wqflask/oauth2/routes.py
@@ -1,262 +1,16 @@
 """Routes for the OAuth2 auth system in GN3"""
-import uuid
-import requests
-from typing import Optional
-from urllib.parse import urljoin
+from flask import Blueprint
 
-from pymonad.maybe import Just, Maybe, Nothing
-from pymonad.either import Left, Right, Either
-from authlib.integrations.requests_client import OAuth2Session
-from authlib.integrations.base_client.errors import OAuthError
-from flask import (
-    flash, request, session, url_for, redirect, Blueprint, render_template,
-    current_app as app)
+from .users import users
+from .roles import roles
+from .groups import groups
+from .toplevel import toplevel
+from .resources import resources
 
-from .checks import require_oauth2, user_logged_in
+oauth2 = Blueprint("oauth2", __name__, template_folder="templates/oauth2")
 
-oauth2 = Blueprint("oauth2", __name__)
-SCOPE = "profile group role resource register-client"
-
-def __raise_unimplemented__():
-    raise Exception("NOT IMPLEMENTED")
-
-def get_endpoint(uri_path: str) -> Maybe:
-    token = session.get("oauth2_token", False)
-    if token and not bool(session.get("user_details", False)):
-        config = app.config
-        client = OAuth2Session(
-            config["OAUTH2_CLIENT_ID"], config["OAUTH2_CLIENT_SECRET"],
-            token=token)
-        resp = client.get(
-            urljoin(config["GN_SERVER_URL"], uri_path))
-        resp_json = resp.json()
-
-        if resp_json.get("error") == "invalid_token":
-            flash(resp_json["error_description"], "alert-danger")
-            flash("You are now logged out.", "alert-info")
-            session.pop("oauth2_token", None)
-            return Nothing
-
-        return Just(resp_json)
-
-    return Nothing
-
-def __user_details__():
-    return session.get("user_details", False) or get_endpoint(
-        "oauth2/user").maybe(False, __id__)
-
-def oauth2_get(uri_path: str) -> Either:
-    token = session.get("oauth2_token")
-    config = app.config
-    client = OAuth2Session(
-        config["OAUTH2_CLIENT_ID"], config["OAUTH2_CLIENT_SECRET"],
-        token=token, scope=SCOPE)
-    resp = client.get(
-            urljoin(config["GN_SERVER_URL"], uri_path))
-    if resp.status_code == 200:
-        return Right(resp.json())
-
-    return Left(resp)
-
-def oauth2_post(uri_path: str, data: dict) -> Either:
-    token = session.get("oauth2_token")
-    config = app.config
-    client = OAuth2Session(
-        config["OAUTH2_CLIENT_ID"], config["OAUTH2_CLIENT_SECRET"],
-        token=token, scope=SCOPE)
-    resp = client.post(urljoin(config["GN_SERVER_URL"], uri_path), data=data)
-    if resp.status_code == 200:
-        return Right(resp.json())
-
-    return Left(resp)
-
-def __request_error__(response):
-    app.logger.error(f"{response}: {response.url} [{response.status_code}]")
-    return render_template("oauth2/request_error.html", response=response)
-
-@oauth2.route("/login", methods=["GET", "POST"])
-def login():
-    """Route to allow users to sign up."""
-    next_endpoint=request.args.get("next", False)
-
-    if request.method == "POST":
-        config = app.config
-        form = request.form
-        client = OAuth2Session(
-            config["OAUTH2_CLIENT_ID"], config["OAUTH2_CLIENT_SECRET"],
-            scope=SCOPE, token_endpoint_auth_method="client_secret_post")
-        try:
-            token = client.fetch_token(
-                urljoin(config["GN_SERVER_URL"], "oauth2/token"),
-                username=form.get("email_address"),
-                password=form.get("password"),
-                grant_type="password")
-            session["oauth2_token"] = token
-        except OAuthError as _oaerr:
-            flash(_oaerr.args[0], "alert-danger")
-            return render_template(
-                "oauth2/login.html", next_endpoint=next_endpoint,
-                email=form.get("email_address"))
-
-    if user_logged_in():
-        if next_endpoint:
-            return redirect(url_for(next_endpoint))
-        return redirect("/")
-
-    return render_template("oauth2/login.html", next_endpoint=next_endpoint)
-
-
-@oauth2.route("/logout", methods=["GET", "POST"])
-def logout():
-    if user_logged_in():
-        token = session.get("oauth2_token", False)
-        config = app.config
-        client = OAuth2Session(
-            config["OAUTH2_CLIENT_ID"], config["OAUTH2_CLIENT_SECRET"],
-            scope = SCOPE, token=token)
-        resp = client.revoke_token(urljoin(config["GN_SERVER_URL"], "oauth2/revoke"))
-        keys = tuple(key for key in session.keys() if not key.startswith("_"))
-        for key in keys:
-            session.pop(key, default=None)
-
-    return redirect("/")
-
-@oauth2.route("/register-user", methods=["GET", "POST"])
-def register_user():
-    if user_logged_in():
-        next_endpoint=request.args.get("next", url_for("/"))
-        flash(("You cannot register a new user while logged in. "
-               "Please logout to register a new user."),
-              "alert-danger")
-        return redirect(next_endpoint)
-
-    if request.method == "GET":
-        return render_template("oauth2/register_user.html")
-
-    config = app.config
-    form = request.form
-    response = requests.post(
-        urljoin(config["GN_SERVER_URL"], "oauth2/register-user"),
-        data = {
-            "user_name": form.get("user_name"),
-            "email": form.get("email_address"),
-            "password": form.get("password"),
-            "confirm_password": form.get("confirm_password")})
-    results = response.json()
-    if "error" in results:
-        error_messages = tuple(
-            f"{results['error']}: {msg.strip()}"
-            for msg in results.get("error_description").split("::"))
-        for message in error_messages:
-            flash(message, "alert-danger")
-        return redirect(url_for("oauth2.register_user"))
-
-    flash("Registration successful! Please login to continue.", "alert-success")
-    return redirect(url_for("oauth2.login"))
-
-@oauth2.route("/register-client", methods=["GET", "POST"])
-@require_oauth2
-def register_client():
-    """Register an OAuth2 client."""
-    return "USER IS LOGGED IN AND SUCCESSFULLY ACCESSED THIS ENDPOINT!"
-
-@oauth2.route("/user-profile", methods=["GET"])
-@require_oauth2
-def user_profile():
-    __id__ = lambda the_val: the_val
-    user_details = __user_details__()
-    config = app.config
-    client = OAuth2Session(
-        config["OAUTH2_CLIENT_ID"], config["OAUTH2_CLIENT_SECRET"],
-        scope = SCOPE, token=session.get("oauth2_token"))
-
-    roles = oauth2_get("oauth2/user-roles").either(lambda x: "Error", lambda x: x)
-    return render_template(
-        "oauth2/view-user.html", user_details=user_details, roles=roles)
-
-@oauth2.route("/request-add-to-group", methods=["POST"])
-@require_oauth2
-def request_add_to_group():
-    return "WOULD SEND MESSAGE TO HAVE YOU ADDED TO GROUP..."
-
-def __handle_error__(redirect_uri: Optional[str] = None, **kwargs):
-    def __handler__(error):
-        print(f"ERROR: {error}")
-        msg = error.get(
-            "error_message", error.get("error_description", "undefined error"))
-        flash(f"{error['error']}: {msg}.",
-              "alert-danger")
-        if "response_handlers" in kwargs:
-            for handler in kwargs["response_handlers"]:
-                handler(response)
-        if redirect:
-            return redirect(url_for(redirect_uri, **kwargs))
-
-    return __handler__
-
-def __handle_success__(
-        success_msg: str, redirect_uri: Optional[str] = None, **kwargs):
-    def __handler__(response):
-        flash(f"Success: {success_msg}.", "alert-success")
-        if "response_handlers" in kwargs:
-            for handler in kwargs["response_handlers"]:
-                handler(response)
-        if redirect:
-            return redirect(url_for(redirect_uri, **kwargs))
-
-    return __handler__
-
-@oauth2.route("/create-group", methods=["POST"])
-@require_oauth2
-def create_group():
-    def __setup_group__(response):
-        session["user_details"]["group"] = response
-
-    resp = oauth2_post("oauth2/create-group", data=dict(request.form))
-    return resp.either(
-        __handle_error__("oauth2.group_join_or_create"),
-        __handle_success__(
-            "Created group", "oauth2.user_profile",
-            response_handlers=__setup_group__))
-
-@oauth2.route("/group-join-or-create", methods=["GET"])
-def group_join_or_create():
-    user_details = __user_details__()
-    if bool(user_details["group"]):
-        flash("You are already a member of a group.", "alert info.")
-        return redirect(url_for("oauth2.user_profile"))
-    groups = oauth2_get("oauth2/groups").either(
-        lambda x: __raise_unimplemented__(), lambda x: x)
-    return render_template("oauth2/group_join_or_create.html", groups=groups)
-
-@oauth2.route("/user-resources", methods=["GET"])
-def user_resources():
-    def __success__(resources):
-        return render_template("oauth2/resources.html", resources=resources)
-
-    return oauth2_get("oauth2/user-resources").either(
-        __request_error__, __success__)
-
-@oauth2.route("/user-roles", methods=["GET"])
-def user_roles():
-    def __success__(roles):
-        return render_template("oauth2/list_roles.html", roles=roles)
-
-    return oauth2_get("oauth2/user-roles").either(
-        __request_error__, __success__)
-
-@oauth2.route("/user-group", methods=["GET"])
-def user_group():
-    def __success__(group):
-        return render_template("oauth2/group.html", group=group)
-
-    return oauth2_get("oauth2/user-group").either(
-        __request_error__, __success__)
-
-@oauth2.route("/role/<uuid:role_id>", methods=["GET"])
-def role(role_id: uuid.UUID):
-    def __success__(the_role):
-        return render_template("oauth2/role.html", role=the_role)
-
-    return oauth2_get(f"oauth2/role/{role_id}").either(
-        __request_error__, __success__)
+oauth2.register_blueprint(toplevel, url_prefix="/")
+oauth2.register_blueprint(users, url_prefix="/user")
+oauth2.register_blueprint(roles, url_prefix="/role")
+oauth2.register_blueprint(groups, url_prefix="/group")
+oauth2.register_blueprint(resources, url_prefix="/resource")