about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--wqflask/tests/unit/wqflask/test_resource_manager.py59
-rw-r--r--wqflask/wqflask/resource_manager.py48
2 files changed, 107 insertions, 0 deletions
diff --git a/wqflask/tests/unit/wqflask/test_resource_manager.py b/wqflask/tests/unit/wqflask/test_resource_manager.py
index a27f40e1..b0b7e6a3 100644
--- a/wqflask/tests/unit/wqflask/test_resource_manager.py
+++ b/wqflask/tests/unit/wqflask/test_resource_manager.py
@@ -3,6 +3,9 @@ import unittest
 
 from unittest import mock
 from wqflask.resource_manager import get_user_membership
+from wqflask.resource_manager import get_user_access_roles
+from wqflask.resource_manager import DataRole
+from wqflask.resource_manager import AdminRole
 
 
 class TestGetUserMembership(unittest.TestCase):
@@ -49,3 +52,59 @@ class TestGetUserMembership(unittest.TestCase):
                 group_id="7fa95d07-0e2d-4bc5-b47c-448fdc1260b2"),
             {"member": True,
              "admin": True})
+
+
+class TestCheckUserAccessRole(unittest.TestCase):
+    """Test cases for `get_user_access_roles`"""
+
+    def setUp(self):
+        conn = mock.MagicMock()
+        conn.hget.return_value = (
+            '{"owner_id": "8ad942fe-490d-453e-bd37", '
+            '"default_mask": {"data": "no-access", '
+            '"metadata": "no-access", '
+            '"admin": "not-admin"}, '
+            '"group_masks": '
+            '{"7fa95d07-0e2d-4bc5-b47c-448fdc1260b2": '
+            '{"metadata": "edit", "data": "edit"}}, '
+            '"name": "_14329", "'
+            'data": {"dataset": 1, "trait": 14329}, '
+            '"type": "dataset-publish"}')
+
+        conn.hgetall.return_value = {
+            '7fa95d07-0e2d-4bc5-b47c-448fdc1260b2': (
+                '{"name": "editors", '
+                '"admins": ["8ad942fe-490d-453e-bd37-56f252e41604", "rand"], '
+                '"members": ["8ad942fe-490d-453e-bd37-56f252e41603", '
+                '"rand"], '
+                '"changed_timestamp": "Oct 06 2021 06:39PM", '
+                '"created_timestamp": "Oct 06 2021 06:39PM"}')}
+        self.conn = conn
+
+    def test_get_user_access_when_owner(self):
+        """Test that the right access roles are set"""
+        self.assertEqual(get_user_access_roles(
+            conn=self.conn,
+            resource_id="",  # Can be anything
+            user_id="8ad942fe-490d-453e-bd37"),
+                         {"data": DataRole.EDIT,
+                          "metadata": DataRole.EDIT,
+                          "admin": AdminRole.EDIT_ACCESS})
+
+    def test_get_user_access_default_mask(self):
+        self.assertEqual(get_user_access_roles(
+            conn=self.conn,
+            resource_id="",  # Can be anything
+            user_id=""),
+                         {"data": DataRole.NO_ACCESS,
+                          "metadata": DataRole.NO_ACCESS,
+                          "admin": AdminRole.NOT_ADMIN})
+
+    def test_get_user_access_group_mask(self):
+        self.assertEqual(get_user_access_roles(
+            conn=self.conn,
+            resource_id="",  # Can be anything
+            user_id="8ad942fe-490d-453e-bd37-56f252e41603"),
+                         {"data": DataRole.EDIT,
+                          "metadata": DataRole.EDIT,
+                          "admin": AdminRole.NOT_ADMIN})
diff --git a/wqflask/wqflask/resource_manager.py b/wqflask/wqflask/resource_manager.py
index 7f4718f0..a3a94f9e 100644
--- a/wqflask/wqflask/resource_manager.py
+++ b/wqflask/wqflask/resource_manager.py
@@ -63,4 +63,52 @@ def get_user_membership(conn: redis.Redis, user_id: str,
                 results["member"] = True
             break
     return results
+
+
+def get_user_access_roles(conn: redis.Redis,
+                          resource_id: str,
+                          user_id: str) -> Dict:
+    """Get the highest access roles for a given user
+
+    Args:
+      - conn: A redis connection with `decoded_responses == True`.
+      - resource_id: The unique id of a given resource.
+
+    Returns:
+      A dict indicating the highest access role the user has.
+    """
+    # This is the default access role
+    access_role = {
+        "data": [DataRole.NO_ACCESS],
+        "metadata": [DataRole.NO_ACCESS],
+        "admin": [AdminRole.NOT_ADMIN],
+    }
+    resource_info = json.loads(conn.hget('resources', resource_id))
+
+    # Check the resource's default mask
+    if default_mask := resource_info.get("default_mask"):
+        access_role["data"].append(DataRole(default_mask.get("data")))
+        access_role["metadata"].append(DataRole(default_mask.get("metadata")))
+        access_role["admin"].append(AdminRole(default_mask.get("admin")))
+
+    # Then check if the user is the owner Check with Zach and Rob if
+    # the owner, be default should, as the lowest access_roles, edit
+    # access
+    if resource_info.get("owner_id") == user_id:
+        access_role["data"].append(DataRole.EDIT)
+        access_role["metadata"].append(DataRole.EDIT)
+        access_role["admin"].append(AdminRole.EDIT_ACCESS)
+
+    # Check the group mask. If the user is in that group mask, use the
+    # access roles for that group
+    if group_masks := resource_info.get("group_masks"):
+        for group_id, roles in group_masks.items():
+            user_membership = get_user_membership(conn=conn,
+                                                  user_id=user_id,
+                                                  group_id=group_id)
+            if any(user_membership.values()):
+                access_role["data"].append(DataRole(roles.get("data")))
+                access_role["metadata"].append(
+                    DataRole(roles.get("metadata")))
+    return {k: max(v) for k, v in access_role.items()}
     admin_status = check_owner_or_admin(resource_id=resource_id)