diff options
Diffstat (limited to 'gn_auth/auth/authorisation/privileges')
-rw-r--r-- | gn_auth/auth/authorisation/privileges/__init__.py | 5 | ||||
-rw-r--r-- | gn_auth/auth/authorisation/privileges/models.py | 70 | ||||
-rw-r--r-- | gn_auth/auth/authorisation/privileges/views.py | 32 |
3 files changed, 107 insertions, 0 deletions
diff --git a/gn_auth/auth/authorisation/privileges/__init__.py b/gn_auth/auth/authorisation/privileges/__init__.py new file mode 100644 index 0000000..18c7f5d --- /dev/null +++ b/gn_auth/auth/authorisation/privileges/__init__.py @@ -0,0 +1,5 @@ +"""Package for Privileges.""" +from .models import (Privilege, + user_privileges, + privileges_by_ids, + db_row_to_privilege) diff --git a/gn_auth/auth/authorisation/privileges/models.py b/gn_auth/auth/authorisation/privileges/models.py new file mode 100644 index 0000000..77be7c0 --- /dev/null +++ b/gn_auth/auth/authorisation/privileges/models.py @@ -0,0 +1,70 @@ +"""Handle privileges""" +from dataclasses import dataclass +from typing import Iterable, Optional + +import sqlite3 + +from gn_auth.auth.db import sqlite3 as db +from gn_auth.auth.authentication.users import User + + +@dataclass(frozen=True) +class Privilege: + """Class representing a privilege: creates immutable objects.""" + privilege_id: str + privilege_description: str + + +def db_row_to_privilege(row: sqlite3.Row) -> Privilege: + "Convert single db row into a privilege object." + return Privilege(privilege_id=row["privilege_id"], + privilege_description=row["privilege_description"]) + + +def user_privileges(conn: db.DbConnection, user: User) -> Iterable[Privilege]: + """Fetch the user's privileges from the database.""" + with db.cursor(conn) as cursor: + cursor.execute( + ("SELECT p.privilege_id, p.privilege_description " + "FROM user_roles AS ur " + "INNER JOIN role_privileges AS rp ON ur.role_id=rp.role_id " + "INNER JOIN privileges AS p ON rp.privilege_id=p.privilege_id " + "WHERE ur.user_id=?"), + (str(user.user_id),)) + results = cursor.fetchall() + + return (Privilege(row[0], row[1]) for row in results) + +def privileges_by_ids( + conn: db.DbConnection, privileges_ids: tuple[str, ...]) -> tuple[ + Privilege, ...]: + """Fetch privileges by their ids.""" + if len(privileges_ids) == 0: + return tuple() + + with db.cursor(conn) as cursor: + clause = ", ".join(["?"] * len(privileges_ids)) + cursor.execute( + f"SELECT * FROM privileges WHERE privilege_id IN ({clause})", + privileges_ids) + return tuple( + Privilege(row["privilege_id"], row["privilege_description"]) + for row in cursor.fetchall()) + +def all_privileges(conn: db.DbConnection) -> tuple[Privilege, ...]: + """Retrieve all privileges from the database.""" + with db.cursor(conn) as cursor: + cursor.execute("SELECT * FROM privileges") + results = cursor.fetchall() + + return tuple([] if not bool(results) + else (db_row_to_privilege(row) for row in results)) + +def privilege_by_id(conn: db.DbConnection, privilege_id: str) -> Optional[Privilege]: + """Retrieve a privilege by its ID.""" + with db.cursor(conn) as cursor: + cursor.execute("SELECT * FROM privileges WHERE privilege_id=?", + (privilege_id,)) + row = cursor.fetchone() + + return db_row_to_privilege(row) if bool(row) else None diff --git a/gn_auth/auth/authorisation/privileges/views.py b/gn_auth/auth/authorisation/privileges/views.py new file mode 100644 index 0000000..d50e5cb --- /dev/null +++ b/gn_auth/auth/authorisation/privileges/views.py @@ -0,0 +1,32 @@ +"""Routes for privileges.""" +from dataclasses import asdict + +from werkzeug.exceptions import NotFound +from flask import jsonify, Blueprint, current_app as app + +from gn_auth.auth.db import sqlite3 as db + +from .models import all_privileges, privilege_by_id + +privileges = Blueprint("privileges", __name__) + +@privileges.route("/", methods=["GET"]) +@privileges.route("/list", methods=["GET"]) +def list_privileges(): + """List all the available privileges.""" + with db.connection(app.config["AUTH_DB"]) as conn: + _privileges = all_privileges(conn) + + return jsonify(_privileges if bool(_privileges) else []), 200 + +@privileges.route("/<privilege_id>/", methods=["GET"]) +@privileges.route("/<privilege_id>/view", methods=["GET"]) +def view_privilege(privilege_id: str): + """View details of a single privilege""" + with db.connection(app.config["AUTH_DB"]) as conn: + _privilege = privilege_by_id(conn, privilege_id) + + if bool(_privilege): + return jsonify(_privilege) + + raise NotFound(f"No privilege exists with ID '{privilege_id}'") |