about summary refs log tree commit diff
path: root/gn_auth/auth/authorisation/users/admin
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2023-08-04 10:10:28 +0300
committerFrederick Muriuki Muriithi2023-08-04 10:20:09 +0300
commit8b7c598407a5fea9a3d78473e72df87606998cd4 (patch)
tree8526433a17eca6b511feb082a0574f9b15cb9469 /gn_auth/auth/authorisation/users/admin
parentf7fcbbcc014686ac597b783a8dcb38b43024b9d6 (diff)
downloadgn-auth-8b7c598407a5fea9a3d78473e72df87606998cd4.tar.gz
Copy over files from GN3 repository.
Diffstat (limited to 'gn_auth/auth/authorisation/users/admin')
-rw-r--r--gn_auth/auth/authorisation/users/admin/__init__.py2
-rw-r--r--gn_auth/auth/authorisation/users/admin/ui.py27
-rw-r--r--gn_auth/auth/authorisation/users/admin/views.py230
3 files changed, 259 insertions, 0 deletions
diff --git a/gn_auth/auth/authorisation/users/admin/__init__.py b/gn_auth/auth/authorisation/users/admin/__init__.py
new file mode 100644
index 0000000..8aa0743
--- /dev/null
+++ b/gn_auth/auth/authorisation/users/admin/__init__.py
@@ -0,0 +1,2 @@
+"""The admin module"""
+from .views import admin
diff --git a/gn_auth/auth/authorisation/users/admin/ui.py b/gn_auth/auth/authorisation/users/admin/ui.py
new file mode 100644
index 0000000..242c7a6
--- /dev/null
+++ b/gn_auth/auth/authorisation/users/admin/ui.py
@@ -0,0 +1,27 @@
+"""UI utilities for the auth system."""
+from functools import wraps
+from flask import flash, url_for, redirect
+
+from gn3.auth.authentication.users import User
+from gn3.auth.db_utils import with_db_connection
+from gn3.auth.authorisation.roles.models import user_roles
+
+from gn3.session import logged_in, session_user, clear_session_info
+
+def is_admin(func):
+    """Verify user is a system admin."""
+    @wraps(func)
+    @logged_in
+    def __admin__(*args, **kwargs):
+        admin_roles = [
+            role for role in with_db_connection(
+                lambda conn: user_roles(
+                    conn, User(**session_user())))
+            if role.role_name == "system-administrator"]
+        if len(admin_roles) > 0:
+            return func(*args, **kwargs)
+        flash("Expected a system administrator.", "alert-danger")
+        flash("You have been logged out of the system.", "alert-info")
+        clear_session_info()
+        return redirect(url_for("oauth2.admin.login"))
+    return __admin__
diff --git a/gn_auth/auth/authorisation/users/admin/views.py b/gn_auth/auth/authorisation/users/admin/views.py
new file mode 100644
index 0000000..c9f1887
--- /dev/null
+++ b/gn_auth/auth/authorisation/users/admin/views.py
@@ -0,0 +1,230 @@
+"""UI for admin stuff"""
+import uuid
+import json
+import random
+import string
+from functools import partial
+from datetime import datetime, timezone, timedelta
+
+from email_validator import validate_email, EmailNotValidError
+from flask import (
+    flash,
+    request,
+    url_for,
+    redirect,
+    Blueprint,
+    current_app,
+    render_template)
+
+
+from gn3 import session
+from gn3.auth import db
+from gn3.auth.db_utils import with_db_connection
+
+from gn3.auth.authentication.oauth2.models.oauth2client import (
+    save_client,
+    OAuth2Client,
+    oauth2_clients,
+    client as oauth2_client,
+    delete_client as _delete_client)
+from gn3.auth.authentication.users import (
+    User,
+    user_by_id,
+    valid_login,
+    user_by_email,
+    hash_password)
+
+from .ui import is_admin
+
+admin = Blueprint("admin", __name__)
+
+@admin.before_request
+def update_expires():
+    """Update session expiration."""
+    if session.session_info() and not session.update_expiry():
+        flash("Session has expired. Logging out...", "alert-warning")
+        session.clear_session_info()
+        return redirect(url_for("oauth2.admin.login"))
+    return None
+
+@admin.route("/dashboard", methods=["GET"])
+@is_admin
+def dashboard():
+    """Admin dashboard."""
+    return render_template("admin/dashboard.html")
+
+@admin.route("/login", methods=["GET", "POST"])
+def login():
+    """Log in to GN3 directly without OAuth2 client."""
+    if request.method == "GET":
+        return render_template(
+            "admin/login.html",
+            next_uri=request.args.get("next", "oauth2.admin.dashboard"))
+
+    form = request.form
+    next_uri = form.get("next_uri", "oauth2.admin.dashboard")
+    error_message = "Invalid email or password provided."
+    login_page = redirect(url_for("oauth2.admin.login", next=next_uri))
+    try:
+        email = validate_email(form.get("email", "").strip(),
+                               check_deliverability=False)
+        password = form.get("password")
+        with db.connection(current_app.config["AUTH_DB"]) as conn:
+            user = user_by_email(conn, email["email"])
+            if valid_login(conn, user, password):
+                session.update_session_info(
+                    user=user._asdict(),
+                    expires=(
+                        datetime.now(tz=timezone.utc) + timedelta(minutes=10)))
+                return redirect(url_for(next_uri))
+            flash(error_message, "alert-danger")
+            return login_page
+    except EmailNotValidError as _enve:
+        flash(error_message, "alert-danger")
+        return login_page
+
+@admin.route("/logout", methods=["GET"])
+def logout():
+    """Log out the admin."""
+    if not session.session_info():
+        flash("Not logged in.", "alert-info")
+        return redirect(url_for("oauth2.admin.login"))
+    session.clear_session_info()
+    flash("Logged out", "alert-success")
+    return redirect(url_for("oauth2.admin.login"))
+
+def random_string(length: int = 64) -> str:
+    """Generate a random string."""
+    return "".join(
+        random.choice(string.ascii_letters + string.digits + string.punctuation)
+        for _idx in range(0, length))
+
+def __response_types__(grant_types: tuple[str, ...]) -> tuple[str, ...]:
+    """Compute response types from grant types."""
+    resps = {
+        "password": ("token",),
+        "authorization_code": ("token", "code"),
+        "refresh_token": ("token",)
+    }
+    return tuple(set(
+        resp_typ for types_list
+        in (types for grant, types in resps.items() if grant in grant_types)
+        for resp_typ in types_list))
+
+@admin.route("/register-client", methods=["GET", "POST"])
+@is_admin
+def register_client():
+    """Register an OAuth2 client."""
+    def __list_users__(conn):
+        with db.cursor(conn) as cursor:
+            cursor.execute("SELECT * FROM users")
+            return tuple(
+                User(uuid.UUID(row["user_id"]), row["email"], row["name"])
+                for row in cursor.fetchall())
+    if request.method == "GET":
+        return render_template(
+            "admin/register-client.html",
+            scope=current_app.config["OAUTH2_SCOPE"],
+            users=with_db_connection(__list_users__),
+            current_user=session.session_user())
+
+    form = request.form
+    raw_client_secret = random_string()
+    default_redirect_uri = form["redirect_uri"]
+    grant_types = form.getlist("grants[]")
+    client = OAuth2Client(
+        client_id = uuid.uuid4(),
+        client_secret = hash_password(raw_client_secret),
+        client_id_issued_at = datetime.now(tz=timezone.utc),
+        client_secret_expires_at = datetime.fromtimestamp(0),
+        client_metadata = {
+            "client_name": "GN2 Dev Server",
+            "token_endpoint_auth_method": [
+                "client_secret_post", "client_secret_basic"],
+            "client_type": "confidential",
+            "grant_types": ["password", "authorization_code", "refresh_token"],
+            "default_redirect_uri": default_redirect_uri,
+            "redirect_uris": [default_redirect_uri] + form.get("other_redirect_uri", "").split(),
+            "response_type": __response_types__(tuple(grant_types)),
+            "scope": form.getlist("scope[]")
+        },
+        user = with_db_connection(partial(
+            user_by_id, user_id=uuid.UUID(form["user"])))
+    )
+    client = with_db_connection(partial(save_client, the_client=client))
+    return render_template(
+        "admin/registered-client.html",
+        client=client,
+        client_secret = raw_client_secret)
+
+def __parse_client__(sqlite3_row) -> dict:
+    """Parse the client details into python datatypes."""
+    return {
+        **dict(sqlite3_row),
+        "client_metadata": json.loads(sqlite3_row["client_metadata"])
+    }
+
+@admin.route("/list-client", methods=["GET"])
+@is_admin
+def list_clients():
+    """List all registered OAuth2 clients."""
+    return render_template(
+        "admin/list-oauth2-clients.html",
+        clients=with_db_connection(oauth2_clients))
+
+@admin.route("/view-client/<uuid:client_id>", methods=["GET"])
+@is_admin
+def view_client(client_id: uuid.UUID):
+    """View details of OAuth2 client with given `client_id`."""
+    return render_template(
+        "admin/view-oauth2-client.html",
+        client=with_db_connection(partial(oauth2_client, client_id=client_id)),
+        scope=current_app.config["OAUTH2_SCOPE"])
+
+@admin.route("/edit-client", methods=["POST"])
+@is_admin
+def edit_client():
+    """Edit the details of the given client."""
+    form = request.form
+    the_client = with_db_connection(partial(
+        oauth2_client, client_id=uuid.UUID(form["client_id"])))
+    if the_client.is_nothing():
+        flash("No such client.", "alert-danger")
+        return redirect(url_for("oauth2.admin.list_clients"))
+    the_client = the_client.value
+    client_metadata = {
+        **the_client.client_metadata,
+        "default_redirect_uri": form["default_redirect_uri"],
+        "redirect_uris": list(set(
+            [form["default_redirect_uri"]] +
+            form["other_redirect_uris"].split("\r\n"))),
+        "grants": form.getlist("grants[]"),
+        "scope": form.getlist("scope[]")
+    }
+    with_db_connection(partial(save_client, the_client=OAuth2Client(
+        the_client.client_id,
+        the_client.client_secret,
+        the_client.client_id_issued_at,
+        the_client.client_secret_expires_at,
+        client_metadata,
+        the_client.user)))
+    flash("Client updated.", "alert-success")
+    return redirect(url_for("oauth2.admin.view_client",
+                            client_id=the_client.client_id))
+
+@admin.route("/delete-client", methods=["POST"])
+@is_admin
+def delete_client():
+    """Delete the details of the client."""
+    form = request.form
+    the_client = with_db_connection(partial(
+        oauth2_client, client_id=uuid.UUID(form["client_id"])))
+    if the_client.is_nothing():
+        flash("No such client.", "alert-danger")
+        return redirect(url_for("oauth2.admin.list_clients"))
+    the_client = the_client.value
+    with_db_connection(partial(_delete_client, client=the_client))
+    flash((f"Client '{the_client.client_metadata.client_name}' was deleted "
+           "successfully."),
+          "alert-success")
+    return redirect(url_for("oauth2.admin.list_clients"))