"""Handle user collections.""" import json from uuid import UUID, uuid4 from datetime import datetime from redis import Redis from email_validator import validate_email, EmailNotValidError from gn_auth.auth.errors import InvalidData, NotFoundError from ..models import User REDIS_COLLECTIONS_KEY = "collections2" __OLD_REDIS_COLLECTIONS_KEY__ = "collections" class CollectionJSONEncoder(json.JSONEncoder): """Serialise collection objects into JSON.""" def default(self, obj):# pylint: disable=[arguments-renamed] if isinstance(obj, UUID): return str(obj) if isinstance(obj, datetime): return obj.strftime("%b %d %Y %I:%M%p") return json.JSONEncoder.default(self, obj) def __valid_email__(email:str) -> bool: """Check for email validity.""" try: validate_email(email, check_deliverability=True) except EmailNotValidError as _enve: return False return True def __toggle_boolean_field__( rconn: Redis, email: str, field: str): """Toggle the valuen of a boolean field""" mig_dict = json.loads(rconn.hget("migratable-accounts", email) or "{}") if bool(mig_dict): rconn.hset("migratable-accounts", email, json.dumps({**mig_dict, field: not mig_dict.get(field, True)})) def __build_email_uuid_bridge__(rconn: Redis): """ Build a connection between new accounts and old user accounts. The only thing that is common between the two is the email address, therefore, we use that to link the two items. """ old_accounts = { account["email_address"]: { "user_id": account["user_id"], "collections-migrated": False, "resources_migrated": False } for account in ( acct for acct in (json.loads(usr) for usr in rconn.hgetall("users").values()) if (bool(acct.get("email_address", False)) and __valid_email__(acct["email_address"]))) } if bool(old_accounts): rconn.hset("migratable-accounts", mapping={ key: json.dumps(value) for key,value in old_accounts.items() }) return old_accounts def __retrieve_old_accounts__(rconn: Redis) -> dict: accounts = rconn.hgetall("migratable-accounts") if accounts: return { key: json.loads(value) for key, value in accounts.items() } return __build_email_uuid_bridge__(rconn) def parse_collection(coll: dict) -> dict: """Parse the collection as persisted in redis to a usable python object.""" created = coll.get("created", coll.get("created_timestamp")) changed = coll.get("changed", coll.get("changed_timestamp")) return { "id": UUID(coll["id"]), "name": coll["name"], "created": datetime.strptime(created, "%b %d %Y %I:%M%p"), "changed": datetime.strptime(changed, "%b %d %Y %I:%M%p"), "num_members": int(coll["num_members"]), "members": coll["members"] } def dump_collection(pythoncollection: dict) -> str: """Convert the collection from a python object to a json string.""" return json.dumps(pythoncollection, cls=CollectionJSONEncoder) def __retrieve_old_user_collections__(rconn: Redis, old_user_id: UUID) -> tuple: """Retrieve any old collections relating to the user.""" return tuple(parse_collection(coll) for coll in json.loads(rconn.hget( __OLD_REDIS_COLLECTIONS_KEY__, str(old_user_id)) or "[]")) def user_collections(rconn: Redis, user: User) -> tuple[dict, ...]: """Retrieve current user collections.""" collections = tuple(parse_collection(coll) for coll in json.loads( rconn.hget(REDIS_COLLECTIONS_KEY, str(user.user_id)) or "[]")) old_accounts = __retrieve_old_accounts__(rconn) if (user.email in old_accounts and not old_accounts[user.email]["collections-migrated"]): old_user_id = old_accounts[user.email]["user_id"] collections = tuple({ coll["id"]: coll for coll in ( collections + __retrieve_old_user_collections__( rconn, UUID(old_user_id))) }.values()) __toggle_boolean_field__(rconn, user.email, "collections-migrated") rconn.hset( REDIS_COLLECTIONS_KEY, key=str(user.user_id), value=json.dumps(collections, cls=CollectionJSONEncoder)) return collections def save_collections(rconn: Redis, user: User, collections: tuple[dict, ...]) -> tuple[dict, ...]: """Save the `collections` to redis.""" rconn.hset( REDIS_COLLECTIONS_KEY, str(user.user_id), json.dumps(collections, cls=CollectionJSONEncoder)) return collections def add_to_user_collections(rconn: Redis, user: User, collection: dict) -> dict: """Add `collection` to list of user collections.""" ucolls = user_collections(rconn, user) save_collections(rconn, user, ucolls + (collection,)) return collection def create_collection(rconn: Redis, user: User, name: str, traits: tuple) -> dict: """Create a new collection.""" now = datetime.utcnow() return add_to_user_collections(rconn, user, { "id": uuid4(), "name": name, "created": now, "changed": now, "num_members": len(traits), "members": traits }) def get_collection(rconn: Redis, user: User, collection_id: UUID) -> dict: """Retrieve the collection with ID `collection_id`.""" colls = tuple(coll for coll in user_collections(rconn, user) if coll["id"] == collection_id) if len(colls) == 0: raise NotFoundError( f"Could not find a collection with ID `{collection_id}` for user " f"with ID `{user.user_id}`") if len(colls) > 1: err = InvalidData( "More than one collection was found having the ID " f"`{collection_id}` for user with ID `{user.user_id}`.") err.error_code = 513 raise err return colls[0] def __raise_if_collections_empty__(user: User, collections: tuple[dict, ...]): """Raise an exception if no collections are found for `user`.""" if len(collections) < 1: raise NotFoundError(f"No collections found for user `{user.user_id}`") def __raise_if_not_single_collection__( user: User, collection_id: UUID, collections: tuple[dict, ...]): """ Raise an exception there is zero, or more than one collection for `user`. """ if len(collections) == 0: raise NotFoundError(f"No collections found for user `{user.user_id}` " f"with ID `{collection_id}`.") if len(collections) > 1: err = InvalidData( "More than one collection was found having the ID " f"`{collection_id}` for user with ID `{user.user_id}`.") err.error_code = 513 raise err def delete_collections(rconn: Redis, user: User, collection_ids: tuple[UUID, ...]) -> tuple[dict, ...]: """ Delete collections with the given `collection_ids` returning the deleted collections. """ ucolls = user_collections(rconn, user) save_collections( rconn, user, tuple(coll for coll in ucolls if coll["id"] not in collection_ids)) return tuple(coll for coll in ucolls if coll["id"] in collection_ids) def add_traits(rconn: Redis, user: User, collection_id: UUID, traits: tuple[str, ...]) -> dict: """ Add `traits` to the `user` collection identified by `collection_id`. Returns: The collection with the new traits added. """ ucolls = user_collections(rconn, user) __raise_if_collections_empty__(user, ucolls) mod_col = tuple(coll for coll in ucolls if coll["id"] == collection_id) __raise_if_not_single_collection__(user, collection_id, mod_col) new_members = tuple(set(tuple(mod_col[0]["members"]) + traits)) new_coll = { **mod_col[0], "members": new_members, "num_members": len(new_members) } save_collections( rconn, user, (tuple(coll for coll in ucolls if coll["id"] != collection_id) + (new_coll,))) return new_coll def remove_traits(rconn: Redis, user: User, collection_id: UUID, traits: tuple[str, ...]) -> dict: """ Remove `traits` from the `user` collection identified by `collection_id`. Returns: The collection with the specified `traits` removed. """ ucolls = user_collections(rconn, user) __raise_if_collections_empty__(user, ucolls) mod_col = tuple(coll for coll in ucolls if coll["id"] == collection_id) __raise_if_not_single_collection__(user, collection_id, mod_col) new_members = tuple( trait for trait in mod_col[0]["members"] if trait not in traits) new_coll = { **mod_col[0], "members": new_members, "num_members": len(new_members) } save_collections( rconn, user, (tuple(coll for coll in ucolls if coll["id"] != collection_id) + (new_coll,))) return new_coll def change_name(rconn: Redis, user: User, collection_id: UUID, new_name: str) -> dict: """ Change the collection's name. Returns: The collection with the new name. """ ucolls = user_collections(rconn, user) __raise_if_collections_empty__(user, ucolls) mod_col = tuple(coll for coll in ucolls if coll["id"] == collection_id) __raise_if_not_single_collection__(user, collection_id, mod_col) new_coll = {**mod_col[0], "name": new_name} save_collections( rconn, user, (tuple(coll for coll in ucolls if coll["id"] != collection_id) + (new_coll,))) return new_coll