"""Handle user collections.""" import json from uuid import UUID, uuid4 from datetime import datetime from redis import Redis from email_validator import validate_email, EmailNotValidError from gn3.auth.authorisation.errors import InvalidData, NotFoundError from ..models import User class CollectionJSONEncoder(json.JSONEncoder): """Serialise collection objects into JSON.""" def default(self, obj):# pylint: disable=[arguments-renamed] if isinstance(obj, UUID): return str(obj) if isinstance(obj, datetime): return obj.strftime("%b %d %Y %I:%M%p") return json.JSONEncoder.default(self, obj) def __valid_email__(email:str) -> bool: """Check for email validity.""" try: validate_email(email, check_deliverability=True) except EmailNotValidError as _enve: return False return True def __toggle_boolean_field__( rconn: Redis, email: str, field: str): """Toggle the valuen of a boolean field""" mig_dict = json.loads(rconn.hget("migratable-accounts", email) or "{}") if bool(mig_dict): rconn.hset("migratable-accounts", email, {**mig_dict, field: not mig_dict.get(field, True)}) def __build_email_uuid_bridge__(rconn: Redis): """ Build a connection between new accounts and old user accounts. The only thing that is common between the two is the email address, therefore, we use that to link the two items. """ old_accounts = { account["email_address"]: { "user_id": account["user_id"], "collections-migrated": False, "resources_migrated": False } for account in ( acct for acct in (json.loads(usr) for usr in rconn.hgetall("users").values()) if (bool(acct.get("email_address", False)) and __valid_email__(acct["email_address"]))) } if bool(old_accounts): rconn.hset("migratable-accounts", mapping={ key: json.dumps(value) for key,value in old_accounts.items() }) return old_accounts def __retrieve_old_accounts__(rconn: Redis) -> dict: accounts = rconn.hgetall("migratable-accounts") if accounts: return { key: json.loads(value) for key, value in accounts.items() } return __build_email_uuid_bridge__(rconn) def parse_collection(coll: dict) -> dict: """Parse the collection as persisted in redis to a usable python object.""" return { "id": UUID(coll["id"]), "name": coll["name"], "created": datetime.strptime(coll["created"], "%b %d %Y %I:%M%p"), "changed": datetime.strptime(coll["changed"], "%b %d %Y %I:%M%p"), "num_members": int(coll["num_members"]), "members": coll["members"] } def dump_collection(pythoncollection: dict) -> str: """Convert the collection from a python object to a json string.""" return json.dumps(pythoncollection, cls=CollectionJSONEncoder) def __retrieve_old_user_collections__(rconn: Redis, old_user_id: UUID) -> tuple: """Retrieve any old collections relating to the user.""" return tuple(parse_collection(coll) for coll in json.loads(rconn.hget("collections", old_user_id) or "[]")) def user_collections(rconn: Redis, user: User) -> tuple[dict, ...]: """Retrieve current user collections.""" collections = tuple(parse_collection(coll) for coll in json.loads( rconn.hget("collections", str(user.user_id)) or "[]")) old_accounts = __retrieve_old_accounts__(rconn) if (user.email in old_accounts and not old_accounts[user.email]["collections-migrated"]): old_user_id = old_accounts[user.email]["user_id"] collections = tuple(set(collections + __retrieve_old_user_collections__( rconn, UUID(old_user_id)))) rconn.hdel("collections", old_user_id) __toggle_boolean_field__(rconn, user.email, "collections-migrated") rconn.hset( "collections", key=user.user_id, value=json.dumps(collections)) return collections def save_collections(rconn: Redis, user: User, collections: tuple[dict, ...]) -> tuple[dict, ...]: """Save the `collections` to redis.""" rconn.hset( "collections", str(user.user_id), json.dumps(collections, cls=CollectionJSONEncoder)) return collections def add_to_user_collections(rconn: Redis, user: User, collection: dict) -> dict: """Add `collection` to list of user collections.""" ucolls = user_collections(rconn, user) save_collections(rconn, user, ucolls + (collection,)) return collection def create_collection(rconn: Redis, user: User, name: str, traits: tuple) -> dict: """Create a new collection.""" now = datetime.utcnow() return add_to_user_collections(rconn, user, { "id": uuid4(), "name": name, "created": now, "changed": now, "num_members": len(traits), "members": traits }) def get_collection(rconn: Redis, user: User, collection_id: UUID) -> dict: """Retrieve the collection with ID `collection_id`.""" colls = tuple(coll for coll in user_collections(rconn, user) if coll["id"] == collection_id) if len(colls) == 0: raise NotFoundError( f"Could not find a collection with ID `{collection_id}` for user " f"with ID `{user.user_id}`") if len(colls) > 1: err = InvalidData( "More than one collection was found having the ID " f"`{collection_id}` for user with ID `{user.user_id}`.") err.error_code = 513 raise err return colls[0]