1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
|
"""Handle user collections."""
import json
from uuid import UUID, uuid4
from datetime import datetime, timezone

from redis import Redis
from email_validator import validate_email, EmailNotValidError

from gn3.auth.authorisation.errors import InvalidData, NotFoundError

from ..models import User
# Redis hash that is read (never written) in this module to fetch the
# collections stored under a user's old account ID.
__OLD_REDIS_COLLECTIONS_KEY__ = "collections"
# Redis hash holding the current collections; each field is the string form
# of a user's ID and each value is a JSON list of that user's collections.
__REDIS_COLLECTIONS_KEY__ = "collections2"
class CollectionJSONEncoder(json.JSONEncoder):
    """
    JSON encoder that knows how to serialise the values found in collections.

    UUIDs are written out in their string form and datetimes are formatted
    with `%b %d %Y %I:%M%p`; every other value is left to the stock encoder.
    """

    def default(self, obj):# pylint: disable=[arguments-renamed]
        """Convert `obj` to a JSON-friendly value, or defer to the parent."""
        if isinstance(obj, datetime):
            return obj.strftime("%b %d %Y %I:%M%p")
        if isinstance(obj, UUID):
            return str(obj)
        # Anything else: let the base class raise its usual TypeError.
        return super().default(obj)
def __valid_email__(email:str) -> bool:
    """
    Report whether `email` is a valid email address.

    The address is run through `validate_email` with the deliverability check
    enabled; an `EmailNotValidError` means the address is invalid.
    """
    try:
        validate_email(email, check_deliverability=True)
    except EmailNotValidError:
        return False
    else:
        return True
def __toggle_boolean_field__(
        rconn: Redis, email: str, field: str):
    """
    Toggle the value of the boolean `field` for the account with `email`.

    The account record is read from the "migratable-accounts" hash. If there
    is no record (or it is empty) nothing is written. A `field` that is
    absent from the record is treated as currently true, so it is saved as
    false.
    """
    record = json.loads(rconn.hget("migratable-accounts", email) or "{}")
    if not record:
        return
    flipped = not record.get(field, True)
    rconn.hset("migratable-accounts", email,
               json.dumps({**record, field: flipped}))
def __build_email_uuid_bridge__(rconn: Redis):
    """
    Link old user accounts to new accounts by way of the email address.

    Every record in the "users" hash that has a non-empty, valid
    `email_address` is mapped from that address to its old `user_id` together
    with two migration flags, both initially false. A non-empty result is
    also persisted to the "migratable-accounts" hash.

    Returns: The mapping of email address to old-account details.
    """
    bridge = {}
    for raw_user in rconn.hgetall("users").values():
        account = json.loads(raw_user)
        email = account.get("email_address", False)
        # Skip accounts with a missing/empty or invalid email address.
        if not (bool(email) and __valid_email__(email)):
            continue
        bridge[email] = {
            "user_id": account["user_id"],
            "collections-migrated": False,
            "resources_migrated": False
        }

    if bridge:
        rconn.hset("migratable-accounts", mapping={
            email: json.dumps(details) for email, details in bridge.items()
        })
    return bridge
def __retrieve_old_accounts__(rconn: Redis) -> dict:
    """
    Fetch the mapping of email address to old-account details.

    The mapping is read from the "migratable-accounts" hash; if that hash is
    empty, the mapping is built (and persisted) from the old user records.
    """
    stored = rconn.hgetall("migratable-accounts")
    if not stored:
        return __build_email_uuid_bridge__(rconn)
    return {email: json.loads(details) for email, details in stored.items()}
def parse_collection(coll: dict) -> dict:
    """
    Convert a collection, as loaded from redis, into a usable python object.

    The timestamps are read from the `created`/`changed` keys, falling back
    to `created_timestamp`/`changed_timestamp` when those keys are absent.

    Returns: A dict with the `id` as a UUID, the timestamps as datetimes and
    `num_members` as an int; `name` and `members` are passed through as-is.
    """
    stamp_format = "%b %d %Y %I:%M%p"
    if "created" in coll:
        created = coll["created"]
    else:
        created = coll.get("created_timestamp")
    if "changed" in coll:
        changed = coll["changed"]
    else:
        changed = coll.get("changed_timestamp")

    return {
        "id": UUID(coll["id"]),
        "name": coll["name"],
        "created": datetime.strptime(created, stamp_format),
        "changed": datetime.strptime(changed, stamp_format),
        "num_members": int(coll["num_members"]),
        "members": coll["members"]
    }
def dump_collection(pythoncollection: dict) -> str:
    """Serialise the python collection object into a JSON string."""
    # Encoding through a default-configured encoder instance is what
    # `json.dumps(..., cls=CollectionJSONEncoder)` does under the hood.
    encoder = CollectionJSONEncoder()
    return encoder.encode(pythoncollection)
def __retrieve_old_user_collections__(rconn: Redis, old_user_id: UUID) -> tuple:
    """
    Fetch and parse the collections saved under the old user ID.

    Returns: A tuple of parsed collections; empty if nothing is stored for
    `old_user_id`.
    """
    raw = rconn.hget(__OLD_REDIS_COLLECTIONS_KEY__, str(old_user_id))
    return tuple(map(parse_collection, json.loads(raw or "[]")))
def user_collections(rconn: Redis, user: User) -> tuple[dict, ...]:
    """
    Retrieve the collections that currently belong to `user`.

    If the user's email is linked to an old account whose collections have not
    been migrated yet, the old collections are merged in (one collection per
    ID), the account is flagged as migrated and the merged result is saved
    under the user's current ID.
    """
    stored = rconn.hget(__REDIS_COLLECTIONS_KEY__, str(user.user_id))
    collections = tuple(
        parse_collection(coll) for coll in json.loads(stored or "[]"))

    old_accounts = __retrieve_old_accounts__(rconn)
    if user.email not in old_accounts:
        return collections
    old_account = old_accounts[user.email]
    if old_account["collections-migrated"]:
        return collections

    old_user_id = old_account["user_id"]
    old_collections = __retrieve_old_user_collections__(
        rconn, UUID(old_user_id))
    # Keying on the ID keeps a single collection per ID; on a clash the
    # later (old-account) entry replaces the earlier one.
    by_id = {coll["id"]: coll for coll in collections + old_collections}
    collections = tuple(by_id.values())
    __toggle_boolean_field__(rconn, user.email, "collections-migrated")
    rconn.hset(
        __REDIS_COLLECTIONS_KEY__,
        key=str(user.user_id),
        value=json.dumps(collections, cls=CollectionJSONEncoder))
    return collections
def save_collections(
        rconn: Redis,
        user: User,
        collections: tuple[dict, ...]) -> tuple[dict, ...]:
    """
    Persist `collections` to redis under the ID of `user`.

    Returns: The same `collections` that were passed in.
    """
    hash_field = str(user.user_id)
    payload = json.dumps(collections, cls=CollectionJSONEncoder)
    rconn.hset(__REDIS_COLLECTIONS_KEY__, hash_field, payload)
    return collections
def add_to_user_collections(rconn: Redis, user: User, collection: dict) -> dict:
    """
    Append `collection` to the collections of `user` and save the result.

    Returns: The `collection` that was added.
    """
    existing = user_collections(rconn, user)
    save_collections(rconn, user, (*existing, collection))
    return collection
def create_collection(rconn: Redis, user: User, name: str, traits: tuple) -> dict:
    """
    Create a new collection for `user` and save it.

    The collection gets a freshly generated ID, and its `created` and
    `changed` timestamps are both set to the current UTC time.

    Returns: The newly created collection.
    """
    # `datetime.utcnow()` is deprecated since Python 3.12. Take an aware UTC
    # timestamp and strip the tzinfo so the stored value stays naive, exactly
    # like the value `utcnow()` produced and like the datetimes that
    # `parse_collection` returns.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    return add_to_user_collections(rconn, user, {
        "id": uuid4(),
        "name": name,
        "created": now,
        "changed": now,
        "num_members": len(traits),
        "members": traits
    })
def get_collection(rconn: Redis, user: User, collection_id: UUID) -> dict:
    """
    Fetch the collection of `user` that has the ID `collection_id`.

    Raises:
        NotFoundError: if the user has no collection with that ID.
        InvalidData: (with `error_code` 513) if more than one collection has
            that ID.
    """
    matches = tuple(filter(
        lambda coll: coll["id"] == collection_id,
        user_collections(rconn, user)))
    if not matches:
        raise NotFoundError(
            f"Could not find a collection with ID `{collection_id}` for user "
            f"with ID `{user.user_id}`")
    if len(matches) > 1:
        err = InvalidData(
            "More than one collection was found having the ID "
            f"`{collection_id}` for user with ID `{user.user_id}`.")
        err.error_code = 513
        raise err
    return matches[0]
def __raise_if_collections_empty__(user: User, collections: tuple[dict, ...]):
    """Raise a `NotFoundError` if `collections` is empty for `user`."""
    if len(collections) >= 1:
        return
    raise NotFoundError(f"No collections found for user `{user.user_id}`")
def __raise_if_not_single_collection__(
        user: User, collection_id: UUID, collections: tuple[dict, ...]):
    """
    Raise an exception if there is not exactly one collection for `user`.

    Raises:
        NotFoundError: if `collections` is empty.
        InvalidData: (with `error_code` 513) if `collections` has more than
            one item.
    """
    count = len(collections)
    if count == 1:
        return
    if count == 0:
        raise NotFoundError(f"No collections found for user `{user.user_id}` "
                            f"with ID `{collection_id}`.")
    err = InvalidData(
        "More than one collection was found having the ID "
        f"`{collection_id}` for user with ID `{user.user_id}`.")
    err.error_code = 513
    raise err
def delete_collections(rconn: Redis,
                       user: User,
                       collection_ids: tuple[UUID, ...]) -> tuple[dict, ...]:
    """
    Remove the collections of `user` whose IDs are in `collection_ids`.

    The remaining collections are saved back to redis.

    Returns: The collections that were deleted.
    """
    kept, deleted = [], []
    for coll in user_collections(rconn, user):
        bucket = deleted if coll["id"] in collection_ids else kept
        bucket.append(coll)
    save_collections(rconn, user, tuple(kept))
    return tuple(deleted)
def add_traits(rconn: Redis,
               user: User,
               collection_id: UUID,
               traits: tuple[str, ...]) -> dict:
    """
    Add `traits` to the `user` collection identified by `collection_id`.

    Existing members keep their order, new traits are appended in the order
    given, and duplicates are dropped. The updated collection is saved as the
    last of the user's collections.

    Returns: The collection with the new traits added.

    Raises:
        NotFoundError: if the user has no collections, or none with the ID
            `collection_id`.
        InvalidData: if more than one collection has the ID `collection_id`.
    """
    ucolls = user_collections(rconn, user)
    __raise_if_collections_empty__(user, ucolls)

    mod_col = tuple(coll for coll in ucolls if coll["id"] == collection_id)
    __raise_if_not_single_collection__(user, collection_id, mod_col)
    collection = mod_col[0]
    # `dict.fromkeys` de-duplicates while keeping first-seen order. Going
    # through a `set` shuffled the members on every call, and `tuple + traits`
    # failed outright when `traits` was a list rather than a tuple.
    new_members = tuple(dict.fromkeys((*collection["members"], *traits)))
    # NOTE(review): the `changed` timestamp is carried over untouched here;
    # confirm whether modifications are meant to refresh it.
    new_coll = {
        **collection,
        "members": new_members,
        "num_members": len(new_members)
    }
    save_collections(
        rconn,
        user,
        (tuple(coll for coll in ucolls if coll["id"] != collection_id) +
         (new_coll,)))
    return new_coll
def remove_traits(rconn: Redis,
                  user: User,
                  collection_id: UUID,
                  traits: tuple[str, ...]) -> dict:
    """
    Drop `traits` from the `user` collection identified by `collection_id`.

    The updated collection is saved as the last of the user's collections.

    Returns: The collection with the specified `traits` removed.

    Raises:
        NotFoundError: if the user has no collections, or none with the ID
            `collection_id`.
        InvalidData: if more than one collection has the ID `collection_id`.
    """
    ucolls = user_collections(rconn, user)
    __raise_if_collections_empty__(user, ucolls)

    target = tuple(
        filter(lambda coll: coll["id"] == collection_id, ucolls))
    __raise_if_not_single_collection__(user, collection_id, target)

    remaining = tuple(
        member for member in target[0]["members"] if member not in traits)
    updated = dict(target[0])
    updated["members"] = remaining
    updated["num_members"] = len(remaining)

    others = tuple(coll for coll in ucolls if coll["id"] != collection_id)
    save_collections(rconn, user, others + (updated,))
    return updated
def change_name(rconn: Redis,
                user: User,
                collection_id: UUID,
                new_name: str) -> dict:
    """
    Rename the `user` collection identified by `collection_id`.

    The renamed collection is saved as the last of the user's collections.

    Returns: The collection with the new name.

    Raises:
        NotFoundError: if the user has no collections, or none with the ID
            `collection_id`.
        InvalidData: if more than one collection has the ID `collection_id`.
    """
    ucolls = user_collections(rconn, user)
    __raise_if_collections_empty__(user, ucolls)

    target = tuple(
        filter(lambda coll: coll["id"] == collection_id, ucolls))
    __raise_if_not_single_collection__(user, collection_id, target)

    renamed = dict(target[0])
    renamed["name"] = new_name

    others = tuple(coll for coll in ucolls if coll["id"] != collection_id)
    save_collections(rconn, user, others + (renamed,))
    return renamed
|