diff options
Diffstat (limited to 'gn3/auth/authorisation/users')
| -rw-r--r-- | gn3/auth/authorisation/users/__init__.py | 12 | ||||
| -rw-r--r-- | gn3/auth/authorisation/users/base.py | 128 | ||||
| -rw-r--r-- | gn3/auth/authorisation/users/collections/__init__.py | 1 | ||||
| -rw-r--r-- | gn3/auth/authorisation/users/collections/models.py | 269 | ||||
| -rw-r--r-- | gn3/auth/authorisation/users/collections/views.py | 239 | ||||
| -rw-r--r-- | gn3/auth/authorisation/users/models.py | 66 | ||||
| -rw-r--r-- | gn3/auth/authorisation/users/views.py | 173 |
7 files changed, 0 insertions, 888 deletions
diff --git a/gn3/auth/authorisation/users/__init__.py b/gn3/auth/authorisation/users/__init__.py deleted file mode 100644 index 5f0c89c..0000000 --- a/gn3/auth/authorisation/users/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -"""Initialise the users' package.""" -from .base import ( - User, - users, - save_user, - user_by_id, - # valid_login, - user_by_email, - hash_password, # only used in tests... maybe make gn-auth a GN3 dependency - same_password, - set_user_password -) diff --git a/gn3/auth/authorisation/users/base.py b/gn3/auth/authorisation/users/base.py deleted file mode 100644 index 0e72ed2..0000000 --- a/gn3/auth/authorisation/users/base.py +++ /dev/null @@ -1,128 +0,0 @@ -"""User-specific code and data structures.""" -from uuid import UUID, uuid4 -from typing import Any, Tuple, NamedTuple - -from argon2 import PasswordHasher -from argon2.exceptions import VerifyMismatchError - -from gn3.auth import db -from gn3.auth.authorisation.errors import NotFoundError - -class User(NamedTuple): - """Class representing a user.""" - user_id: UUID - email: str - name: str - - def get_user_id(self): - """Return the user's UUID. Mostly for use with Authlib.""" - return self.user_id - - def dictify(self) -> dict[str, Any]: - """Return a dict representation of `User` objects.""" - return {"user_id": self.user_id, "email": self.email, "name": self.name} - -DUMMY_USER = User(user_id=UUID("a391cf60-e8b7-4294-bd22-ddbbda4b3530"), - email="gn3@dummy.user", - name="Dummy user to use as placeholder") - -def user_by_email(conn: db.DbConnection, email: str) -> User: - """Retrieve user from database by their email address""" - with db.cursor(conn) as cursor: - cursor.execute("SELECT * FROM users WHERE email=?", (email,)) - row = cursor.fetchone() - - if row: - return User(UUID(row["user_id"]), row["email"], row["name"]) - - raise NotFoundError(f"Could not find user with email {email}") - -def user_by_id(conn: db.DbConnection, user_id: UUID) -> User: - """Retrieve user from database by their user id""" - with db.cursor(conn) as cursor: - cursor.execute("SELECT * FROM users WHERE user_id=?", (str(user_id),)) - row = cursor.fetchone() - - if row: - return User(UUID(row["user_id"]), row["email"], row["name"]) - - raise NotFoundError(f"Could not find user with ID {user_id}") - -def same_password(password: str, hashed: str) -> bool: - """Check that `raw_password` is hashed to `hash`""" - try: - return hasher().verify(hashed, password) - except VerifyMismatchError as _vme: - return False - -def valid_login(conn: db.DbConnection, user: User, password: str) -> bool: - """Check the validity of the provided credentials for login.""" - with db.cursor(conn) as cursor: - cursor.execute( - ("SELECT * FROM users LEFT JOIN user_credentials " - "ON users.user_id=user_credentials.user_id " - "WHERE users.user_id=?"), - (str(user.user_id),)) - row = cursor.fetchone() - - if row is None: - return False - - return same_password(password, row["password"]) - -def save_user(cursor: db.DbCursor, email: str, name: str) -> User: - """ - Create and persist a user. - - The user creation could be done during a transaction, therefore the function - takes a cursor object rather than a connection. - - The newly created and persisted user is then returned. - """ - user_id = uuid4() - cursor.execute("INSERT INTO users VALUES (?, ?, ?)", - (str(user_id), email, name)) - return User(user_id, email, name) - -def hasher(): - """Retrieve PasswordHasher object""" - # TODO: Maybe tune the parameters here... - # Tuneable Parameters: - # - time_cost (default: 2) - # - memory_cost (default: 102400) - # - parallelism (default: 8) - # - hash_len (default: 16) - # - salt_len (default: 16) - # - encoding (default: 'utf-8') - # - type (default: <Type.ID: 2>) - return PasswordHasher() - -def hash_password(password): - """Hash the password.""" - return hasher().hash(password) - -def set_user_password( - cursor: db.DbCursor, user: User, password: str) -> Tuple[User, bytes]: - """Set the given user's password in the database.""" - hashed_password = hash_password(password) - cursor.execute( - ("INSERT INTO user_credentials VALUES (:user_id, :hash) " - "ON CONFLICT (user_id) DO UPDATE SET password=:hash"), - {"user_id": str(user.user_id), "hash": hashed_password}) - return user, hashed_password - -def users(conn: db.DbConnection, - ids: tuple[UUID, ...] = tuple()) -> tuple[User, ...]: - """ - Fetch all users with the given `ids`. If `ids` is empty, return ALL users. - """ - params = ", ".join(["?"] * len(ids)) - with db.cursor(conn) as cursor: - query = "SELECT * FROM users" + ( - f" WHERE user_id IN ({params})" - if len(ids) > 0 else "") - print(query) - cursor.execute(query, tuple(str(the_id) for the_id in ids)) - return tuple(User(UUID(row["user_id"]), row["email"], row["name"]) - for row in cursor.fetchall()) - return tuple() diff --git a/gn3/auth/authorisation/users/collections/__init__.py b/gn3/auth/authorisation/users/collections/__init__.py deleted file mode 100644 index 88ab040..0000000 --- a/gn3/auth/authorisation/users/collections/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Package dealing with user collections.""" diff --git a/gn3/auth/authorisation/users/collections/models.py b/gn3/auth/authorisation/users/collections/models.py deleted file mode 100644 index 7577fa8..0000000 --- a/gn3/auth/authorisation/users/collections/models.py +++ /dev/null @@ -1,269 +0,0 @@ -"""Handle user collections.""" -import json -from uuid import UUID, uuid4 -from datetime import datetime - -from redis import Redis -from email_validator import validate_email, EmailNotValidError - -from gn3.auth.authorisation.errors import InvalidData, NotFoundError - -from ..models import User - -__OLD_REDIS_COLLECTIONS_KEY__ = "collections" -__REDIS_COLLECTIONS_KEY__ = "collections2" - -class CollectionJSONEncoder(json.JSONEncoder): - """Serialise collection objects into JSON.""" - def default(self, obj):# pylint: disable=[arguments-renamed] - if isinstance(obj, UUID): - return str(obj) - if isinstance(obj, datetime): - return obj.strftime("%b %d %Y %I:%M%p") - return json.JSONEncoder.default(self, obj) - -def __valid_email__(email:str) -> bool: - """Check for email validity.""" - try: - validate_email(email, check_deliverability=True) - except EmailNotValidError as _enve: - return False - return True - -def __toggle_boolean_field__( - rconn: Redis, email: str, field: str): - """Toggle the valuen of a boolean field""" - mig_dict = json.loads(rconn.hget("migratable-accounts", email) or "{}") - if bool(mig_dict): - rconn.hset("migratable-accounts", email, - json.dumps({**mig_dict, field: not mig_dict.get(field, True)})) - -def __build_email_uuid_bridge__(rconn: Redis): - """ - Build a connection between new accounts and old user accounts. - - The only thing that is common between the two is the email address, - therefore, we use that to link the two items. - """ - old_accounts = { - account["email_address"]: { - "user_id": account["user_id"], - "collections-migrated": False, - "resources_migrated": False - } for account in ( - acct for acct in - (json.loads(usr) for usr in rconn.hgetall("users").values()) - if (bool(acct.get("email_address", False)) and - __valid_email__(acct["email_address"]))) - } - if bool(old_accounts): - rconn.hset("migratable-accounts", mapping={ - key: json.dumps(value) for key,value in old_accounts.items() - }) - return old_accounts - -def __retrieve_old_accounts__(rconn: Redis) -> dict: - accounts = rconn.hgetall("migratable-accounts") - if accounts: - return { - key: json.loads(value) for key, value in accounts.items() - } - return __build_email_uuid_bridge__(rconn) - -def parse_collection(coll: dict) -> dict: - """Parse the collection as persisted in redis to a usable python object.""" - created = coll.get("created", coll.get("created_timestamp")) - changed = coll.get("changed", coll.get("changed_timestamp")) - return { - "id": UUID(coll["id"]), - "name": coll["name"], - "created": datetime.strptime(created, "%b %d %Y %I:%M%p"), - "changed": datetime.strptime(changed, "%b %d %Y %I:%M%p"), - "num_members": int(coll["num_members"]), - "members": coll["members"] - } - -def dump_collection(pythoncollection: dict) -> str: - """Convert the collection from a python object to a json string.""" - return json.dumps(pythoncollection, cls=CollectionJSONEncoder) - -def __retrieve_old_user_collections__(rconn: Redis, old_user_id: UUID) -> tuple: - """Retrieve any old collections relating to the user.""" - return tuple(parse_collection(coll) for coll in - json.loads(rconn.hget( - __OLD_REDIS_COLLECTIONS_KEY__, str(old_user_id)) or "[]")) - -def user_collections(rconn: Redis, user: User) -> tuple[dict, ...]: - """Retrieve current user collections.""" - collections = tuple(parse_collection(coll) for coll in json.loads( - rconn.hget(__REDIS_COLLECTIONS_KEY__, str(user.user_id)) or - "[]")) - old_accounts = __retrieve_old_accounts__(rconn) - if (user.email in old_accounts and - not old_accounts[user.email]["collections-migrated"]): - old_user_id = old_accounts[user.email]["user_id"] - collections = tuple({ - coll["id"]: coll for coll in ( - collections + __retrieve_old_user_collections__( - rconn, UUID(old_user_id))) - }.values()) - __toggle_boolean_field__(rconn, user.email, "collections-migrated") - rconn.hset( - __REDIS_COLLECTIONS_KEY__, - key=str(user.user_id), - value=json.dumps(collections, cls=CollectionJSONEncoder)) - return collections - -def save_collections(rconn: Redis, user: User, collections: tuple[dict, ...]) -> tuple[dict, ...]: - """Save the `collections` to redis.""" - rconn.hset( - __REDIS_COLLECTIONS_KEY__, - str(user.user_id), - json.dumps(collections, cls=CollectionJSONEncoder)) - return collections - -def add_to_user_collections(rconn: Redis, user: User, collection: dict) -> dict: - """Add `collection` to list of user collections.""" - ucolls = user_collections(rconn, user) - save_collections(rconn, user, ucolls + (collection,)) - return collection - -def create_collection(rconn: Redis, user: User, name: str, traits: tuple) -> dict: - """Create a new collection.""" - now = datetime.utcnow() - return add_to_user_collections(rconn, user, { - "id": uuid4(), - "name": name, - "created": now, - "changed": now, - "num_members": len(traits), - "members": traits - }) - -def get_collection(rconn: Redis, user: User, collection_id: UUID) -> dict: - """Retrieve the collection with ID `collection_id`.""" - colls = tuple(coll for coll in user_collections(rconn, user) - if coll["id"] == collection_id) - if len(colls) == 0: - raise NotFoundError( - f"Could not find a collection with ID `{collection_id}` for user " - f"with ID `{user.user_id}`") - if len(colls) > 1: - err = InvalidData( - "More than one collection was found having the ID " - f"`{collection_id}` for user with ID `{user.user_id}`.") - err.error_code = 513 - raise err - return colls[0] - -def __raise_if_collections_empty__(user: User, collections: tuple[dict, ...]): - """Raise an exception if no collections are found for `user`.""" - if len(collections) < 1: - raise NotFoundError(f"No collections found for user `{user.user_id}`") - -def __raise_if_not_single_collection__( - user: User, collection_id: UUID, collections: tuple[dict, ...]): - """ - Raise an exception there is zero, or more than one collection for `user`. - """ - if len(collections) == 0: - raise NotFoundError(f"No collections found for user `{user.user_id}` " - f"with ID `{collection_id}`.") - if len(collections) > 1: - err = InvalidData( - "More than one collection was found having the ID " - f"`{collection_id}` for user with ID `{user.user_id}`.") - err.error_code = 513 - raise err - -def delete_collections(rconn: Redis, - user: User, - collection_ids: tuple[UUID, ...]) -> tuple[dict, ...]: - """ - Delete collections with the given `collection_ids` returning the deleted - collections. - """ - ucolls = user_collections(rconn, user) - save_collections( - rconn, - user, - tuple(coll for coll in ucolls if coll["id"] not in collection_ids)) - return tuple(coll for coll in ucolls if coll["id"] in collection_ids) - -def add_traits(rconn: Redis, - user: User, - collection_id: UUID, - traits: tuple[str, ...]) -> dict: - """ - Add `traits` to the `user` collection identified by `collection_id`. - - Returns: The collection with the new traits added. - """ - ucolls = user_collections(rconn, user) - __raise_if_collections_empty__(user, ucolls) - - mod_col = tuple(coll for coll in ucolls if coll["id"] == collection_id) - __raise_if_not_single_collection__(user, collection_id, mod_col) - new_members = tuple(set(tuple(mod_col[0]["members"]) + traits)) - new_coll = { - **mod_col[0], - "members": new_members, - "num_members": len(new_members) - } - save_collections( - rconn, - user, - (tuple(coll for coll in ucolls if coll["id"] != collection_id) + - (new_coll,))) - return new_coll - -def remove_traits(rconn: Redis, - user: User, - collection_id: UUID, - traits: tuple[str, ...]) -> dict: - """ - Remove `traits` from the `user` collection identified by `collection_id`. - - Returns: The collection with the specified `traits` removed. - """ - ucolls = user_collections(rconn, user) - __raise_if_collections_empty__(user, ucolls) - - mod_col = tuple(coll for coll in ucolls if coll["id"] == collection_id) - __raise_if_not_single_collection__(user, collection_id, mod_col) - new_members = tuple( - trait for trait in mod_col[0]["members"] if trait not in traits) - new_coll = { - **mod_col[0], - "members": new_members, - "num_members": len(new_members) - } - save_collections( - rconn, - user, - (tuple(coll for coll in ucolls if coll["id"] != collection_id) + - (new_coll,))) - return new_coll - -def change_name(rconn: Redis, - user: User, - collection_id: UUID, - new_name: str) -> dict: - """ - Change the collection's name. - - Returns: The collection with the new name. - """ - ucolls = user_collections(rconn, user) - __raise_if_collections_empty__(user, ucolls) - - mod_col = tuple(coll for coll in ucolls if coll["id"] == collection_id) - __raise_if_not_single_collection__(user, collection_id, mod_col) - - new_coll = {**mod_col[0], "name": new_name} - save_collections( - rconn, - user, - (tuple(coll for coll in ucolls if coll["id"] != collection_id) + - (new_coll,))) - return new_coll diff --git a/gn3/auth/authorisation/users/collections/views.py b/gn3/auth/authorisation/users/collections/views.py deleted file mode 100644 index 775e8bc..0000000 --- a/gn3/auth/authorisation/users/collections/views.py +++ /dev/null @@ -1,239 +0,0 @@ -"""Views regarding user collections.""" -from uuid import UUID - -from redis import Redis -from flask import jsonify, request, Response, Blueprint, current_app - -from gn3.auth import db -from gn3.auth.db_utils import with_db_connection -from gn3.auth.authorisation.checks import require_json -from gn3.auth.authorisation.errors import NotFoundError - -from gn3.auth.authorisation.users import User, user_by_id -from gn3.auth.authorisation.oauth2.resource_server import require_oauth - -from .models import ( - add_traits, - change_name, - remove_traits, - get_collection, - user_collections, - save_collections, - create_collection, - delete_collections as _delete_collections) - -collections = Blueprint("collections", __name__) - -@collections.route("/list") -@require_oauth("profile user") -def list_user_collections() -> Response: - """Retrieve the user ids""" - with (require_oauth.acquire("profile user") as the_token, - Redis.from_url(current_app.config["REDIS_URI"], - decode_responses=True) as redisconn): - return jsonify(user_collections(redisconn, the_token.user)) - -@collections.route("/<uuid:anon_id>/list") -def list_anonymous_collections(anon_id: UUID) -> Response: - """Fetch anonymous collections""" - with Redis.from_url( - current_app.config["REDIS_URI"], decode_responses=True) as redisconn: - def __list__(conn: db.DbConnection) -> tuple: - try: - _user = user_by_id(conn, anon_id) - current_app.logger.warning( - "Fetch collections for authenticated user using the " - "`list_user_collections()` endpoint.") - return tuple() - except NotFoundError as _nfe: - return user_collections( - redisconn, User(anon_id, "anon@ymous.user", "Anonymous User")) - - return jsonify(with_db_connection(__list__)) - -@require_oauth("profile user") -def __new_collection_as_authenticated_user__(redisconn, name, traits): - """Create a new collection as an authenticated user.""" - with require_oauth.acquire("profile user") as token: - return create_collection(redisconn, token.user, name, traits) - -def __new_collection_as_anonymous_user__(redisconn, name, traits): - """Create a new collection as an anonymous user.""" - return create_collection(redisconn, - User(UUID(request.json.get("anon_id")), - "anon@ymous.user", - "Anonymous User"), - name, - traits) - -@collections.route("/new", methods=["POST"]) -@require_json -def new_user_collection() -> Response: - """Create a new collection.""" - with (Redis.from_url(current_app.config["REDIS_URI"], - decode_responses=True) as redisconn): - traits = tuple(request.json.get("traits", tuple()))# type: ignore[union-attr] - name = request.json.get("name")# type: ignore[union-attr] - if bool(request.headers.get("Authorization")): - return jsonify(__new_collection_as_authenticated_user__( - redisconn, name, traits)) - return jsonify(__new_collection_as_anonymous_user__( - redisconn, name, traits)) - -@collections.route("/<uuid:collection_id>/view", methods=["POST"]) -@require_json -def view_collection(collection_id: UUID) -> Response: - """View a particular collection""" - with (Redis.from_url(current_app.config["REDIS_URI"], - decode_responses=True) as redisconn): - if bool(request.headers.get("Authorization")): - with require_oauth.acquire("profile user") as token: - return jsonify(get_collection(redisconn, - token.user, - collection_id)) - return jsonify(get_collection( - redisconn, - User( - UUID(request.json.get("anon_id")),#type: ignore[union-attr] - "anon@ymous.user", - "Anonymous User"), - collection_id)) - -@collections.route("/anonymous/import", methods=["POST"]) -@require_json -@require_oauth("profile user") -def import_anonymous() -> Response: - """Import anonymous collections.""" - with (require_oauth.acquire("profile user") as token, - Redis.from_url(current_app.config["REDIS_URI"], - decode_responses=True) as redisconn): - anon_id = UUID(request.json.get("anon_id"))#type: ignore[union-attr] - anon_colls = user_collections(redisconn, User( - anon_id, "anon@ymous.user", "Anonymous User")) - save_collections( - redisconn, - token.user, - (user_collections(redisconn, token.user) + - anon_colls)) - redisconn.hdel("collections", str(anon_id)) - return jsonify({ - "message": f"Import of {len(anon_colls)} was successful." - }) - -@collections.route("/anonymous/delete", methods=["POST"]) -@require_json -@require_oauth("profile user") -def delete_anonymous() -> Response: - """Delete anonymous collections.""" - with (require_oauth.acquire("profile user") as _token, - Redis.from_url(current_app.config["REDIS_URI"], - decode_responses=True) as redisconn): - anon_id = UUID(request.json.get("anon_id"))#type: ignore[union-attr] - anon_colls = user_collections(redisconn, User( - anon_id, "anon@ymous.user", "Anonymous User")) - redisconn.hdel("collections", str(anon_id)) - return jsonify({ - "message": f"Deletion of {len(anon_colls)} was successful." - }) - -@collections.route("/delete", methods=["POST"]) -@require_json -def delete_collections(): - """Delete specified collections.""" - with (Redis.from_url(current_app.config["REDIS_URI"], - decode_responses=True) as redisconn): - coll_ids = tuple(UUID(cid) for cid in request.json["collection_ids"]) - deleted = _delete_collections( - redisconn, - User(request.json["anon_id"], "anon@ymous.user", "Anonymous User"), - coll_ids) - if bool(request.headers.get("Authorization")): - with require_oauth.acquire("profile user") as token: - deleted = deleted + _delete_collections( - redisconn, token.user, coll_ids) - - return jsonify({ - "message": f"Deleted {len(deleted)} collections."}) - -@collections.route("/<uuid:collection_id>/traits/remove", methods=["POST"]) -@require_json -def remove_traits_from_collection(collection_id: UUID) -> Response: - """Remove specified traits from collection with ID `collection_id`.""" - if len(request.json["traits"]) < 1:#type: ignore[index] - return jsonify({"message": "No trait to remove from collection."}) - - the_traits = tuple(request.json["traits"])#type: ignore[index] - with (Redis.from_url(current_app.config["REDIS_URI"], - decode_responses=True) as redisconn): - if not bool(request.headers.get("Authorization")): - coll = remove_traits( - redisconn, - User(request.json["anon_id"],#type: ignore[index] - "anon@ymous.user", - "Anonymous User"), - collection_id, - the_traits) - else: - with require_oauth.acquire("profile user") as token: - coll = remove_traits( - redisconn, token.user, collection_id, the_traits) - - return jsonify({ - "message": f"Deleted {len(the_traits)} traits from collection.", - "collection": coll - }) - -@collections.route("/<uuid:collection_id>/traits/add", methods=["POST"]) -@require_json -def add_traits_to_collection(collection_id: UUID) -> Response: - """Add specified traits to collection with ID `collection_id`.""" - if len(request.json["traits"]) < 1:#type: ignore[index] - return jsonify({"message": "No trait to add to collection."}) - - the_traits = tuple(request.json["traits"])#type: ignore[index] - with (Redis.from_url(current_app.config["REDIS_URI"], - decode_responses=True) as redisconn): - if not bool(request.headers.get("Authorization")): - coll = add_traits( - redisconn, - User(request.json["anon_id"],#type: ignore[index] - "anon@ymous.user", - "Anonymous User"), - collection_id, - the_traits) - else: - with require_oauth.acquire("profile user") as token: - coll = add_traits( - redisconn, token.user, collection_id, the_traits) - - return jsonify({ - "message": f"Added {len(the_traits)} traits to collection.", - "collection": coll - }) - -@collections.route("/<uuid:collection_id>/rename", methods=["POST"]) -@require_json -def rename_collection(collection_id: UUID) -> Response: - """Rename the given collection""" - if not bool(request.json["new_name"]):#type: ignore[index] - return jsonify({"message": "No new name to change to."}) - - new_name = request.json["new_name"]#type: ignore[index] - with (Redis.from_url(current_app.config["REDIS_URI"], - decode_responses=True) as redisconn): - if not bool(request.headers.get("Authorization")): - coll = change_name(redisconn, - User(UUID(request.json["anon_id"]),#type: ignore[index] - "anon@ymous.user", - "Anonymous User"), - collection_id, - new_name) - else: - with require_oauth.acquire("profile user") as token: - coll = change_name( - redisconn, token.user, collection_id, new_name) - - return jsonify({ - "message": "Collection rename successful.", - "collection": coll - }) diff --git a/gn3/auth/authorisation/users/models.py b/gn3/auth/authorisation/users/models.py deleted file mode 100644 index 0157154..0000000 --- a/gn3/auth/authorisation/users/models.py +++ /dev/null @@ -1,66 +0,0 @@ -"""Functions for acting on users.""" -import uuid -from functools import reduce - -from gn3.auth import db -from gn3.auth.authorisation.roles.models import Role -from gn3.auth.authorisation.checks import authorised_p -from gn3.auth.authorisation.privileges import Privilege - -from .base import User - -@authorised_p( - ("system:user:list",), - "You do not have the appropriate privileges to list users.", - oauth2_scope="profile user") -def list_users(conn: db.DbConnection) -> tuple[User, ...]: - """List out all users.""" - with db.cursor(conn) as cursor: - cursor.execute("SELECT * FROM users") - return tuple( - User(uuid.UUID(row["user_id"]), row["email"], row["name"]) - for row in cursor.fetchall()) - -def __build_resource_roles__(rows): - def __build_roles__(roles, row): - role_id = uuid.UUID(row["role_id"]) - priv = Privilege(row["privilege_id"], row["privilege_description"]) - role = roles.get(role_id, Role( - role_id, row["role_name"], bool(row["user_editable"]), tuple())) - return { - **roles, - role_id: Role(role_id, role.role_name, role.user_editable, role.privileges + (priv,)) - } - def __build__(acc, row): - resource_id = uuid.UUID(row["resource_id"]) - return { - **acc, - resource_id: __build_roles__(acc.get(resource_id, {}), row) - } - return { - resource_id: tuple(roles.values()) - for resource_id, roles in reduce(__build__, rows, {}).items() - } - -# @authorised_p( -# ("",), -# ("You do not have the appropriate privileges to view a user's roles on " -# "resources.")) -def user_resource_roles(conn: db.DbConnection, user: User) -> dict[uuid.UUID, tuple[Role, ...]]: - """Fetch all the user's roles on resources.""" - with db.cursor(conn) as cursor: - cursor.execute( - "SELECT res.*, rls.*, p.*" - "FROM resources AS res INNER JOIN " - "group_user_roles_on_resources AS guror " - "ON res.resource_id=guror.resource_id " - "LEFT JOIN roles AS rls " - "ON guror.role_id=rls.role_id " - "LEFT JOIN role_privileges AS rp " - "ON rls.role_id=rp.role_id " - "LEFT JOIN privileges AS p " - "ON rp.privilege_id=p.privilege_id " - "WHERE guror.user_id = ?", - (str(user.user_id),)) - return __build_resource_roles__( - (dict(row) for row in cursor.fetchall())) diff --git a/gn3/auth/authorisation/users/views.py b/gn3/auth/authorisation/users/views.py deleted file mode 100644 index f75b51e..0000000 --- a/gn3/auth/authorisation/users/views.py +++ /dev/null @@ -1,173 +0,0 @@ -"""User authorisation endpoints.""" -import traceback -from typing import Any -from functools import partial - -import sqlite3 -from email_validator import validate_email, EmailNotValidError -from flask import request, jsonify, Response, Blueprint, current_app - -from gn3.auth import db -from gn3.auth.dictify import dictify -from gn3.auth.db_utils import with_db_connection -from gn3.auth.authorisation.oauth2.resource_server import require_oauth -from gn3.auth.authorisation.users import User, save_user, set_user_password -from gn3.auth.authorisation.oauth2.oauth2token import token_by_access_token - -from .models import list_users -from .collections.views import collections - -from ..groups.models import user_group as _user_group -from ..resources.models import user_resources as _user_resources -from ..roles.models import assign_default_roles, user_roles as _user_roles -from ..errors import ( - NotFoundError, UsernameError, PasswordError, UserRegistrationError) - -users = Blueprint("users", __name__) -users.register_blueprint(collections, url_prefix="/collections") - -@users.route("/", methods=["GET"]) -@require_oauth("profile") -def user_details() -> Response: - """Return user's details.""" - with require_oauth.acquire("profile") as the_token: - user = the_token.user - user_dets = { - "user_id": user.user_id, "email": user.email, "name": user.name, - "group": False - } - with db.connection(current_app.config["AUTH_DB"]) as conn: - the_group = _user_group(conn, user).maybe(# type: ignore[misc] - False, lambda grp: grp)# type: ignore[arg-type] - return jsonify({ - **user_dets, - "group": dictify(the_group) if the_group else False - }) - -@users.route("/roles", methods=["GET"]) -@require_oauth("role") -def user_roles() -> Response: - """Return the non-resource roles assigned to the user.""" - with require_oauth.acquire("role") as token: - with db.connection(current_app.config["AUTH_DB"]) as conn: - return jsonify(tuple( - dictify(role) for role in _user_roles(conn, token.user))) - -def validate_password(password, confirm_password) -> str: - """Validate the provided password.""" - if len(password) < 8: - raise PasswordError("The password must be at least 8 characters long.") - - if password != confirm_password: - raise PasswordError("Mismatched password values") - - return password - -def validate_username(name: str) -> str: - """Validate the provides name.""" - if name == "": - raise UsernameError("User's name not provided.") - - return name - -def __assert_not_logged_in__(conn: db.DbConnection): - bearer = request.headers.get('Authorization') - if bearer: - token = token_by_access_token(conn, bearer.split(None)[1]).maybe(# type: ignore[misc] - False, lambda tok: tok) - if token: - raise UserRegistrationError( - "Cannot register user while authenticated") - -@users.route("/register", methods=["POST"]) -def register_user() -> Response: - """Register a user.""" - with db.connection(current_app.config["AUTH_DB"]) as conn: - __assert_not_logged_in__(conn) - - try: - form = request.form - email = validate_email(form.get("email", "").strip(), - check_deliverability=True) - password = validate_password( - form.get("password", "").strip(), - form.get("confirm_password", "").strip()) - user_name = validate_username(form.get("user_name", "").strip()) - with db.cursor(conn) as cursor: - user, _hashed_password = set_user_password( - cursor, save_user( - cursor, email["email"], user_name), password) - assign_default_roles(cursor, user) - return jsonify( - { - "user_id": user.user_id, - "email": user.email, - "name": user.name - }) - except sqlite3.IntegrityError as sq3ie: - current_app.logger.debug(traceback.format_exc()) - raise UserRegistrationError( - "A user with that email already exists") from sq3ie - except EmailNotValidError as enve: - current_app.logger.debug(traceback.format_exc()) - raise(UserRegistrationError(f"Email Error: {str(enve)}")) from enve - - raise Exception( - "unknown_error", "The system experienced an unexpected error.") - -@users.route("/group", methods=["GET"]) -@require_oauth("profile group") -def user_group() -> Response: - """Retrieve the group in which the user is a member.""" - with require_oauth.acquire("profile group") as the_token: - db_uri = current_app.config["AUTH_DB"] - with db.connection(db_uri) as conn: - group = _user_group(conn, the_token.user).maybe(# type: ignore[misc] - False, lambda grp: grp)# type: ignore[arg-type] - - if group: - return jsonify(dictify(group)) - raise NotFoundError("User is not a member of any group.") - -@users.route("/resources", methods=["GET"]) -@require_oauth("profile resource") -def user_resources() -> Response: - """Retrieve the resources a user has access to.""" - with require_oauth.acquire("profile resource") as the_token: - db_uri = current_app.config["AUTH_DB"] - with db.connection(db_uri) as conn: - return jsonify([ - dictify(resource) for resource in - _user_resources(conn, the_token.user)]) - -@users.route("group/join-request", methods=["GET"]) -@require_oauth("profile group") -def user_join_request_exists(): - """Check whether a user has an active group join request.""" - def __request_exists__(conn: db.DbConnection, user: User) -> dict[str, Any]: - with db.cursor(conn) as cursor: - cursor.execute( - "SELECT * FROM group_join_requests WHERE requester_id=? AND " - "status = 'PENDING'", - (str(user.user_id),)) - res = cursor.fetchone() - if res: - return { - "request_id": res["request_id"], - "exists": True - } - return{ - "status": "Not found", - "exists": False - } - with require_oauth.acquire("profile group") as the_token: - return jsonify(with_db_connection(partial( - __request_exists__, user=the_token.user))) - -@users.route("/list", methods=["GET"]) -@require_oauth("profile user") -def list_all_users() -> Response: - """List all the users.""" - with require_oauth.acquire("profile group") as _the_token: - return jsonify(tuple( - dictify(user) for user in with_db_connection(list_users))) |
