aboutsummaryrefslogtreecommitdiff
"""Methods for interacting with gn-proxy."""
import functools
import json
import uuid
import datetime

from urllib.parse import urljoin
from enum import Enum, unique
from typing import Dict, List, Optional, Union

from redis import Redis
import requests


@functools.total_ordering
class OrderedEnum(Enum):
    """A class that ordered Enums in order of position"""
    @classmethod
    @functools.lru_cache(None)
    def _member_list(cls):
        return list(cls)

    def __lt__(self, other):
        if self.__class__ is other.__class__:
            member_list = self.__class__._member_list()
            return member_list.index(self) < member_list.index(other)
        return NotImplemented


@unique
class DataRole(OrderedEnum):
    """Enums for Data Access"""
    NO_ACCESS = "no-access"
    VIEW = "view"
    EDIT = "edit"


@unique
class AdminRole(OrderedEnum):
    """Enums for Admin status"""
    NOT_ADMIN = "not-admin"
    EDIT_ACCESS = "edit-access"
    EDIT_ADMINS = "edit-admins"


def get_user_membership(conn: Redis, user_id: str,
                        group_id: str) -> Dict:
    """Return a dictionary that indicates whether the `user_id` is a
    member or admin of `group_id`.

    Args:
      - conn: a Redis Connection with the responses decoded.
      - user_id: a user's unique id
        e.g. '8ad942fe-490d-453e-bd37-56f252e41603'
      - group_id: a group's unique id
      e.g. '7fa95d07-0e2d-4bc5-b47c-448fdc1260b2'

    Returns:
      A dict indicating whether the user is an admin or a member of
      the group: {"member": True, "admin": False}

    """
    results = {"member": False, "admin": False}
    for key, value in conn.hgetall('groups').items():
        if key == group_id:
            group_info = json.loads(value)
            if user_id in group_info.get("admins"):
                results["admin"] = True
            if user_id in group_info.get("members"):
                results["member"] = True
            break
    return results


def get_highest_user_access_role(
        resource_id: str,
        user_id: str,
        gn_proxy_url: str = "http://localhost:8080") -> Dict:
    """Get the highest access roles for a given user

    Args:
      - resource_id: The unique id of a given resource.
      - user_id: The unique id of a given user.
      - gn_proxy_url: The URL where gn-proxy is running.

    Returns:
      A dict indicating the highest access role the user has.

    """
    role_mapping: Dict[str, Union[DataRole, AdminRole]] = {}
    for data_role, admin_role in zip(DataRole, AdminRole):
        role_mapping.update({data_role.value: data_role, })
        role_mapping.update({admin_role.value: admin_role, })
    access_role = {}
    response = requests.get(urljoin(gn_proxy_url,
                                    ("available?resource="
                                     f"{resource_id}&user={user_id}")))
    for key, value in json.loads(response.content).items():
        access_role[key] = max(map(lambda role: role_mapping[role], value))
    return access_role


def get_groups_by_user_uid(user_uid: str, conn: Redis) -> Dict:
    """Given a user uid, get the groups in which they are a member or admin of.

    Args:
      - user_uid: A user's unique id
      - conn: A redis connection

    Returns:
      - A dictionary containing the list of groups the user is part of e.g.:
        {"admin": [], "member": ["ce0dddd1-6c50-4587-9eec-6c687a54ad86"]}
    """
    admin = []
    member = []
    for group_uuid, group_info in conn.hgetall("groups").items():
        group_info = json.loads(group_info)
        group_info["uuid"] = group_uuid
        if user_uid in group_info.get('admins'):
            admin.append(group_info)
        if user_uid in group_info.get('members'):
            member.append(group_info)
    return {
        "admin": admin,
        "member": member,
    }


def get_user_info_by_key(key: str, value: str,
                         conn: Redis) -> Optional[Dict]:
    """Given a key, get a user's information if value is matched"""
    if key != "user_id":
        for user_uuid, user_info in conn.hgetall("users").items():
            user_info = json.loads(user_info)
            if (key in user_info and user_info.get(key) == value):
                user_info["user_id"] = user_uuid
                return user_info
    elif key == "user_id":
        if user_info := conn.hget("users", value):
            user_info = json.loads(user_info)
            user_info["user_id"] = value
            return user_info
    return None


def create_group(conn: Redis, group_name: Optional[str],
                 admin_user_uids: List = None,
                 member_user_uids: List = None) -> Optional[Dict]:
    """Create a group given the group name, members and admins of that group."""
    if admin_user_uids is None:
        admin_user_uids = []
    if member_user_uids is None:
        member_user_uids = []
    if group_name and bool(admin_user_uids + member_user_uids):
        timestamp = datetime.datetime.utcnow().strftime('%b %d %Y %I:%M%p')
        group = {
            "id": (group_id := str(uuid.uuid4())),
            "admins": admin_user_uids,
            "members": member_user_uids,
            "name": group_name,
            "created_timestamp": timestamp,
            "changed_timestamp": timestamp,
        }
        conn.hset("groups", group_id, json.dumps(group))
        return group
    return None