From 8b11c16d1a7462089de00dc8759e7bb1d253ed08 Mon Sep 17 00:00:00 2001 From: Munyoki Kilyungi Date: Tue, 30 Apr 2024 13:01:49 +0300 Subject: Create a JWT token when querying a user's role an a resource. * gn_auth/auth/authorisation/resources/views.py: Import time. (get_user_roles_on_resource): Add a JWT bearer token to the responses's header. Signed-off-by: Munyoki Kilyungi --- gn_auth/auth/authorisation/resources/views.py | 46 +++++++++++++++++++++------ 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/gn_auth/auth/authorisation/resources/views.py b/gn_auth/auth/authorisation/resources/views.py index 58adaa2..0200222 100644 --- a/gn_auth/auth/authorisation/resources/views.py +++ b/gn_auth/auth/authorisation/resources/views.py @@ -3,12 +3,15 @@ import uuid import json import operator import sqlite3 +import time from dataclasses import asdict from functools import reduce from authlib.integrations.flask_oauth2.errors import _HTTPException -from flask import request, jsonify, Response, Blueprint, current_app as app +from authlib.jose import jwt +from flask import (make_response, request, jsonify, Response, + Blueprint, current_app as app) from gn_auth.auth.db import sqlite3 as db from gn_auth.auth.db.sqlite3 import with_db_connection @@ -382,25 +385,48 @@ def get_user_roles_on_resource(name) -> Response: resid = with_db_connection( lambda conn: get_resource_id(conn, name) ) - def _extract_privilege_id(privileges): return tuple( p_.privilege_id for p_ in privileges ) with require_oauth.acquire("profile resource") as _token: - _resources = with_db_connection( + resources_ = with_db_connection( lambda conn: user_roles_on_resources( conn, _token.user, (resid,) ) ) - _roles = tuple( - _extract_privilege_id(role.privileges) + roles: list = reduce (operator.iconcat, + tuple( + _extract_privilege_id(role.privileges) for role in - _resources.get( - uuid.UUID(resid), {} - ).get("roles", tuple())) - return jsonify({ + resources_.get( + uuid.UUID(resid), {} + ).get("roles", tuple())), []) + response = make_response({ # Flatten this list - "roles": reduce(operator.iconcat, _roles, []) + "roles": roles, + "silly": "ausah", }) + iat = int(time.time()) + jose_header = { + "alg": "RS256", + "typ": "jwt", + "cty": "json", + } + payload = { + # Registered Claims + "iss": request.url, # Issuer Claim + "iat": iat, # Issued At + "sub": name, # Subject Claim + "aud": f"Edit {name}", # Audience Claim + "exp": iat + 300, # Expiration Time Claim + "jti": str(uuid.uuid4()), # Unique Identifier for this token + # Private Claims + "account-name": _token.user.name, + "email": _token.user.email, + "roles": roles, + } + token = jwt.encode(jose_header, payload, app.config["SSL_PRIVATE_KEY"]) + response.headers["Authorization"] = f"Bearer {token.decode('utf-8')}" + return response -- cgit v1.2.3