about summary refs log tree commit diff
path: root/gn_auth/auth/authorisation/privileges
diff options
context:
space:
mode:
Diffstat (limited to 'gn_auth/auth/authorisation/privileges')
-rw-r--r--gn_auth/auth/authorisation/privileges/__init__.py5
-rw-r--r--gn_auth/auth/authorisation/privileges/models.py70
-rw-r--r--gn_auth/auth/authorisation/privileges/views.py32
3 files changed, 107 insertions, 0 deletions
diff --git a/gn_auth/auth/authorisation/privileges/__init__.py b/gn_auth/auth/authorisation/privileges/__init__.py
new file mode 100644
index 0000000..18c7f5d
--- /dev/null
+++ b/gn_auth/auth/authorisation/privileges/__init__.py
@@ -0,0 +1,5 @@
+"""Package for Privileges."""
+from .models import (Privilege,
+                     user_privileges,
+                     privileges_by_ids,
+                     db_row_to_privilege)
diff --git a/gn_auth/auth/authorisation/privileges/models.py b/gn_auth/auth/authorisation/privileges/models.py
new file mode 100644
index 0000000..77be7c0
--- /dev/null
+++ b/gn_auth/auth/authorisation/privileges/models.py
@@ -0,0 +1,70 @@
+"""Handle privileges"""
+from dataclasses import dataclass
+from typing import Iterable, Optional
+
+import sqlite3
+
+from gn_auth.auth.db import sqlite3 as db
+from gn_auth.auth.authentication.users import User
+
+
+@dataclass(frozen=True)
+class Privilege:
+    """Class representing a privilege: creates immutable objects."""
+    privilege_id: str
+    privilege_description: str
+
+
+def db_row_to_privilege(row: sqlite3.Row) -> Privilege:
+    "Convert single db row into a privilege object."
+    return Privilege(privilege_id=row["privilege_id"],
+                     privilege_description=row["privilege_description"])
+
+
+def user_privileges(conn: db.DbConnection, user: User) -> Iterable[Privilege]:
+    """Fetch the user's privileges from the database."""
+    with db.cursor(conn) as cursor:
+        cursor.execute(
+            ("SELECT p.privilege_id, p.privilege_description "
+             "FROM user_roles AS ur "
+             "INNER JOIN role_privileges AS rp ON ur.role_id=rp.role_id "
+             "INNER JOIN privileges AS p ON rp.privilege_id=p.privilege_id "
+             "WHERE ur.user_id=?"),
+            (str(user.user_id),))
+        results = cursor.fetchall()
+
+    return (Privilege(row[0], row[1]) for row in results)
+
+def privileges_by_ids(
+        conn: db.DbConnection, privileges_ids: tuple[str, ...]) -> tuple[
+            Privilege, ...]:
+    """Fetch privileges by their ids."""
+    if len(privileges_ids) == 0:
+        return tuple()
+
+    with db.cursor(conn) as cursor:
+        clause = ", ".join(["?"] * len(privileges_ids))
+        cursor.execute(
+            f"SELECT * FROM privileges WHERE privilege_id IN ({clause})",
+            privileges_ids)
+        return tuple(
+            Privilege(row["privilege_id"], row["privilege_description"])
+            for row in cursor.fetchall())
+
+def all_privileges(conn: db.DbConnection) -> tuple[Privilege, ...]:
+    """Retrieve all privileges from the database."""
+    with db.cursor(conn) as cursor:
+        cursor.execute("SELECT * FROM privileges")
+        results = cursor.fetchall()
+
+    return tuple([] if not bool(results)
+                 else (db_row_to_privilege(row) for row in results))
+
+def privilege_by_id(conn: db.DbConnection, privilege_id: str) -> Optional[Privilege]:
+    """Retrieve a privilege by its ID."""
+    with db.cursor(conn) as cursor:
+        cursor.execute("SELECT * FROM privileges WHERE privilege_id=?",
+                       (privilege_id,))
+        row = cursor.fetchone()
+
+    return db_row_to_privilege(row) if bool(row) else None
diff --git a/gn_auth/auth/authorisation/privileges/views.py b/gn_auth/auth/authorisation/privileges/views.py
new file mode 100644
index 0000000..d50e5cb
--- /dev/null
+++ b/gn_auth/auth/authorisation/privileges/views.py
@@ -0,0 +1,32 @@
+"""Routes for privileges."""
+from dataclasses import asdict
+
+from werkzeug.exceptions import NotFound
+from flask import jsonify, Blueprint, current_app as app
+
+from gn_auth.auth.db import sqlite3 as db
+
+from .models import all_privileges, privilege_by_id
+
+privileges = Blueprint("privileges", __name__)
+
+@privileges.route("/", methods=["GET"])
+@privileges.route("/list", methods=["GET"])
+def list_privileges():
+    """List all the available privileges."""
+    with db.connection(app.config["AUTH_DB"]) as conn:
+        _privileges = all_privileges(conn)
+
+    return jsonify(_privileges if bool(_privileges) else []), 200
+
+@privileges.route("/<privilege_id>/", methods=["GET"])
+@privileges.route("/<privilege_id>/view", methods=["GET"])
+def view_privilege(privilege_id: str):
+    """View details of a single privilege"""
+    with db.connection(app.config["AUTH_DB"]) as conn:
+        _privilege = privilege_by_id(conn, privilege_id)
+
+    if bool(_privilege):
+        return jsonify(_privilege)
+
+    raise NotFound(f"No privilege exists with ID '{privilege_id}'")