From fa7452e4f08352526257d1f0dc5181302a55d3e2 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Mon, 10 Jun 2024 15:22:20 -0500 Subject: Provide some endpoints for privileges. --- gn_auth/auth/authorisation/privileges.py | 52 ----------------- gn_auth/auth/authorisation/privileges/__init__.py | 5 ++ gn_auth/auth/authorisation/privileges/models.py | 70 +++++++++++++++++++++++ gn_auth/auth/authorisation/privileges/views.py | 32 +++++++++++ gn_auth/auth/views.py | 2 + 5 files changed, 109 insertions(+), 52 deletions(-) delete mode 100644 gn_auth/auth/authorisation/privileges.py create mode 100644 gn_auth/auth/authorisation/privileges/__init__.py create mode 100644 gn_auth/auth/authorisation/privileges/models.py create mode 100644 gn_auth/auth/authorisation/privileges/views.py diff --git a/gn_auth/auth/authorisation/privileges.py b/gn_auth/auth/authorisation/privileges.py deleted file mode 100644 index a977db5..0000000 --- a/gn_auth/auth/authorisation/privileges.py +++ /dev/null @@ -1,52 +0,0 @@ -"""Handle privileges""" -from dataclasses import dataclass -from typing import Iterable - -import sqlite3 - -from ..db import sqlite3 as db -from ..authentication.users import User - - -@dataclass(frozen=True) -class Privilege: - """Class representing a privilege: creates immutable objects.""" - privilege_id: str - privilege_description: str - - -def db_row_to_privilege(row: sqlite3.Row) -> Privilege: - "Convert single db row into a privilege object." - return Privilege(privilege_id=row["privilege_id"], - privilege_description=row["privilege_description"]) - - -def user_privileges(conn: db.DbConnection, user: User) -> Iterable[Privilege]: - """Fetch the user's privileges from the database.""" - with db.cursor(conn) as cursor: - cursor.execute( - ("SELECT p.privilege_id, p.privilege_description " - "FROM user_roles AS ur " - "INNER JOIN role_privileges AS rp ON ur.role_id=rp.role_id " - "INNER JOIN privileges AS p ON rp.privilege_id=p.privilege_id " - "WHERE ur.user_id=?"), - (str(user.user_id),)) - results = cursor.fetchall() - - return (Privilege(row[0], row[1]) for row in results) - -def privileges_by_ids( - conn: db.DbConnection, privileges_ids: tuple[str, ...]) -> tuple[ - Privilege, ...]: - """Fetch privileges by their ids.""" - if len(privileges_ids) == 0: - return tuple() - - with db.cursor(conn) as cursor: - clause = ", ".join(["?"] * len(privileges_ids)) - cursor.execute( - f"SELECT * FROM privileges WHERE privilege_id IN ({clause})", - privileges_ids) - return tuple( - Privilege(row["privilege_id"], row["privilege_description"]) - for row in cursor.fetchall()) diff --git a/gn_auth/auth/authorisation/privileges/__init__.py b/gn_auth/auth/authorisation/privileges/__init__.py new file mode 100644 index 0000000..18c7f5d --- /dev/null +++ b/gn_auth/auth/authorisation/privileges/__init__.py @@ -0,0 +1,5 @@ +"""Package for Privileges.""" +from .models import (Privilege, + user_privileges, + privileges_by_ids, + db_row_to_privilege) diff --git a/gn_auth/auth/authorisation/privileges/models.py b/gn_auth/auth/authorisation/privileges/models.py new file mode 100644 index 0000000..77be7c0 --- /dev/null +++ b/gn_auth/auth/authorisation/privileges/models.py @@ -0,0 +1,70 @@ +"""Handle privileges""" +from dataclasses import dataclass +from typing import Iterable, Optional + +import sqlite3 + +from gn_auth.auth.db import sqlite3 as db +from gn_auth.auth.authentication.users import User + + +@dataclass(frozen=True) +class Privilege: + """Class representing a privilege: creates immutable objects.""" + privilege_id: str + privilege_description: str + + +def db_row_to_privilege(row: sqlite3.Row) -> Privilege: + "Convert single db row into a privilege object." + return Privilege(privilege_id=row["privilege_id"], + privilege_description=row["privilege_description"]) + + +def user_privileges(conn: db.DbConnection, user: User) -> Iterable[Privilege]: + """Fetch the user's privileges from the database.""" + with db.cursor(conn) as cursor: + cursor.execute( + ("SELECT p.privilege_id, p.privilege_description " + "FROM user_roles AS ur " + "INNER JOIN role_privileges AS rp ON ur.role_id=rp.role_id " + "INNER JOIN privileges AS p ON rp.privilege_id=p.privilege_id " + "WHERE ur.user_id=?"), + (str(user.user_id),)) + results = cursor.fetchall() + + return (Privilege(row[0], row[1]) for row in results) + +def privileges_by_ids( + conn: db.DbConnection, privileges_ids: tuple[str, ...]) -> tuple[ + Privilege, ...]: + """Fetch privileges by their ids.""" + if len(privileges_ids) == 0: + return tuple() + + with db.cursor(conn) as cursor: + clause = ", ".join(["?"] * len(privileges_ids)) + cursor.execute( + f"SELECT * FROM privileges WHERE privilege_id IN ({clause})", + privileges_ids) + return tuple( + Privilege(row["privilege_id"], row["privilege_description"]) + for row in cursor.fetchall()) + +def all_privileges(conn: db.DbConnection) -> tuple[Privilege, ...]: + """Retrieve all privileges from the database.""" + with db.cursor(conn) as cursor: + cursor.execute("SELECT * FROM privileges") + results = cursor.fetchall() + + return tuple([] if not bool(results) + else (db_row_to_privilege(row) for row in results)) + +def privilege_by_id(conn: db.DbConnection, privilege_id: str) -> Optional[Privilege]: + """Retrieve a privilege by its ID.""" + with db.cursor(conn) as cursor: + cursor.execute("SELECT * FROM privileges WHERE privilege_id=?", + (privilege_id,)) + row = cursor.fetchone() + + return db_row_to_privilege(row) if bool(row) else None diff --git a/gn_auth/auth/authorisation/privileges/views.py b/gn_auth/auth/authorisation/privileges/views.py new file mode 100644 index 0000000..d50e5cb --- /dev/null +++ b/gn_auth/auth/authorisation/privileges/views.py @@ -0,0 +1,32 @@ +"""Routes for privileges.""" +from dataclasses import asdict + +from werkzeug.exceptions import NotFound +from flask import jsonify, Blueprint, current_app as app + +from gn_auth.auth.db import sqlite3 as db + +from .models import all_privileges, privilege_by_id + +privileges = Blueprint("privileges", __name__) + +@privileges.route("/", methods=["GET"]) +@privileges.route("/list", methods=["GET"]) +def list_privileges(): + """List all the available privileges.""" + with db.connection(app.config["AUTH_DB"]) as conn: + _privileges = all_privileges(conn) + + return jsonify(_privileges if bool(_privileges) else []), 200 + +@privileges.route("//", methods=["GET"]) +@privileges.route("//view", methods=["GET"]) +def view_privilege(privilege_id: str): + """View details of a single privilege""" + with db.connection(app.config["AUTH_DB"]) as conn: + _privilege = privilege_by_id(conn, privilege_id) + + if bool(_privilege): + return jsonify(_privilege) + + raise NotFound(f"No privilege exists with ID '{privilege_id}'") diff --git a/gn_auth/auth/views.py b/gn_auth/auth/views.py index cf5c45f..17fc94b 100644 --- a/gn_auth/auth/views.py +++ b/gn_auth/auth/views.py @@ -8,6 +8,7 @@ from .authorisation.users.views import users from .authorisation.users.admin import admin from .authorisation.roles.views import roles from .authorisation.resources.views import resources +from .authorisation.privileges.views import privileges from .authorisation.resources.groups.views import groups from .authorisation.resources.system.views import system from .authorisation.resources.inbredset.views import iset @@ -22,4 +23,5 @@ oauth2.register_blueprint(admin, url_prefix="/admin") oauth2.register_blueprint(groups, url_prefix="/group") oauth2.register_blueprint(system, url_prefix="/system") oauth2.register_blueprint(resources, url_prefix="/resource") +oauth2.register_blueprint(privileges, url_prefix="/privileges") oauth2.register_blueprint(iset, url_prefix="/resource/inbredset") -- cgit v1.2.3