From 21bf6af02e33865f00319467ba4670fa3d872961 Mon Sep 17 00:00:00 2001 From: BonfaceKilz Date: Fri, 29 Oct 2021 08:33:37 +0300 Subject: Add auth module --- gn3/authentication.py | 94 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) create mode 100644 gn3/authentication.py (limited to 'gn3/authentication.py') diff --git a/gn3/authentication.py b/gn3/authentication.py new file mode 100644 index 0000000..baf2c7a --- /dev/null +++ b/gn3/authentication.py @@ -0,0 +1,94 @@ +import functools +import json +import redis +import requests + +from typing import Dict +from enum import Enum, unique +from urllib.parse import urljoin + + +@functools.total_ordering +class OrderedEnum(Enum): + @classmethod + @functools.lru_cache(None) + def _member_list(cls): + return list(cls) + + def __lt__(self, other): + if self.__class__ is other.__class__: + member_list = self.__class__._member_list() + return member_list.index(self) < member_list.index(other) + return NotImplemented + + +@unique +class DataRole(OrderedEnum): + NO_ACCESS = "no-access" + VIEW = "view" + EDIT = "edit" + + +@unique +class AdminRole(OrderedEnum): + NOT_ADMIN = "not-admin" + EDIT_ACCESS = "edit-access" + EDIT_ADMINS = "edit-admins" + + +def get_user_membership(conn: redis.Redis, user_id: str, + group_id: str) -> Dict: + """Return a dictionary that indicates whether the `user_id` is a + member or admin of `group_id`. + + Args: + - conn: a Redis Connection with the responses decoded. + - user_id: a user's unique id + e.g. '8ad942fe-490d-453e-bd37-56f252e41603' + - group_id: a group's unique id + e.g. '7fa95d07-0e2d-4bc5-b47c-448fdc1260b2' + + Returns: + A dict indicating whether the user is an admin or a member of + the group: {"member": True, "admin": False} + + """ + results = {"member": False, "admin": False} + for key, value in conn.hgetall('groups').items(): + if key == group_id: + group_info = json.loads(value) + if user_id in group_info.get("admins"): + results["admin"] = True + if user_id in group_info.get("members"): + results["member"] = True + break + return results + + +def get_highest_user_access_role( + resource_id: str, + user_id: str, + gn_proxy_url: str = "http://localhost:8080") -> Dict: + """Get the highest access roles for a given user + + Args: + - resource_id: The unique id of a given resource. + - user_id: The unique id of a given user. + - gn_proxy_url: The URL where gn-proxy is running. + + Returns: + A dict indicating the highest access role the user has. + + """ + role_mapping = {} + for x, y in zip(DataRole, AdminRole): + role_mapping.update({x.value: x, }) + role_mapping.update({y.value: y, }) + access_role = {} + for key, value in json.loads( + requests.get(urljoin( + gn_proxy_url, + ("available?resource=" + f"{resource_id}&user={user_id}"))).content).items(): + access_role[key] = max(map(lambda x: role_mapping[x], value)) + return access_role -- cgit v1.2.3 From a74df27eae7481cc8f1f12cc8a0adcc06cd1a984 Mon Sep 17 00:00:00 2001 From: BonfaceKilz Date: Sun, 31 Oct 2021 15:58:18 +0300 Subject: Fix pylint issues in gn3.authentication --- gn3/authentication.py | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) (limited to 'gn3/authentication.py') diff --git a/gn3/authentication.py b/gn3/authentication.py index baf2c7a..892aa8f 100644 --- a/gn3/authentication.py +++ b/gn3/authentication.py @@ -1,15 +1,17 @@ +"""Methods for interacting with gn-proxy.""" import functools import json +from urllib.parse import urljoin +from enum import Enum, unique +from typing import Dict + import redis import requests -from typing import Dict -from enum import Enum, unique -from urllib.parse import urljoin - @functools.total_ordering class OrderedEnum(Enum): + """A class that ordered Enums in order of position""" @classmethod @functools.lru_cache(None) def _member_list(cls): @@ -24,6 +26,7 @@ class OrderedEnum(Enum): @unique class DataRole(OrderedEnum): + """Enums for Data Access""" NO_ACCESS = "no-access" VIEW = "view" EDIT = "edit" @@ -31,6 +34,7 @@ class DataRole(OrderedEnum): @unique class AdminRole(OrderedEnum): + """Enums for Admin status""" NOT_ADMIN = "not-admin" EDIT_ACCESS = "edit-access" EDIT_ADMINS = "edit-admins" @@ -81,14 +85,13 @@ def get_highest_user_access_role( """ role_mapping = {} - for x, y in zip(DataRole, AdminRole): - role_mapping.update({x.value: x, }) - role_mapping.update({y.value: y, }) + for data_role, admin_role in zip(DataRole, AdminRole): + role_mapping.update({data_role.value: data_role, }) + role_mapping.update({admin_role.value: admin_role, }) access_role = {} - for key, value in json.loads( - requests.get(urljoin( - gn_proxy_url, - ("available?resource=" - f"{resource_id}&user={user_id}"))).content).items(): - access_role[key] = max(map(lambda x: role_mapping[x], value)) + response = requests.get(urljoin(gn_proxy_url, + ("available?resource=" + f"{resource_id}&user={user_id}"))) + for key, value in json.loads(response.content).items(): + access_role[key] = max(map(lambda role: role_mapping[role], value)) return access_role -- cgit v1.2.3 From 8169d8aacd8598730fd2e6eba06052e7502f2cc1 Mon Sep 17 00:00:00 2001 From: BonfaceKilz Date: Mon, 1 Nov 2021 09:05:32 +0300 Subject: Fix mypy issues --- gn3/authentication.py | 8 ++++---- mypy.ini | 6 ++++++ 2 files changed, 10 insertions(+), 4 deletions(-) (limited to 'gn3/authentication.py') diff --git a/gn3/authentication.py b/gn3/authentication.py index 892aa8f..7bc7b77 100644 --- a/gn3/authentication.py +++ b/gn3/authentication.py @@ -3,9 +3,9 @@ import functools import json from urllib.parse import urljoin from enum import Enum, unique -from typing import Dict +from typing import Dict, Union -import redis +from redis import Redis import requests @@ -40,7 +40,7 @@ class AdminRole(OrderedEnum): EDIT_ADMINS = "edit-admins" -def get_user_membership(conn: redis.Redis, user_id: str, +def get_user_membership(conn: Redis, user_id: str, group_id: str) -> Dict: """Return a dictionary that indicates whether the `user_id` is a member or admin of `group_id`. @@ -84,7 +84,7 @@ def get_highest_user_access_role( A dict indicating the highest access role the user has. """ - role_mapping = {} + role_mapping: Dict[str, Union[DataRole, AdminRole]] = {} for data_role, admin_role in zip(DataRole, AdminRole): role_mapping.update({data_role.value: data_role, }) role_mapping.update({admin_role.value: admin_role, }) diff --git a/mypy.ini b/mypy.ini index a507703..b0c48df 100644 --- a/mypy.ini +++ b/mypy.ini @@ -13,4 +13,10 @@ ignore_missing_imports = True ignore_missing_imports = True [mypy-pingouin.*] +ignore_missing_imports = True + +[mypy-redis.*] +ignore_missing_imports = True + +[mypy-requests.*] ignore_missing_imports = True \ No newline at end of file -- cgit v1.2.3 From d28b823527b1eec1a99344da51abd139f97bfb64 Mon Sep 17 00:00:00 2001 From: BonfaceKilz Date: Tue, 9 Nov 2021 11:05:56 +0300 Subject: Add functions for updating groups --- gn3/authentication.py | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 66 insertions(+), 1 deletion(-) (limited to 'gn3/authentication.py') diff --git a/gn3/authentication.py b/gn3/authentication.py index 7bc7b77..6719631 100644 --- a/gn3/authentication.py +++ b/gn3/authentication.py @@ -1,9 +1,12 @@ """Methods for interacting with gn-proxy.""" import functools import json +import uuid +import datetime + from urllib.parse import urljoin from enum import Enum, unique -from typing import Dict, Union +from typing import Dict, List, Optional, Union from redis import Redis import requests @@ -95,3 +98,65 @@ def get_highest_user_access_role( for key, value in json.loads(response.content).items(): access_role[key] = max(map(lambda role: role_mapping[role], value)) return access_role + + +def get_groups_by_user_uid(user_uid: str, conn: Redis) -> Dict: + """Given a user uid, get the groups in which they are a member or admin of. + + Args: + - user_uid: A user's unique id + - conn: A redis connection + + Returns: + - A dictionary containing the list of groups the user is part of e.g.: + {"admin": [], "member": ["ce0dddd1-6c50-4587-9eec-6c687a54ad86"]} + """ + admin = [] + member = [] + for uuid, group_info in conn.hgetall("groups").items(): + group_info = json.loads(group_info) + group_info["uuid"] = uuid + if user_uid in group_info.get('admins'): + admin.append(group_info) + if user_uid in group_info.get('members'): + member.append(group_info) + return { + "admin": admin, + "member": member, + } + + +def get_user_info_by_key(key: str, value: str, + conn: Redis) -> Optional[Dict]: + """Given a key, get a user's information if value is matched""" + if key != "user_id": + for uuid, user_info in conn.hgetall("users").items(): + user_info = json.loads(user_info) + if (key in user_info and + user_info.get(key) == value): + user_info["user_id"] = uuid + return user_info + elif key == "user_id": + if user_info := conn.hget("users", value): + user_info = json.loads(user_info) + user_info["user_id"] = value + return user_info + return None + + +def create_group(conn: Redis, group_name: Optional[str], + admin_user_uids: List = [], + member_user_uids: List = []) -> Optional[Dict]: + """Create a group given the group name, members and admins of that group.""" + if group_name and bool(admin_user_uids + member_user_uids): + timestamp = datetime.datetime.utcnow().strftime('%b %d %Y %I:%M%p') + group = { + "id": (group_id := str(uuid.uuid4())), + "admins": admin_user_uids, + "members": member_user_uids, + "name": group_name, + "created_timestamp": timestamp, + "changed_timestamp": timestamp, + } + conn.hset("groups", group_id, json.dumps(group)) + return group -- cgit v1.2.3 From 0059de6c028996c9b21a833f186ba7df4899fa2d Mon Sep 17 00:00:00 2001 From: Arun Isaac Date: Wed, 10 Nov 2021 15:47:20 +0530 Subject: Do not use dangerous default argument []. Default arguments get evaluated only once when the function is defined, and are then shared across all instances of the function. If the argument is then mutated, this can cause hard to find bugs. See https://docs.python.org/3/tutorial/controlflow.html#default-argument-values * gn3/authentication.py (create_group): Do not use [] as the default argument. --- gn3/authentication.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'gn3/authentication.py') diff --git a/gn3/authentication.py b/gn3/authentication.py index 6719631..1ccffcc 100644 --- a/gn3/authentication.py +++ b/gn3/authentication.py @@ -145,9 +145,13 @@ def get_user_info_by_key(key: str, value: str, def create_group(conn: Redis, group_name: Optional[str], - admin_user_uids: List = [], - member_user_uids: List = []) -> Optional[Dict]: + admin_user_uids: List = None, + member_user_uids: List = None) -> Optional[Dict]: """Create a group given the group name, members and admins of that group.""" + if admin_user_uids is None: + admin_user_uids = [] + if member_user_uids is None: + member_user_uids = [] if group_name and bool(admin_user_uids + member_user_uids): timestamp = datetime.datetime.utcnow().strftime('%b %d %Y %I:%M%p') group = { -- cgit v1.2.3 From 83f0e5c2955595de195a6de4a9aa7feeb94df9bb Mon Sep 17 00:00:00 2001 From: Arun Isaac Date: Wed, 10 Nov 2021 15:55:45 +0530 Subject: Reformat condition on a single line. * gn3/authentication.py (get_user_info_by_key): Reformat so that condition is on a single line. --- gn3/authentication.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'gn3/authentication.py') diff --git a/gn3/authentication.py b/gn3/authentication.py index 1ccffcc..7384df0 100644 --- a/gn3/authentication.py +++ b/gn3/authentication.py @@ -132,8 +132,7 @@ def get_user_info_by_key(key: str, value: str, if key != "user_id": for uuid, user_info in conn.hgetall("users").items(): user_info = json.loads(user_info) - if (key in user_info and - user_info.get(key) == value): + if (key in user_info and user_info.get(key) == value): user_info["user_id"] = uuid return user_info elif key == "user_id": -- cgit v1.2.3 From f566d9eb1d4e12d7e7a6ef319c39760f5726578e Mon Sep 17 00:00:00 2001 From: Arun Isaac Date: Wed, 10 Nov 2021 15:56:59 +0530 Subject: Do not shadow global symbol uuid. * gn3/authentication.py (get_groups_by_user_uid): Rename local symbol uuid to group_uuid. (get_user_info_by_key): Rename local symbol uuid to user_uuid. --- gn3/authentication.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'gn3/authentication.py') diff --git a/gn3/authentication.py b/gn3/authentication.py index 7384df0..a6372c1 100644 --- a/gn3/authentication.py +++ b/gn3/authentication.py @@ -113,9 +113,9 @@ def get_groups_by_user_uid(user_uid: str, conn: Redis) -> Dict: """ admin = [] member = [] - for uuid, group_info in conn.hgetall("groups").items(): + for group_uuid, group_info in conn.hgetall("groups").items(): group_info = json.loads(group_info) - group_info["uuid"] = uuid + group_info["uuid"] = group_uuid if user_uid in group_info.get('admins'): admin.append(group_info) if user_uid in group_info.get('members'): @@ -130,10 +130,10 @@ def get_user_info_by_key(key: str, value: str, conn: Redis) -> Optional[Dict]: """Given a key, get a user's information if value is matched""" if key != "user_id": - for uuid, user_info in conn.hgetall("users").items(): + for user_uuid, user_info in conn.hgetall("users").items(): user_info = json.loads(user_info) if (key in user_info and user_info.get(key) == value): - user_info["user_id"] = uuid + user_info["user_id"] = user_uuid return user_info elif key == "user_id": if user_info := conn.hget("users", value): -- cgit v1.2.3