aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--wqflask/tests/unit/wqflask/test_resource_manager.py59
-rw-r--r--wqflask/wqflask/resource_manager.py48
2 files changed, 107 insertions, 0 deletions
diff --git a/wqflask/tests/unit/wqflask/test_resource_manager.py b/wqflask/tests/unit/wqflask/test_resource_manager.py
index a27f40e1..b0b7e6a3 100644
--- a/wqflask/tests/unit/wqflask/test_resource_manager.py
+++ b/wqflask/tests/unit/wqflask/test_resource_manager.py
@@ -3,6 +3,9 @@ import unittest
from unittest import mock
from wqflask.resource_manager import get_user_membership
+from wqflask.resource_manager import get_user_access_roles
+from wqflask.resource_manager import DataRole
+from wqflask.resource_manager import AdminRole
class TestGetUserMembership(unittest.TestCase):
@@ -49,3 +52,59 @@ class TestGetUserMembership(unittest.TestCase):
group_id="7fa95d07-0e2d-4bc5-b47c-448fdc1260b2"),
{"member": True,
"admin": True})
+
+
+class TestCheckUserAccessRole(unittest.TestCase):
+ """Test cases for `get_user_access_roles`"""
+
+ def setUp(self):
+ conn = mock.MagicMock()
+ conn.hget.return_value = (
+ '{"owner_id": "8ad942fe-490d-453e-bd37", '
+ '"default_mask": {"data": "no-access", '
+ '"metadata": "no-access", '
+ '"admin": "not-admin"}, '
+ '"group_masks": '
+ '{"7fa95d07-0e2d-4bc5-b47c-448fdc1260b2": '
+ '{"metadata": "edit", "data": "edit"}}, '
+ '"name": "_14329", "'
+ 'data": {"dataset": 1, "trait": 14329}, '
+ '"type": "dataset-publish"}')
+
+ conn.hgetall.return_value = {
+ '7fa95d07-0e2d-4bc5-b47c-448fdc1260b2': (
+ '{"name": "editors", '
+ '"admins": ["8ad942fe-490d-453e-bd37-56f252e41604", "rand"], '
+ '"members": ["8ad942fe-490d-453e-bd37-56f252e41603", '
+ '"rand"], '
+ '"changed_timestamp": "Oct 06 2021 06:39PM", '
+ '"created_timestamp": "Oct 06 2021 06:39PM"}')}
+ self.conn = conn
+
+ def test_get_user_access_when_owner(self):
+ """Test that the right access roles are set"""
+ self.assertEqual(get_user_access_roles(
+ conn=self.conn,
+ resource_id="", # Can be anything
+ user_id="8ad942fe-490d-453e-bd37"),
+ {"data": DataRole.EDIT,
+ "metadata": DataRole.EDIT,
+ "admin": AdminRole.EDIT_ACCESS})
+
+ def test_get_user_access_default_mask(self):
+ self.assertEqual(get_user_access_roles(
+ conn=self.conn,
+ resource_id="", # Can be anything
+ user_id=""),
+ {"data": DataRole.NO_ACCESS,
+ "metadata": DataRole.NO_ACCESS,
+ "admin": AdminRole.NOT_ADMIN})
+
+ def test_get_user_access_group_mask(self):
+ self.assertEqual(get_user_access_roles(
+ conn=self.conn,
+ resource_id="", # Can be anything
+ user_id="8ad942fe-490d-453e-bd37-56f252e41603"),
+ {"data": DataRole.EDIT,
+ "metadata": DataRole.EDIT,
+ "admin": AdminRole.NOT_ADMIN})
diff --git a/wqflask/wqflask/resource_manager.py b/wqflask/wqflask/resource_manager.py
index 7f4718f0..a3a94f9e 100644
--- a/wqflask/wqflask/resource_manager.py
+++ b/wqflask/wqflask/resource_manager.py
@@ -63,4 +63,52 @@ def get_user_membership(conn: redis.Redis, user_id: str,
results["member"] = True
break
return results
+
+
+def get_user_access_roles(conn: redis.Redis,
+ resource_id: str,
+ user_id: str) -> Dict:
+ """Get the highest access roles for a given user
+
+ Args:
+ - conn: A redis connection with `decoded_responses == True`.
+ - resource_id: The unique id of a given resource.
+
+ Returns:
+ A dict indicating the highest access role the user has.
+ """
+ # This is the default access role
+ access_role = {
+ "data": [DataRole.NO_ACCESS],
+ "metadata": [DataRole.NO_ACCESS],
+ "admin": [AdminRole.NOT_ADMIN],
+ }
+ resource_info = json.loads(conn.hget('resources', resource_id))
+
+ # Check the resource's default mask
+ if default_mask := resource_info.get("default_mask"):
+ access_role["data"].append(DataRole(default_mask.get("data")))
+ access_role["metadata"].append(DataRole(default_mask.get("metadata")))
+ access_role["admin"].append(AdminRole(default_mask.get("admin")))
+
+ # Then check if the user is the owner Check with Zach and Rob if
+ # the owner, be default should, as the lowest access_roles, edit
+ # access
+ if resource_info.get("owner_id") == user_id:
+ access_role["data"].append(DataRole.EDIT)
+ access_role["metadata"].append(DataRole.EDIT)
+ access_role["admin"].append(AdminRole.EDIT_ACCESS)
+
+ # Check the group mask. If the user is in that group mask, use the
+ # access roles for that group
+ if group_masks := resource_info.get("group_masks"):
+ for group_id, roles in group_masks.items():
+ user_membership = get_user_membership(conn=conn,
+ user_id=user_id,
+ group_id=group_id)
+ if any(user_membership.values()):
+ access_role["data"].append(DataRole(roles.get("data")))
+ access_role["metadata"].append(
+ DataRole(roles.get("metadata")))
+ return {k: max(v) for k, v in access_role.items()}
admin_status = check_owner_or_admin(resource_id=resource_id)