From dfe5eb18e3ec8dc570d118bfe95c5d4dcb2c7575 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Thu, 2 Feb 2023 11:35:51 +0300 Subject: auth: Reorganise modules/packages for easier dev and maintenance Split the views/routes into separate modules each dealing with a narrower scope of the application to aid in maintenance, and help with making the development easier. --- gn3/auth/authorisation/roles/__init__.py | 3 + gn3/auth/authorisation/roles/models.py | 151 +++++++++++++++++++++++++++++++ gn3/auth/authorisation/roles/views.py | 26 ++++++ 3 files changed, 180 insertions(+) create mode 100644 gn3/auth/authorisation/roles/__init__.py create mode 100644 gn3/auth/authorisation/roles/models.py create mode 100644 gn3/auth/authorisation/roles/views.py (limited to 'gn3/auth/authorisation/roles') diff --git a/gn3/auth/authorisation/roles/__init__.py b/gn3/auth/authorisation/roles/__init__.py new file mode 100644 index 0000000..293a12f --- /dev/null +++ b/gn3/auth/authorisation/roles/__init__.py @@ -0,0 +1,3 @@ +"""Initialise the `gn3.auth.authorisation.roles` package""" + +from .models import Role diff --git a/gn3/auth/authorisation/roles/models.py b/gn3/auth/authorisation/roles/models.py new file mode 100644 index 0000000..b1aac75 --- /dev/null +++ b/gn3/auth/authorisation/roles/models.py @@ -0,0 +1,151 @@ +"""Handle management of roles""" +from uuid import UUID, uuid4 +from functools import reduce +from typing import Any, Sequence, Iterable, NamedTuple + +from pymonad.maybe import Just, Maybe, Nothing +from pymonad.either import Left, Right, Either + +from gn3.auth import db +from gn3.auth.dictify import dictify +from gn3.auth.authentication.users import User + +from ..checks import authorised_p +from ..privileges import Privilege +from ..errors import NotFoundError + +class Role(NamedTuple): + """Class representing a role: creates immutable objects.""" + role_id: UUID + role_name: str + privileges: Iterable[Privilege] + + def dictify(self) -> dict[str, Any]: + """Return a dict representation of `Role` objects.""" + return { + "role_id": self.role_id, "role_name": self.role_name, + "privileges": tuple(dictify(priv) for priv in self.privileges) + } + +@authorised_p(("group:role:create-role",), error_message="Could not create role") +def create_role( + cursor: db.DbCursor, role_name: str, + privileges: Iterable[Privilege]) -> Role: + """ + Create a new generic role. + + PARAMS: + * cursor: A database cursor object - This function could be used as part of + a transaction, hence the use of a cursor rather than a connection + object. + * role_name: The name of the role + * privileges: A 'list' of privileges to assign the new role + + RETURNS: An immutable `gn3.auth.authorisation.roles.Role` object + """ + role = Role(uuid4(), role_name, privileges) + + cursor.execute( + "INSERT INTO roles(role_id, role_name) VALUES (?, ?)", + (str(role.role_id), role.role_name)) + cursor.executemany( + "INSERT INTO role_privileges(role_id, privilege_id) VALUES (?, ?)", + tuple((str(role.role_id), str(priv.privilege_id)) + for priv in privileges)) + + return role + +def __organise_privileges__(roles_dict, privilege_row): + """Organise the privileges into their roles.""" + role_id_str = privilege_row["role_id"] + if role_id_str in roles_dict: + return { + **roles_dict, + role_id_str: Role( + UUID(role_id_str), + privilege_row["role_name"], + roles_dict[role_id_str].privileges + ( + Privilege(privilege_row["privilege_id"], + privilege_row["privilege_description"]),)) + } + + return { + **roles_dict, + role_id_str: Role( + UUID(role_id_str), + privilege_row["role_name"], + (Privilege(privilege_row["privilege_id"], + privilege_row["privilege_description"]),)) + } + +def user_roles(conn: db.DbConnection, user: User) -> Maybe[Sequence[Role]]: + """Retrieve non-resource roles assigned to the user.""" + with db.cursor(conn) as cursor: + cursor.execute( + "SELECT r.*, p.* FROM user_roles AS ur INNER JOIN roles AS r " + "ON ur.role_id=r.role_id INNER JOIN role_privileges AS rp " + "ON r.role_id=rp.role_id INNER JOIN privileges AS p " + "ON rp.privilege_id=p.privilege_id WHERE ur.user_id=?", + (str(user.user_id),)) + + results = cursor.fetchall() + if results: + return Just(tuple( + reduce(__organise_privileges__, results, {}).values())) + return Nothing + +def user_role(conn: db.DbConnection, user: User, role_id: UUID) -> Either: + """Retrieve a specific non-resource role assigned to the user.""" + with db.cursor(conn) as cursor: + cursor.execute( + "SELECT r.*, p.* FROM user_roles AS ur INNER JOIN roles AS r " + "ON ur.role_id=r.role_id INNER JOIN role_privileges AS rp " + "ON r.role_id=rp.role_id INNER JOIN privileges AS p " + "ON rp.privilege_id=p.privilege_id " + "WHERE ur.user_id=? AND ur.role_id=?", + (str(user.user_id), str(role_id))) + + results = cursor.fetchall() + if results: + return Right(tuple( + reduce(__organise_privileges__, results, {}).values())[0]) + return Left(NotFoundError( + f"Could not find role with id '{role_id}'",)) + +def assign_default_roles(cursor: db.DbCursor, user: User): + """Assign `user` some default roles.""" + cursor.execute( + 'SELECT role_id FROM roles WHERE role_name IN ' + '("group-creator")') + role_ids = cursor.fetchall() + str_user_id = str(user.user_id) + params = tuple( + {"user_id": str_user_id, "role_id": row["role_id"]} for row in role_ids) + cursor.executemany( + ("INSERT INTO user_roles VALUES (:user_id, :role_id)"), + params) + +def revoke_user_role_by_name(cursor: db.DbCursor, user: User, role_name: str): + """Revoke a role from `user` by the role's name""" + cursor.execute( + "SELECT role_id FROM roles WHERE role_name=:role_name", + {"role_name": role_name}) + role = cursor.fetchone() + if role: + cursor.execute( + ("DELETE FROM user_roles " + "WHERE user_id=:user_id AND role_id=:role_id"), + {"user_id": str(user.user_id), "role_id": role["role_id"]}) + +def assign_user_role_by_name(cursor: db.DbCursor, user: User, role_name: str): + """Revoke a role from `user` by the role's name""" + cursor.execute( + "SELECT role_id FROM roles WHERE role_name=:role_name", + {"role_name": role_name}) + role = cursor.fetchone() + + if role: + cursor.execute( + ("INSERT INTO user_roles VALUES(:user_id, :role_id) " + "ON CONFLICT DO NOTHING"), + {"user_id": str(user.user_id), "role_id": role["role_id"]}) diff --git a/gn3/auth/authorisation/roles/views.py b/gn3/auth/authorisation/roles/views.py new file mode 100644 index 0000000..975fb19 --- /dev/null +++ b/gn3/auth/authorisation/roles/views.py @@ -0,0 +1,26 @@ +"""The views/routes for the `gn3.auth.authorisation.roles` package.""" +import uuid + +from flask import jsonify, Response, Blueprint, current_app + +from gn3.auth import db +from gn3.auth.dictify import dictify + +from .models import user_role + +from ...authentication.oauth2.resource_server import require_oauth + +roles = Blueprint("roles", __name__) + +@roles.route("/view/", methods=["GET"]) +@require_oauth("role") +def view_role(role_id: uuid.UUID) -> Response: + """Retrieve a user role with id `role_id`""" + def __error__(exc: Exception): + raise exc + with require_oauth.acquire("profile role") as the_token: + db_uri = current_app.config["AUTH_DB"] + with db.connection(db_uri) as conn: + the_role = user_role(conn, the_token.user, role_id) + return the_role.either( + __error__, lambda a_role: jsonify(dictify(a_role))) -- cgit v1.2.3