aboutsummaryrefslogtreecommitdiff
path: root/gn3/auth/authorisation/users/collections/models.py
blob: eaee9af1d484b1318c4f9c16a23cff50dba928f4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
"""Handle user collections."""
import json
from uuid import UUID, uuid4
from datetime import datetime

from redis import Redis
from email_validator import validate_email, EmailNotValidError

from gn3.auth.authorisation.errors import InvalidData, NotFoundError

from ..models import User

class CollectionJSONEncoder(json.JSONEncoder):
    """Serialise collection objects into JSON."""
    def default(self, obj):# pylint: disable=[arguments-renamed]
        if isinstance(obj, UUID):
            return str(obj)
        if isinstance(obj, datetime):
            return obj.strftime("%b %d %Y %I:%M%p")
        return json.JSONEncoder.default(self, obj)

def __valid_email__(email:str) -> bool:
    """Check for email validity."""
    try:
        validate_email(email, check_deliverability=True)
    except EmailNotValidError as _enve:
        return False
    return True

def __toggle_boolean_field__(
        rconn: Redis, email: str, field: str):
    """Toggle the valuen of a boolean field"""
    mig_dict = json.loads(rconn.hget("migratable-accounts", email) or "{}")
    if bool(mig_dict):
        rconn.hset("migratable-accounts", email,
                   {**mig_dict, field: not mig_dict.get(field, True)})

def __build_email_uuid_bridge__(rconn: Redis):
    """
    Build a connection between new accounts and old user accounts.

    The only thing that is common between the two is the email address,
    therefore, we use that to link the two items.
    """
    old_accounts = {
        account["email_address"]: {
            "user_id": account["user_id"],
            "collections-migrated": False,
            "resources_migrated": False
        } for account in (
            acct for acct in
            (json.loads(usr) for usr in rconn.hgetall("users").values())
            if (bool(acct.get("email_address", False)) and
                __valid_email__(acct["email_address"])))
    }
    if bool(old_accounts):
        rconn.hset("migratable-accounts", mapping={
            key: json.dumps(value) for key,value in old_accounts.items()
        })
    return old_accounts

def __retrieve_old_accounts__(rconn: Redis) -> dict:
    accounts = rconn.hgetall("migratable-accounts")
    if accounts:
        return {
            key: json.loads(value) for key, value in accounts.items()
        }
    return __build_email_uuid_bridge__(rconn)

def parse_collection(coll: dict) -> dict:
    """Parse the collection as persisted in redis to a usable python object."""
    return {
        "id": UUID(coll["id"]),
        "name": coll["name"],
        "created": datetime.strptime(coll["created"], "%b %d %Y %I:%M%p"),
        "changed": datetime.strptime(coll["changed"], "%b %d %Y %I:%M%p"),
        "num_members": int(coll["num_members"]),
        "members": coll["members"]
    }

def dump_collection(pythoncollection: dict) -> str:
    """Convert the collection from a python object to a json string."""
    return json.dumps(pythoncollection, cls=CollectionJSONEncoder)

def __retrieve_old_user_collections__(rconn: Redis, old_user_id: UUID) -> tuple:
    """Retrieve any old collections relating to the user."""
    return tuple(parse_collection(coll) for coll in
                 json.loads(rconn.hget("collections", old_user_id) or "[]"))

def user_collections(rconn: Redis, user: User) -> tuple[dict, ...]:
    """Retrieve current user collections."""
    collections = tuple(parse_collection(coll) for coll in json.loads(
        rconn.hget("collections", str(user.user_id)) or
        "[]"))
    old_accounts = __retrieve_old_accounts__(rconn)
    if (user.email in old_accounts and
        not old_accounts[user.email]["collections-migrated"]):
        old_user_id = old_accounts[user.email]["user_id"]
        collections = tuple(set(collections + __retrieve_old_user_collections__(
            rconn, UUID(old_user_id))))
        rconn.hdel("collections", old_user_id)
        __toggle_boolean_field__(rconn, user.email, "collections-migrated")
        rconn.hset(
            "collections", key=user.user_id, value=json.dumps(collections))
    return collections

def save_collections(rconn: Redis, user: User, collections: tuple[dict, ...]) -> tuple[dict, ...]:
    """Save the `collections` to redis."""
    rconn.hset(
        "collections",
        str(user.user_id),
        json.dumps(collections, cls=CollectionJSONEncoder))
    return collections

def add_to_user_collections(rconn: Redis, user: User, collection: dict) -> dict:
    """Add `collection` to list of user collections."""
    ucolls = user_collections(rconn, user)
    save_collections(rconn, user, ucolls + (collection,))
    return collection

def create_collection(rconn: Redis, user: User, name: str, traits: tuple) -> dict:
    """Create a new collection."""
    now = datetime.utcnow()
    return add_to_user_collections(rconn, user, {
        "id": uuid4(),
        "name": name,
        "created": now,
        "changed": now,
        "num_members": len(traits),
        "members": traits
    })

def get_collection(rconn: Redis, user: User, collection_id: UUID) -> dict:
    """Retrieve the collection with ID `collection_id`."""
    colls = tuple(coll for coll in user_collections(rconn, user)
                  if coll["id"] == collection_id)
    if len(colls) == 0:
        raise NotFoundError(
            f"Could not find a collection with ID `{collection_id}` for user "
            f"with ID `{user.user_id}`")
    if len(colls) > 1:
        err = InvalidData(
            "More than one collection was found having the ID "
            f"`{collection_id}` for user with ID `{user.user_id}`.")
        err.error_code = 513
        raise err
    return colls[0]

def delete_collections(rconn: Redis,
                       user: User,
                       collection_ids: tuple[UUID, ...]) -> tuple[dict, ...]:
    """
    Delete collections with the given `collection_ids` returning the deleted
    collections.
    """
    ucolls = user_collections(rconn, user)
    save_collections(
        rconn,
        user,
        tuple(coll for coll in ucolls if coll["id"] not in collection_ids))
    return tuple(coll for coll in ucolls if coll["id"] in collection_ids)