"""Methods for interacting with gn-proxy."""
import functools
import json
import uuid
import datetime
from urllib.parse import urljoin
from enum import Enum, unique
from typing import Dict, List, Optional, Union
from redis import Redis
import requests
@functools.total_ordering
class OrderedEnum(Enum):
"""A class that ordered Enums in order of position"""
@classmethod
@functools.lru_cache(None)
def _member_list(cls):
return list(cls)
def __lt__(self, other):
if self.__class__ is other.__class__:
member_list = self.__class__._member_list()
return member_list.index(self) < member_list.index(other)
return NotImplemented
@unique
class DataRole(OrderedEnum):
"""Enums for Data Access"""
NO_ACCESS = "no-access"
VIEW = "view"
EDIT = "edit"
@unique
class AdminRole(OrderedEnum):
"""Enums for Admin status"""
NOT_ADMIN = "not-admin"
EDIT_ACCESS = "edit-access"
EDIT_ADMINS = "edit-admins"
def get_user_membership(conn: Redis, user_id: str,
group_id: str) -> Dict:
"""Return a dictionary that indicates whether the `user_id` is a
member or admin of `group_id`.
Args:
- conn: a Redis Connection with the responses decoded.
- user_id: a user's unique id
e.g. '8ad942fe-490d-453e-bd37-56f252e41603'
- group_id: a group's unique id
e.g. '7fa95d07-0e2d-4bc5-b47c-448fdc1260b2'
Returns:
A dict indicating whether the user is an admin or a member of
the group: {"member": True, "admin": False}
"""
results = {"member": False, "admin": False}
for key, value in conn.hgetall('groups').items():
if key == group_id:
group_info = json.loads(value)
if user_id in group_info.get("admins"):
results["admin"] = True
if user_id in group_info.get("members"):
results["member"] = True
break
return results
def get_highest_user_access_role(
resource_id: str,
user_id: str,
gn_proxy_url: str = "http://localhost:8080") -> Dict:
"""Get the highest access roles for a given user
Args:
- resource_id: The unique id of a given resource.
- user_id: The unique id of a given user.
- gn_proxy_url: The URL where gn-proxy is running.
Returns:
A dict indicating the highest access role the user has.
"""
role_mapping: Dict[str, Union[DataRole, AdminRole]] = {}
for data_role, admin_role in zip(DataRole, AdminRole):
role_mapping.update({data_role.value: data_role, })
role_mapping.update({admin_role.value: admin_role, })
access_role = {}
response = requests.get(urljoin(gn_proxy_url,
("available?resource="
f"{resource_id}&user={user_id}")))
for key, value in json.loads(response.content).items():
access_role[key] = max(map(lambda role: role_mapping[role], value))
return access_role
def get_groups_by_user_uid(user_uid: str, conn: Redis) -> Dict:
"""Given a user uid, get the groups in which they are a member or admin of.
Args:
- user_uid: A user's unique id
- conn: A redis connection
Returns:
- A dictionary containing the list of groups the user is part of e.g.:
{"admin": [], "member": ["ce0dddd1-6c50-4587-9eec-6c687a54ad86"]}
"""
admin = []
member = []
for group_uuid, group_info in conn.hgetall("groups").items():
group_info = json.loads(group_info)
group_info["uuid"] = group_uuid
if user_uid in group_info.get('admins'):
admin.append(group_info)
if user_uid in group_info.get('members'):
member.append(group_info)
return {
"admin": admin,
"member": member,
}
def get_user_info_by_key(key: str, value: str,
conn: Redis) -> Optional[Dict]:
"""Given a key, get a user's information if value is matched"""
if key != "user_id":
for user_uuid, user_info in conn.hgetall("users").items():
user_info = json.loads(user_info)
if (key in user_info and user_info.get(key) == value):
user_info["user_id"] = user_uuid
return user_info
elif key == "user_id":
if user_info := conn.hget("users", value):
user_info = json.loads(user_info)
user_info["user_id"] = value
return user_info
return None
def create_group(conn: Redis, group_name: Optional[str],
admin_user_uids: List = None,
member_user_uids: List = None) -> Optional[Dict]:
"""Create a group given the group name, members and admins of that group."""
if admin_user_uids is None:
admin_user_uids = []
if member_user_uids is None:
member_user_uids = []
if group_name and bool(admin_user_uids + member_user_uids):
timestamp = datetime.datetime.utcnow().strftime('%b %d %Y %I:%M%p')
group = {
"id": (group_id := str(uuid.uuid4())),
"admins": admin_user_uids,
"members": member_user_uids,
"name": group_name,
"created_timestamp": timestamp,
"changed_timestamp": timestamp,
}
conn.hset("groups", group_id, json.dumps(group))
return group
return None