aboutsummaryrefslogtreecommitdiff
path: root/gn3/auth/authorisation/users
diff options
context:
space:
mode:
Diffstat (limited to 'gn3/auth/authorisation/users')
-rw-r--r--gn3/auth/authorisation/users/admin/__init__.py2
-rw-r--r--gn3/auth/authorisation/users/admin/ui.py42
-rw-r--r--gn3/auth/authorisation/users/admin/views.py79
3 files changed, 123 insertions, 0 deletions
diff --git a/gn3/auth/authorisation/users/admin/__init__.py b/gn3/auth/authorisation/users/admin/__init__.py
new file mode 100644
index 0000000..8aa0743
--- /dev/null
+++ b/gn3/auth/authorisation/users/admin/__init__.py
@@ -0,0 +1,2 @@
+"""The admin module"""
+from .views import admin
diff --git a/gn3/auth/authorisation/users/admin/ui.py b/gn3/auth/authorisation/users/admin/ui.py
new file mode 100644
index 0000000..3918eb1
--- /dev/null
+++ b/gn3/auth/authorisation/users/admin/ui.py
@@ -0,0 +1,42 @@
+"""UI utilities for the auth system."""
+from functools import wraps
+from datetime import datetime, timezone
+from flask import flash, session, request, url_for, redirect
+
+from gn3.auth.authentication.users import User
+from gn3.auth.db_utils import with_db_connection
+from gn3.auth.authorisation.roles.models import user_roles
+
+SESSION_KEY = "session_details"
+
+def __session_expired__():
+ """Check whether the session has expired."""
+ return datetime.now(tz=timezone.utc) >= session[SESSION_KEY]["expires"]
+
+def logged_in(func):
+ """Verify the user is logged in."""
+ @wraps(func)
+ def __logged_in__(*args, **kwargs):
+ if bool(session.get(SESSION_KEY)) and not __session_expired__():
+ return func(*args, **kwargs)
+ flash("You need to be logged in to access that page.", "alert-danger")
+ return redirect(url_for("oauth2.admin.login", next=request.url))
+ return __logged_in__
+
+def is_admin(func):
+ """Verify user is a system admin."""
+ @wraps(func)
+ @logged_in
+ def __admin__(*args, **kwargs):
+ admin_roles = [
+ role for role in with_db_connection(
+ lambda conn: user_roles(
+ conn, User(**session[SESSION_KEY]["user"])))
+ if role.role_name == "system-administrator"]
+ if len(admin_roles) > 0:
+ return func(*args, **kwargs)
+ flash("Expected a system administrator.", "alert-danger")
+ flash("You have been logged out of the system.", "alert-info")
+ session.pop(SESSION_KEY)
+ return redirect(url_for("oauth2.admin.login"))
+ return __admin__
diff --git a/gn3/auth/authorisation/users/admin/views.py b/gn3/auth/authorisation/users/admin/views.py
new file mode 100644
index 0000000..6a86da3
--- /dev/null
+++ b/gn3/auth/authorisation/users/admin/views.py
@@ -0,0 +1,79 @@
+"""UI for admin stuff"""
+from datetime import datetime, timezone, timedelta
+
+from email_validator import validate_email, EmailNotValidError
+from flask import (
+ flash,
+ session,
+ request,
+ url_for,
+ redirect,
+ Blueprint,
+ current_app,
+ render_template)
+
+from gn3.auth import db
+from gn3.auth.authentication.users import valid_login, user_by_email
+
+from .ui import SESSION_KEY, is_admin
+
+admin = Blueprint("admin", __name__)
+
+@admin.before_request
+def update_expires():
+ """Update session expiration."""
+ if bool(session.get(SESSION_KEY)):
+ now = datetime.now(tz=timezone.utc)
+ if now >= session[SESSION_KEY]["expires"]:
+ flash("Session has expired. Logging out...", "alert-warning")
+ session.pop(SESSION_KEY)
+ return redirect("/version")
+ # If not expired, extend expiry.
+ session[SESSION_KEY]["expires"] = now + timedelta(minutes=10)
+
+ return None
+
+@admin.route("/login", methods=["GET", "POST"])
+def login():
+ """Log in to GN3 directly without OAuth2 client."""
+ if request.method == "GET":
+ return render_template(
+ "login.html", next_uri=request.args.get("next", "/api/version"))
+
+ form = request.form
+ next_uri = form.get("next_uri", "/api/version")
+ error_message = "Invalid email or password provided."
+ login_page = redirect(url_for("oauth2.admin.login", next=next_uri))
+ try:
+ email = validate_email(form.get("email", "").strip(),
+ check_deliverability=False)
+ password = form.get("password")
+ with db.connection(current_app.config["AUTH_DB"]) as conn:
+ user = user_by_email(conn, email["email"])
+ if valid_login(conn, user, password):
+ session[SESSION_KEY] = {
+ "user": user._asdict(),
+ "expires": datetime.now(tz=timezone.utc) + timedelta(minutes=10)
+ }
+ return redirect(next_uri)
+ flash(error_message, "alert-danger")
+ return login_page
+ except EmailNotValidError as _enve:
+ flash(error_message, "alert-danger")
+ return login_page
+
+@admin.route("/logout", methods=["GET"])
+def logout():
+ """Log out the admin."""
+ if not bool(session.get(SESSION_KEY)):
+ flash("Not logged in.", "alert-info")
+ return redirect(url_for("general.version"))
+ session.pop(SESSION_KEY)
+ flash("Logged out", "alert-success")
+ return redirect(url_for("general.version"))
+
+@admin.route("/register-client", methods=["GET", "POST"])
+@is_admin
+def register_client():
+ """Register an OAuth2 client."""
+ return "WOULD REGISTER ..."