1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
|
"""Methods for interacting with gn-proxy."""
import functools
import json
import uuid
import datetime
from urllib.parse import urljoin
from enum import Enum, unique
from typing import Dict, List, Optional, Union
from redis import Redis
import requests
@functools.total_ordering
class OrderedEnum(Enum):
    """Base Enum whose members are ordered by their definition position.

    A member declared earlier in the class body compares as less than a
    member declared later.  Only `__lt__` is written by hand;
    `functools.total_ordering` supplies the other rich comparisons.
    """

    @classmethod
    @functools.lru_cache(maxsize=None)
    def _member_list(cls):
        """Return the members of `cls` in definition order (cached)."""
        return [member for member in cls]

    def __lt__(self, other):
        """Return True when `self` is defined before `other` in the enum."""
        # Members of different classes have no shared ordering; let Python
        # fall back to its default handling.
        if self.__class__ is not other.__class__:
            return NotImplemented
        ordering = self.__class__._member_list()
        return ordering.index(self) < ordering.index(other)
@unique
class DataRole(OrderedEnum):
    """Data-access roles, identified by their gn-proxy string values.

    Declaration order is significant: `OrderedEnum` ranks members by
    position, so `NO_ACCESS < VIEW < EDIT`.
    """

    NO_ACCESS = "no-access"
    VIEW = "view"
    EDIT = "edit"
@unique
class AdminRole(OrderedEnum):
    """Admin-status roles, identified by their gn-proxy string values.

    Declaration order is significant: `OrderedEnum` ranks members by
    position, so `NOT_ADMIN < EDIT_ACCESS < EDIT_ADMINS`.
    """

    NOT_ADMIN = "not-admin"
    EDIT_ACCESS = "edit-access"
    EDIT_ADMINS = "edit-admins"
def get_user_membership(conn: Redis, user_id: str,
                        group_id: str) -> Dict:
    """Return a dictionary that indicates whether the `user_id` is a
    member or admin of `group_id`.

    Args:
      - conn: a Redis Connection with the responses decoded.
      - user_id: a user's unique id
        e.g. '8ad942fe-490d-453e-bd37-56f252e41603'
      - group_id: a group's unique id
        e.g. '7fa95d07-0e2d-4bc5-b47c-448fdc1260b2'

    Returns:
      A dict indicating whether the user is an admin or a member of
      the group: {"member": True, "admin": False}.  Both flags are
      False when no group is stored under `group_id`.
    """
    # Ask Redis for the one field we need rather than transferring the
    # whole "groups" hash and scanning it client-side.
    raw_group = conn.hget("groups", group_id)
    if not raw_group:
        return {"member": False, "admin": False}

    group_info = json.loads(raw_group)
    # A stored group may lack "admins"/"members" (or hold null); treat that
    # as an empty collection instead of failing on `in None`.
    return {
        "member": user_id in (group_info.get("members") or ()),
        "admin": user_id in (group_info.get("admins") or ()),
    }
def get_highest_user_access_role(
        resource_id: str,
        user_id: str,
        gn_proxy_url: str = "http://localhost:8080") -> Dict:
    """Get the highest access roles for a given user

    Args:
      - resource_id: The unique id of a given resource.
      - user_id: The unique id of a given user.
      - gn_proxy_url: The URL where gn-proxy is running.

    Returns:
      A dict indicating the highest access role the user has: each key of
      the JSON object returned by gn-proxy's `available` endpoint is mapped
      to the greatest `DataRole`/`AdminRole` among the role strings listed
      under it.

    Raises:
      - KeyError: if gn-proxy reports a role string that is neither a
        `DataRole` nor an `AdminRole` value.
      - ValueError: if gn-proxy reports an empty role list for a key.
    """
    # Build the lookup from every member of both enums.  Zipping the two
    # enums together would silently drop roles if they ever differed in
    # length.
    role_mapping: Dict[str, Union[DataRole, AdminRole]] = {
        role.value: role
        for role_enum in (DataRole, AdminRole)
        for role in role_enum
    }
    # Let requests build the query string so the ids are URL-encoded.
    response = requests.get(
        urljoin(gn_proxy_url, "available"),
        params={"resource": resource_id, "user": user_id})
    # NOTE(review): assumes each role list holds roles of a single kind;
    # DataRole and AdminRole members cannot be compared with each other.
    return {
        key: max(role_mapping[role] for role in roles)
        for key, roles in json.loads(response.content).items()
    }
def get_groups_by_user_uid(user_uid: str, conn: Redis) -> Dict:
    """Given a user uid, get the groups in which they are a member or admin of.

    Args:
      - user_uid: A user's unique id
      - conn: A redis connection

    Returns:
      - A dictionary with the group records the user is part of.  Each
        record is the decoded group JSON with the group's hash key added
        under "uuid", e.g.:
        {"admin": [],
         "member": [{"uuid": "ce0dddd1-6c50-4587-9eec-6c687a54ad86",
                     "admins": [...], "members": [...]}]}
    """
    admin = []
    member = []
    # Named `group_uid` so the loop does not shadow the imported `uuid` module.
    for group_uid, raw_group in conn.hgetall("groups").items():
        group_info = json.loads(raw_group)
        group_info["uuid"] = group_uid
        # A missing or null "admins"/"members" entry counts as empty rather
        # than raising on `in None`.
        if user_uid in (group_info.get("admins") or ()):
            admin.append(group_info)
        if user_uid in (group_info.get("members") or ()):
            member.append(group_info)
    return {
        "admin": admin,
        "member": member,
    }
def get_user_info_by_key(key: str, value: str,
                         conn: Redis) -> Optional[Dict]:
    """Look up a user's stored information by `key` == `value`.

    When `key` is "user_id" the record is read directly from the "users"
    hash field named `value`; for any other key every stored user is
    decoded and the first one whose `key` entry equals `value` is used.
    The returned dict always has the user's id under "user_id"; None is
    returned when nothing matches.
    """
    if key == "user_id":
        # Direct lookup: the user id is the hash field name.
        stored = conn.hget("users", value)
        if not stored:
            return None
        details = json.loads(stored)
        details["user_id"] = value
        return details

    # Any other key requires scanning all stored users.
    for user_id, stored in conn.hgetall("users").items():
        details = json.loads(stored)
        if key in details and details[key] == value:
            details["user_id"] = user_id
            return details
    return None
def create_group(conn: Redis, group_name: Optional[str],
                 admin_user_uids: Optional[List] = None,
                 member_user_uids: Optional[List] = None) -> Optional[Dict]:
    """Create a group given the group name, members and admins of that group.

    Args:
      - conn: A redis connection; the group is written to the "groups" hash.
      - group_name: The name of the new group.
      - admin_user_uids: Unique ids of the group's admins (default: none).
      - member_user_uids: Unique ids of the group's members (default: none).

    Returns:
      The stored group record (with a freshly generated "id" and identical
      "created_timestamp"/"changed_timestamp" values), or None when
      `group_name` is empty or no admin and no member was supplied; in that
      case nothing is written.
    """
    if admin_user_uids is None:
        admin_user_uids = []
    if member_user_uids is None:
        member_user_uids = []
    # A group needs a name and at least one user attached to it.
    if not group_name or not (admin_user_uids or member_user_uids):
        return None

    # `datetime.utcnow()` is deprecated; an aware UTC "now" formats to the
    # same string because the format carries no timezone field.
    timestamp = datetime.datetime.now(
        datetime.timezone.utc).strftime('%b %d %Y %I:%M%p')
    group_id = str(uuid.uuid4())
    group = {
        "id": group_id,
        "admins": admin_user_uids,
        "members": member_user_uids,
        "name": group_name,
        "created_timestamp": timestamp,
        "changed_timestamp": timestamp,
    }
    conn.hset("groups", group_id, json.dumps(group))
    return group
|