diff options
Diffstat (limited to 'gn3/auth/authorisation/users/collections')
| -rw-r--r-- | gn3/auth/authorisation/users/collections/__init__.py | 1 | ||||
| -rw-r--r-- | gn3/auth/authorisation/users/collections/models.py | 269 | ||||
| -rw-r--r-- | gn3/auth/authorisation/users/collections/views.py | 239 |
3 files changed, 0 insertions, 509 deletions
diff --git a/gn3/auth/authorisation/users/collections/__init__.py b/gn3/auth/authorisation/users/collections/__init__.py deleted file mode 100644 index 88ab040..0000000 --- a/gn3/auth/authorisation/users/collections/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Package dealing with user collections.""" diff --git a/gn3/auth/authorisation/users/collections/models.py b/gn3/auth/authorisation/users/collections/models.py deleted file mode 100644 index 7577fa8..0000000 --- a/gn3/auth/authorisation/users/collections/models.py +++ /dev/null @@ -1,269 +0,0 @@ -"""Handle user collections.""" -import json -from uuid import UUID, uuid4 -from datetime import datetime - -from redis import Redis -from email_validator import validate_email, EmailNotValidError - -from gn3.auth.authorisation.errors import InvalidData, NotFoundError - -from ..models import User - -__OLD_REDIS_COLLECTIONS_KEY__ = "collections" -__REDIS_COLLECTIONS_KEY__ = "collections2" - -class CollectionJSONEncoder(json.JSONEncoder): - """Serialise collection objects into JSON.""" - def default(self, obj):# pylint: disable=[arguments-renamed] - if isinstance(obj, UUID): - return str(obj) - if isinstance(obj, datetime): - return obj.strftime("%b %d %Y %I:%M%p") - return json.JSONEncoder.default(self, obj) - -def __valid_email__(email:str) -> bool: - """Check for email validity.""" - try: - validate_email(email, check_deliverability=True) - except EmailNotValidError as _enve: - return False - return True - -def __toggle_boolean_field__( - rconn: Redis, email: str, field: str): - """Toggle the valuen of a boolean field""" - mig_dict = json.loads(rconn.hget("migratable-accounts", email) or "{}") - if bool(mig_dict): - rconn.hset("migratable-accounts", email, - json.dumps({**mig_dict, field: not mig_dict.get(field, True)})) - -def __build_email_uuid_bridge__(rconn: Redis): - """ - Build a connection between new accounts and old user accounts. - - The only thing that is common between the two is the email address, - therefore, we use that to link the two items. - """ - old_accounts = { - account["email_address"]: { - "user_id": account["user_id"], - "collections-migrated": False, - "resources_migrated": False - } for account in ( - acct for acct in - (json.loads(usr) for usr in rconn.hgetall("users").values()) - if (bool(acct.get("email_address", False)) and - __valid_email__(acct["email_address"]))) - } - if bool(old_accounts): - rconn.hset("migratable-accounts", mapping={ - key: json.dumps(value) for key,value in old_accounts.items() - }) - return old_accounts - -def __retrieve_old_accounts__(rconn: Redis) -> dict: - accounts = rconn.hgetall("migratable-accounts") - if accounts: - return { - key: json.loads(value) for key, value in accounts.items() - } - return __build_email_uuid_bridge__(rconn) - -def parse_collection(coll: dict) -> dict: - """Parse the collection as persisted in redis to a usable python object.""" - created = coll.get("created", coll.get("created_timestamp")) - changed = coll.get("changed", coll.get("changed_timestamp")) - return { - "id": UUID(coll["id"]), - "name": coll["name"], - "created": datetime.strptime(created, "%b %d %Y %I:%M%p"), - "changed": datetime.strptime(changed, "%b %d %Y %I:%M%p"), - "num_members": int(coll["num_members"]), - "members": coll["members"] - } - -def dump_collection(pythoncollection: dict) -> str: - """Convert the collection from a python object to a json string.""" - return json.dumps(pythoncollection, cls=CollectionJSONEncoder) - -def __retrieve_old_user_collections__(rconn: Redis, old_user_id: UUID) -> tuple: - """Retrieve any old collections relating to the user.""" - return tuple(parse_collection(coll) for coll in - json.loads(rconn.hget( - __OLD_REDIS_COLLECTIONS_KEY__, str(old_user_id)) or "[]")) - -def user_collections(rconn: Redis, user: User) -> tuple[dict, ...]: - """Retrieve current user collections.""" - collections = tuple(parse_collection(coll) for coll in json.loads( - rconn.hget(__REDIS_COLLECTIONS_KEY__, str(user.user_id)) or - "[]")) - old_accounts = __retrieve_old_accounts__(rconn) - if (user.email in old_accounts and - not old_accounts[user.email]["collections-migrated"]): - old_user_id = old_accounts[user.email]["user_id"] - collections = tuple({ - coll["id"]: coll for coll in ( - collections + __retrieve_old_user_collections__( - rconn, UUID(old_user_id))) - }.values()) - __toggle_boolean_field__(rconn, user.email, "collections-migrated") - rconn.hset( - __REDIS_COLLECTIONS_KEY__, - key=str(user.user_id), - value=json.dumps(collections, cls=CollectionJSONEncoder)) - return collections - -def save_collections(rconn: Redis, user: User, collections: tuple[dict, ...]) -> tuple[dict, ...]: - """Save the `collections` to redis.""" - rconn.hset( - __REDIS_COLLECTIONS_KEY__, - str(user.user_id), - json.dumps(collections, cls=CollectionJSONEncoder)) - return collections - -def add_to_user_collections(rconn: Redis, user: User, collection: dict) -> dict: - """Add `collection` to list of user collections.""" - ucolls = user_collections(rconn, user) - save_collections(rconn, user, ucolls + (collection,)) - return collection - -def create_collection(rconn: Redis, user: User, name: str, traits: tuple) -> dict: - """Create a new collection.""" - now = datetime.utcnow() - return add_to_user_collections(rconn, user, { - "id": uuid4(), - "name": name, - "created": now, - "changed": now, - "num_members": len(traits), - "members": traits - }) - -def get_collection(rconn: Redis, user: User, collection_id: UUID) -> dict: - """Retrieve the collection with ID `collection_id`.""" - colls = tuple(coll for coll in user_collections(rconn, user) - if coll["id"] == collection_id) - if len(colls) == 0: - raise NotFoundError( - f"Could not find a collection with ID `{collection_id}` for user " - f"with ID `{user.user_id}`") - if len(colls) > 1: - err = InvalidData( - "More than one collection was found having the ID " - f"`{collection_id}` for user with ID `{user.user_id}`.") - err.error_code = 513 - raise err - return colls[0] - -def __raise_if_collections_empty__(user: User, collections: tuple[dict, ...]): - """Raise an exception if no collections are found for `user`.""" - if len(collections) < 1: - raise NotFoundError(f"No collections found for user `{user.user_id}`") - -def __raise_if_not_single_collection__( - user: User, collection_id: UUID, collections: tuple[dict, ...]): - """ - Raise an exception there is zero, or more than one collection for `user`. - """ - if len(collections) == 0: - raise NotFoundError(f"No collections found for user `{user.user_id}` " - f"with ID `{collection_id}`.") - if len(collections) > 1: - err = InvalidData( - "More than one collection was found having the ID " - f"`{collection_id}` for user with ID `{user.user_id}`.") - err.error_code = 513 - raise err - -def delete_collections(rconn: Redis, - user: User, - collection_ids: tuple[UUID, ...]) -> tuple[dict, ...]: - """ - Delete collections with the given `collection_ids` returning the deleted - collections. - """ - ucolls = user_collections(rconn, user) - save_collections( - rconn, - user, - tuple(coll for coll in ucolls if coll["id"] not in collection_ids)) - return tuple(coll for coll in ucolls if coll["id"] in collection_ids) - -def add_traits(rconn: Redis, - user: User, - collection_id: UUID, - traits: tuple[str, ...]) -> dict: - """ - Add `traits` to the `user` collection identified by `collection_id`. - - Returns: The collection with the new traits added. - """ - ucolls = user_collections(rconn, user) - __raise_if_collections_empty__(user, ucolls) - - mod_col = tuple(coll for coll in ucolls if coll["id"] == collection_id) - __raise_if_not_single_collection__(user, collection_id, mod_col) - new_members = tuple(set(tuple(mod_col[0]["members"]) + traits)) - new_coll = { - **mod_col[0], - "members": new_members, - "num_members": len(new_members) - } - save_collections( - rconn, - user, - (tuple(coll for coll in ucolls if coll["id"] != collection_id) + - (new_coll,))) - return new_coll - -def remove_traits(rconn: Redis, - user: User, - collection_id: UUID, - traits: tuple[str, ...]) -> dict: - """ - Remove `traits` from the `user` collection identified by `collection_id`. - - Returns: The collection with the specified `traits` removed. - """ - ucolls = user_collections(rconn, user) - __raise_if_collections_empty__(user, ucolls) - - mod_col = tuple(coll for coll in ucolls if coll["id"] == collection_id) - __raise_if_not_single_collection__(user, collection_id, mod_col) - new_members = tuple( - trait for trait in mod_col[0]["members"] if trait not in traits) - new_coll = { - **mod_col[0], - "members": new_members, - "num_members": len(new_members) - } - save_collections( - rconn, - user, - (tuple(coll for coll in ucolls if coll["id"] != collection_id) + - (new_coll,))) - return new_coll - -def change_name(rconn: Redis, - user: User, - collection_id: UUID, - new_name: str) -> dict: - """ - Change the collection's name. - - Returns: The collection with the new name. - """ - ucolls = user_collections(rconn, user) - __raise_if_collections_empty__(user, ucolls) - - mod_col = tuple(coll for coll in ucolls if coll["id"] == collection_id) - __raise_if_not_single_collection__(user, collection_id, mod_col) - - new_coll = {**mod_col[0], "name": new_name} - save_collections( - rconn, - user, - (tuple(coll for coll in ucolls if coll["id"] != collection_id) + - (new_coll,))) - return new_coll diff --git a/gn3/auth/authorisation/users/collections/views.py b/gn3/auth/authorisation/users/collections/views.py deleted file mode 100644 index 775e8bc..0000000 --- a/gn3/auth/authorisation/users/collections/views.py +++ /dev/null @@ -1,239 +0,0 @@ -"""Views regarding user collections.""" -from uuid import UUID - -from redis import Redis -from flask import jsonify, request, Response, Blueprint, current_app - -from gn3.auth import db -from gn3.auth.db_utils import with_db_connection -from gn3.auth.authorisation.checks import require_json -from gn3.auth.authorisation.errors import NotFoundError - -from gn3.auth.authorisation.users import User, user_by_id -from gn3.auth.authorisation.oauth2.resource_server import require_oauth - -from .models import ( - add_traits, - change_name, - remove_traits, - get_collection, - user_collections, - save_collections, - create_collection, - delete_collections as _delete_collections) - -collections = Blueprint("collections", __name__) - -@collections.route("/list") -@require_oauth("profile user") -def list_user_collections() -> Response: - """Retrieve the user ids""" - with (require_oauth.acquire("profile user") as the_token, - Redis.from_url(current_app.config["REDIS_URI"], - decode_responses=True) as redisconn): - return jsonify(user_collections(redisconn, the_token.user)) - -@collections.route("/<uuid:anon_id>/list") -def list_anonymous_collections(anon_id: UUID) -> Response: - """Fetch anonymous collections""" - with Redis.from_url( - current_app.config["REDIS_URI"], decode_responses=True) as redisconn: - def __list__(conn: db.DbConnection) -> tuple: - try: - _user = user_by_id(conn, anon_id) - current_app.logger.warning( - "Fetch collections for authenticated user using the " - "`list_user_collections()` endpoint.") - return tuple() - except NotFoundError as _nfe: - return user_collections( - redisconn, User(anon_id, "anon@ymous.user", "Anonymous User")) - - return jsonify(with_db_connection(__list__)) - -@require_oauth("profile user") -def __new_collection_as_authenticated_user__(redisconn, name, traits): - """Create a new collection as an authenticated user.""" - with require_oauth.acquire("profile user") as token: - return create_collection(redisconn, token.user, name, traits) - -def __new_collection_as_anonymous_user__(redisconn, name, traits): - """Create a new collection as an anonymous user.""" - return create_collection(redisconn, - User(UUID(request.json.get("anon_id")), - "anon@ymous.user", - "Anonymous User"), - name, - traits) - -@collections.route("/new", methods=["POST"]) -@require_json -def new_user_collection() -> Response: - """Create a new collection.""" - with (Redis.from_url(current_app.config["REDIS_URI"], - decode_responses=True) as redisconn): - traits = tuple(request.json.get("traits", tuple()))# type: ignore[union-attr] - name = request.json.get("name")# type: ignore[union-attr] - if bool(request.headers.get("Authorization")): - return jsonify(__new_collection_as_authenticated_user__( - redisconn, name, traits)) - return jsonify(__new_collection_as_anonymous_user__( - redisconn, name, traits)) - -@collections.route("/<uuid:collection_id>/view", methods=["POST"]) -@require_json -def view_collection(collection_id: UUID) -> Response: - """View a particular collection""" - with (Redis.from_url(current_app.config["REDIS_URI"], - decode_responses=True) as redisconn): - if bool(request.headers.get("Authorization")): - with require_oauth.acquire("profile user") as token: - return jsonify(get_collection(redisconn, - token.user, - collection_id)) - return jsonify(get_collection( - redisconn, - User( - UUID(request.json.get("anon_id")),#type: ignore[union-attr] - "anon@ymous.user", - "Anonymous User"), - collection_id)) - -@collections.route("/anonymous/import", methods=["POST"]) -@require_json -@require_oauth("profile user") -def import_anonymous() -> Response: - """Import anonymous collections.""" - with (require_oauth.acquire("profile user") as token, - Redis.from_url(current_app.config["REDIS_URI"], - decode_responses=True) as redisconn): - anon_id = UUID(request.json.get("anon_id"))#type: ignore[union-attr] - anon_colls = user_collections(redisconn, User( - anon_id, "anon@ymous.user", "Anonymous User")) - save_collections( - redisconn, - token.user, - (user_collections(redisconn, token.user) + - anon_colls)) - redisconn.hdel("collections", str(anon_id)) - return jsonify({ - "message": f"Import of {len(anon_colls)} was successful." - }) - -@collections.route("/anonymous/delete", methods=["POST"]) -@require_json -@require_oauth("profile user") -def delete_anonymous() -> Response: - """Delete anonymous collections.""" - with (require_oauth.acquire("profile user") as _token, - Redis.from_url(current_app.config["REDIS_URI"], - decode_responses=True) as redisconn): - anon_id = UUID(request.json.get("anon_id"))#type: ignore[union-attr] - anon_colls = user_collections(redisconn, User( - anon_id, "anon@ymous.user", "Anonymous User")) - redisconn.hdel("collections", str(anon_id)) - return jsonify({ - "message": f"Deletion of {len(anon_colls)} was successful." - }) - -@collections.route("/delete", methods=["POST"]) -@require_json -def delete_collections(): - """Delete specified collections.""" - with (Redis.from_url(current_app.config["REDIS_URI"], - decode_responses=True) as redisconn): - coll_ids = tuple(UUID(cid) for cid in request.json["collection_ids"]) - deleted = _delete_collections( - redisconn, - User(request.json["anon_id"], "anon@ymous.user", "Anonymous User"), - coll_ids) - if bool(request.headers.get("Authorization")): - with require_oauth.acquire("profile user") as token: - deleted = deleted + _delete_collections( - redisconn, token.user, coll_ids) - - return jsonify({ - "message": f"Deleted {len(deleted)} collections."}) - -@collections.route("/<uuid:collection_id>/traits/remove", methods=["POST"]) -@require_json -def remove_traits_from_collection(collection_id: UUID) -> Response: - """Remove specified traits from collection with ID `collection_id`.""" - if len(request.json["traits"]) < 1:#type: ignore[index] - return jsonify({"message": "No trait to remove from collection."}) - - the_traits = tuple(request.json["traits"])#type: ignore[index] - with (Redis.from_url(current_app.config["REDIS_URI"], - decode_responses=True) as redisconn): - if not bool(request.headers.get("Authorization")): - coll = remove_traits( - redisconn, - User(request.json["anon_id"],#type: ignore[index] - "anon@ymous.user", - "Anonymous User"), - collection_id, - the_traits) - else: - with require_oauth.acquire("profile user") as token: - coll = remove_traits( - redisconn, token.user, collection_id, the_traits) - - return jsonify({ - "message": f"Deleted {len(the_traits)} traits from collection.", - "collection": coll - }) - -@collections.route("/<uuid:collection_id>/traits/add", methods=["POST"]) -@require_json -def add_traits_to_collection(collection_id: UUID) -> Response: - """Add specified traits to collection with ID `collection_id`.""" - if len(request.json["traits"]) < 1:#type: ignore[index] - return jsonify({"message": "No trait to add to collection."}) - - the_traits = tuple(request.json["traits"])#type: ignore[index] - with (Redis.from_url(current_app.config["REDIS_URI"], - decode_responses=True) as redisconn): - if not bool(request.headers.get("Authorization")): - coll = add_traits( - redisconn, - User(request.json["anon_id"],#type: ignore[index] - "anon@ymous.user", - "Anonymous User"), - collection_id, - the_traits) - else: - with require_oauth.acquire("profile user") as token: - coll = add_traits( - redisconn, token.user, collection_id, the_traits) - - return jsonify({ - "message": f"Added {len(the_traits)} traits to collection.", - "collection": coll - }) - -@collections.route("/<uuid:collection_id>/rename", methods=["POST"]) -@require_json -def rename_collection(collection_id: UUID) -> Response: - """Rename the given collection""" - if not bool(request.json["new_name"]):#type: ignore[index] - return jsonify({"message": "No new name to change to."}) - - new_name = request.json["new_name"]#type: ignore[index] - with (Redis.from_url(current_app.config["REDIS_URI"], - decode_responses=True) as redisconn): - if not bool(request.headers.get("Authorization")): - coll = change_name(redisconn, - User(UUID(request.json["anon_id"]),#type: ignore[index] - "anon@ymous.user", - "Anonymous User"), - collection_id, - new_name) - else: - with require_oauth.acquire("profile user") as token: - coll = change_name( - redisconn, token.user, collection_id, new_name) - - return jsonify({ - "message": "Collection rename successful.", - "collection": coll - }) |
