diff options
Diffstat (limited to 'gn_auth/auth/authorisation/users')
| -rw-r--r-- | gn_auth/auth/authorisation/users/admin/models.py | 56 | ||||
| -rw-r--r-- | gn_auth/auth/authorisation/users/collections/models.py | 4 | ||||
| -rw-r--r-- | gn_auth/auth/authorisation/users/masquerade/views.py | 16 | ||||
| -rw-r--r-- | gn_auth/auth/authorisation/users/views.py | 23 |
4 files changed, 75 insertions, 24 deletions
diff --git a/gn_auth/auth/authorisation/users/admin/models.py b/gn_auth/auth/authorisation/users/admin/models.py index 36f3c09..3d68932 100644 --- a/gn_auth/auth/authorisation/users/admin/models.py +++ b/gn_auth/auth/authorisation/users/admin/models.py @@ -1,23 +1,55 @@ """Major function for handling admin users.""" +import warnings + from gn_auth.auth.db import sqlite3 as db from gn_auth.auth.authentication.users import User +from gn_auth.auth.authorisation.roles.models import Role, db_rows_to_roles -def make_sys_admin(cursor: db.DbCursor, user: User) -> User: - """Make a given user into an system admin.""" + +def sysadmin_role(conn: db.DbConnection) -> Role: + """Fetch the `system-administrator` role details.""" + with db.cursor(conn) as cursor: + cursor.execute( + "SELECT roles.*, privileges.* " + "FROM roles INNER JOIN role_privileges " + "ON roles.role_id=role_privileges.role_id " + "INNER JOIN privileges " + "ON role_privileges.privilege_id=privileges.privilege_id " + "WHERE role_name='system-administrator'") + results = db_rows_to_roles(cursor.fetchall()) + + assert len(results) == 1, ( + "There should only ever be one 'system-administrator' role.") + return results[0] + + +def grant_sysadmin_role(cursor: db.DbCursor, user: User) -> User: + """Grant `system-administrator` role to `user`.""" cursor.execute( "SELECT * FROM roles WHERE role_name='system-administrator'") admin_role = cursor.fetchone() - cursor.execute( - "SELECT * FROM resources AS r " - "INNER JOIN resource_categories AS rc " - "ON r.resource_category_id=rc.resource_category_id " - "WHERE resource_category_key='system'") - the_system = cursor.fetchone() - cursor.execute( + cursor.execute("SELECT resources.resource_id FROM resources") + cursor.executemany( "INSERT INTO user_roles VALUES (:user_id, :role_id, :resource_id)", - { + tuple({ "user_id": str(user.user_id), "role_id": admin_role["role_id"], - "resource_id": the_system["resource_id"] - }) + "resource_id": resource_id + } for resource_id in cursor.fetchall())) return user + + +def make_sys_admin(cursor: db.DbCursor, user: User) -> User: + """Make a given user into an system admin.""" + warnings.warn( + DeprecationWarning( + f"The function `{__name__}.make_sys_admin` will be removed soon"), + stacklevel=1) + return grant_sysadmin_role(cursor, user) + + +def revoke_sysadmin_role(conn: db.DbConnection, user: User): + """Revoke `system-administrator` role from `user`.""" + with db.cursor(conn) as cursor: + cursor.execute("DELETE FROM user_roles WHERE user_id=? AND role_id=?", + (str(user.user_id), str(sysadmin_role(conn).role_id))) diff --git a/gn_auth/auth/authorisation/users/collections/models.py b/gn_auth/auth/authorisation/users/collections/models.py index 63443ef..30242c2 100644 --- a/gn_auth/auth/authorisation/users/collections/models.py +++ b/gn_auth/auth/authorisation/users/collections/models.py @@ -72,8 +72,8 @@ def __retrieve_old_accounts__(rconn: Redis) -> dict: def parse_collection(coll: dict) -> dict: """Parse the collection as persisted in redis to a usable python object.""" - created = coll.get("created", coll.get("created_timestamp")) - changed = coll.get("changed", coll.get("changed_timestamp")) + created = coll.get("created", coll.get("created_timestamp", "")) + changed = coll.get("changed", coll.get("changed_timestamp", "")) return { "id": UUID(coll["id"]), "name": coll["name"], diff --git a/gn_auth/auth/authorisation/users/masquerade/views.py b/gn_auth/auth/authorisation/users/masquerade/views.py index 8b897f2..12a8c97 100644 --- a/gn_auth/auth/authorisation/users/masquerade/views.py +++ b/gn_auth/auth/authorisation/users/masquerade/views.py @@ -1,14 +1,14 @@ """Endpoints for user masquerade""" from dataclasses import asdict from uuid import UUID -from functools import partial -from flask import request, jsonify, Response, Blueprint +from flask import request, jsonify, Response, Blueprint, current_app from gn_auth.auth.errors import InvalidData +from gn_auth.auth.authorisation.resources.groups.models import user_group +from ....db import sqlite3 as db from ...checks import require_json -from ....db.sqlite3 import with_db_connection from ....authentication.users import user_by_id from ....authentication.oauth2.resource_server import require_oauth @@ -21,13 +21,13 @@ masq = Blueprint("masquerade", __name__) @require_json def masquerade() -> Response: """Masquerade as a particular user.""" - with require_oauth.acquire("profile user masquerade") as token: + with (require_oauth.acquire("profile user masquerade") as token, + db.connection(current_app.config["AUTH_DB"]) as conn): masqueradee_id = UUID(request.json["masquerade_as"])#type: ignore[index] if masqueradee_id == token.user.user_id: raise InvalidData("You are not allowed to masquerade as yourself.") - masq_user = with_db_connection(partial( - user_by_id, user_id=masqueradee_id)) + masq_user = user_by_id(conn, user_id=masqueradee_id) def __masq__(conn): new_token = masquerade_as(conn, original_token=token, masqueradee=masq_user) @@ -39,6 +39,8 @@ def masquerade() -> Response: }, "masquerade_as": { "user": asdict(masq_user), - "token": with_db_connection(__masq__) + "token": __masq__(conn), + **(user_group(conn, masq_user).maybe(# type: ignore[misc] + {}, lambda grp: {"group": grp})) } }) diff --git a/gn_auth/auth/authorisation/users/views.py b/gn_auth/auth/authorisation/users/views.py index 91e459d..c248ac3 100644 --- a/gn_auth/auth/authorisation/users/views.py +++ b/gn_auth/auth/authorisation/users/views.py @@ -4,9 +4,9 @@ import sqlite3 import secrets import traceback from dataclasses import asdict -from typing import Any, Sequence from urllib.parse import urljoin from functools import reduce, partial +from typing import Any, Union, Sequence from datetime import datetime, timedelta from email.headerregistry import Address from email_validator import validate_email, EmailNotValidError @@ -46,7 +46,8 @@ from gn_auth.auth.errors import ( UserRegistrationError) -from gn_auth.auth.authentication.users import valid_login, user_by_email +from gn_auth.auth.authentication.users import ( + valid_login, user_by_email, user_by_id) from gn_auth.auth.authentication.oauth2.resource_server import require_oauth from gn_auth.auth.authentication.users import User, save_user, set_user_password from gn_auth.auth.authentication.oauth2.models.oauth2token import ( @@ -75,9 +76,25 @@ def user_details() -> Response: False, lambda grp: grp)# type: ignore[arg-type] return jsonify({ **user_dets, - "group": asdict(the_group) if the_group else False + **({"group": asdict(the_group)} if the_group else {}) }) +@users.route("/<user_id>", methods=["GET"]) +def get_user(user_id: str) -> Union[Response, tuple[Response, int]]: + """Fetch user details by user_id.""" + try: + with db.connection(current_app.config["AUTH_DB"]) as conn: + user = user_by_id(conn, uuid.UUID(user_id)) + return jsonify({ + "user_id": str(user.user_id), + "email": user.email, + "name": user.name + }) + except ValueError: + return jsonify({"error": "Invalid user ID format"}), 400 + except NotFoundError: + return jsonify({"error": "User not found"}), 404 + @users.route("/roles", methods=["GET"]) @require_oauth("role") def user_roles() -> Response: |
