From 0a31f61ee9db84eb35087073ef6b58f352252aae Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Tue, 3 Jan 2023 07:22:02 +0300 Subject: auth: Fetch all of a user's roles. * gn3/auth/authorisation/roles.py: Fetch roles from DB * gn3/auth/authorisation/views.py: Provide API endpoint for user roles * tests/unit/auth/test_roles.py: Tests to check fetching roles works correctly Fix linting and typing issues in the following files: * gn3/auth/authentication/oauth2/resource_server.py * gn3/auth/authentication/oauth2/views.py * tests/unit/auth/fixtures/oauth2_client_fixtures.py --- gn3/auth/authentication/oauth2/resource_server.py | 4 +- gn3/auth/authentication/oauth2/views.py | 5 ++- gn3/auth/authorisation/roles.py | 47 +++++++++++++++++++++++ gn3/auth/authorisation/views.py | 15 ++++++++ 4 files changed, 67 insertions(+), 4 deletions(-) create mode 100644 gn3/auth/authorisation/views.py (limited to 'gn3/auth') diff --git a/gn3/auth/authentication/oauth2/resource_server.py b/gn3/auth/authentication/oauth2/resource_server.py index 885cbd8..223e811 100644 --- a/gn3/auth/authentication/oauth2/resource_server.py +++ b/gn3/auth/authentication/oauth2/resource_server.py @@ -2,7 +2,7 @@ from flask import current_app as app from authlib.oauth2.rfc6750 import BearerTokenValidator as _BearerTokenValidator -from authlib.integrations.flask_oauth2 import ResourceProtector, current_token +from authlib.integrations.flask_oauth2 import ResourceProtector from gn3.auth import db from gn3.auth.authentication.oauth2.models.oauth2token import token_by_access_token @@ -11,7 +11,7 @@ class BearerTokenValidator(_BearerTokenValidator): """Extends `authlib.oauth2.rfc6750.BearerTokenValidator`""" def authenticate_token(self, token_string: str): with db.connection(app.config["AUTH_DB"]) as conn: - return token_by_access_token(conn, token_string).maybe( + return token_by_access_token(conn, token_string).maybe(# type: ignore[misc] None, lambda tok: tok) require_oauth = ResourceProtector() diff --git a/gn3/auth/authentication/oauth2/views.py b/gn3/auth/authentication/oauth2/views.py index 0947aa2..7d0d7dd 100644 --- a/gn3/auth/authentication/oauth2/views.py +++ b/gn3/auth/authentication/oauth2/views.py @@ -45,8 +45,9 @@ def introspect_token(): @oauth2.route("/user") @require_oauth("profile") def user_details(): - with require_oauth.acquire("profile") as token: - user = token.user + """Return user's details.""" + with require_oauth.acquire("profile") as the_token: + user = the_token.user return jsonify({ "user_id": user.user_id, "email": user.email, diff --git a/gn3/auth/authorisation/roles.py b/gn3/auth/authorisation/roles.py index 397ad80..e71d427 100644 --- a/gn3/auth/authorisation/roles.py +++ b/gn3/auth/authorisation/roles.py @@ -1,8 +1,10 @@ """Handle management of roles""" from uuid import UUID, uuid4 +from functools import reduce from typing import Iterable, NamedTuple from gn3.auth import db +from gn3.auth.authentication.users import User from gn3.auth.authentication.checks import authenticated_p from .checks import authorised_p @@ -42,3 +44,48 @@ def create_role( for priv in privileges)) return role + +def __organise_privileges__(roles_dict, privilege_row): + """Organise the privileges into their roles.""" + role_id_str = privilege_row["role_id"] + if role_id_str in roles_dict: + return { + **roles_dict, + role_id_str: Role( + UUID(role_id_str), + privilege_row["role_name"], + roles_dict[role_id_str].privileges + ( + Privilege(UUID(privilege_row["privilege_id"]), + privilege_row["privilege_name"]),)) + } + + return { + **roles_dict, + role_id_str: Role( + UUID(role_id_str), + privilege_row["role_name"], + (Privilege(UUID(privilege_row["privilege_id"]), + privilege_row["privilege_name"]),)) + } + +def user_roles(conn: db.DbConnection, user: User): + """Retrieve ALL roles assigned to the user.""" + with db.cursor(conn) as cursor: + cursor.execute( + "SELECT r.*, p.* FROM user_roles AS ur INNER JOIN roles AS r " + "ON ur.role_id=r.role_id INNER JOIN role_privileges AS rp " + "ON r.role_id=rp.role_id INNER JOIN privileges AS p " + "ON rp.privilege_id=p.privilege_id WHERE ur.user_id=? " + "UNION " + "SELECT r.*, p.* FROM group_user_roles_on_resources AS guror " + "INNER JOIN roles AS r ON guror.role_id=r.role_id " + "INNER JOIN role_privileges AS rp ON r.role_id=rp.role_id " + "INNER JOIN privileges AS p ON rp.privilege_id=p.privilege_id " + "WHERE guror.user_id=?", + ((str(user.user_id),)*2)) + + results = cursor.fetchall() + if results: + return tuple( + reduce(__organise_privileges__, results, {}).values()) + return tuple() diff --git a/gn3/auth/authorisation/views.py b/gn3/auth/authorisation/views.py new file mode 100644 index 0000000..2481633 --- /dev/null +++ b/gn3/auth/authorisation/views.py @@ -0,0 +1,15 @@ +"""Endpoints for the authorisation stuff.""" +from flask import jsonify, current_app + +from gn3.auth import db +from .roles import user_roles as _user_roles +from ..authentication.oauth2.views import oauth2 +from ..authentication.oauth2.resource_server import require_oauth + +@oauth2.route("/user-roles") +@require_oauth +def user_roles(): + """Return the roles assigned to the user.""" + with require_oauth.acquire("role") as token: + with db.connection(current_app.config["AUTH_DB"]) as conn: + return jsonify(_user_roles(conn, token.user)) -- cgit v1.2.3