From 204a308be0f741726b9a620d88fbc22b22124c81 Mon Sep 17 00:00:00 2001 From: Arun Isaac Date: Fri, 29 Dec 2023 18:55:37 +0000 Subject: Namespace all modules under gn2. We move all modules under a gn2 directory. This is important for "correct" packaging and deployment as a Guix service. --- gn2/utility/authentication_tools.py | 150 ++++++++++++++++++++++++++++++++++++ 1 file changed, 150 insertions(+) create mode 100644 gn2/utility/authentication_tools.py (limited to 'gn2/utility/authentication_tools.py') diff --git a/gn2/utility/authentication_tools.py b/gn2/utility/authentication_tools.py new file mode 100644 index 00000000..935f629e --- /dev/null +++ b/gn2/utility/authentication_tools.py @@ -0,0 +1,150 @@ +import json +import requests + +from flask import g +from gn2.wqflask.database import database_connection +from gn2.base import webqtlConfig + +from gn2.utility.redis_tools import (get_redis_conn, + get_resource_info, + get_resource_id, + add_resource) +from gn2.utility.tools import get_setting, GN_PROXY_URL + +Redis = get_redis_conn() + +def check_resource_availability(dataset, user_id, trait_id=None): + # At least for now assume temporary entered traits are accessible + if type(dataset) == str or dataset.type == "Temp": + return webqtlConfig.DEFAULT_PRIVILEGES + + resource_id = get_resource_id(dataset, trait_id) + + # ZS: This should never be false, but it's technically possible if + # a non-Temp dataset somehow had a type other than + # Publish/ProbeSet/Geno + if resource_id: + resource_info = get_resource_info(resource_id) + + # If resource isn't already in redis, add it with default + # privileges + if not resource_info: + resource_info = add_new_resource(dataset, trait_id) + + # Check if super-user - we should probably come up with some + # way to integrate this into the proxy + if user_id in Redis.smembers("super_users"): + return webqtlConfig.SUPER_PRIVILEGES + + response = None + the_url = f"{GN_PROXY_URL}available?resource={resource_id}&user={user_id}" + try: + response = json.loads(requests.get(the_url).content) + except: + response = resource_info['default_mask'] + + return response + + +def add_new_resource(dataset, trait_id=None): + resource_ob = { + 'owner_id': "none", # webqtlConfig.DEFAULT_OWNER_ID, + 'default_mask': webqtlConfig.DEFAULT_PRIVILEGES, + 'group_masks': {} + } + + if dataset.type == "Publish": + group_code = get_group_code(dataset) + if group_code is None: + group_code = "" + resource_ob['name'] = group_code + "_" + str(trait_id) + resource_ob['data'] = { + 'dataset': dataset.id, + 'trait': trait_id + } + resource_ob['type'] = 'dataset-publish' + elif dataset.type == "Geno": + resource_ob['name'] = dataset.name + resource_ob['data'] = { + 'dataset': dataset.id + } + resource_ob['type'] = 'dataset-geno' + else: + resource_ob['name'] = dataset.name + resource_ob['data'] = { + 'dataset': dataset.id + } + resource_ob['type'] = 'dataset-probeset' + + resource_info = add_resource(resource_ob, update=False) + + return resource_info + + +def get_group_code(dataset): + with database_connection(get_setting("SQL_URI")) as conn, conn.cursor() as cursor: + cursor.execute( + "SELECT InbredSetCode FROM InbredSet WHERE Name=%s", + (dataset.group.name,) + ) + if results := cursor.fetchone(): + return results[0] + return "" + + +def check_admin(resource_id=None): + the_url = GN_PROXY_URL + "available?resource={}&user={}".format( + resource_id, g.user_session.user_id) + try: + response = json.loads(requests.get(the_url).content)['admin'] + except: + resource_info = get_resource_info(resource_id) + response = resource_info['default_mask']['admin'] + + if type(response) is list: + if 'edit-admins' in response: + return 'edit_admins' + elif 'edit-access' in response: + return 'edit-access' + + return response + + +def check_owner(dataset=None, trait_id=None, resource_id=None): + if resource_id: + resource_info = get_resource_info(resource_id) + if g.user_session.user_id == resource_info['owner_id']: + return resource_id + else: + resource_id = get_resource_id(dataset, trait_id) + if resource_id: + resource_info = get_resource_info(resource_id) + if g.user_session.user_id == resource_info['owner_id']: + return resource_id + + return False + + +def check_owner_or_admin(dataset=None, trait_id=None, resource_id=None): + if not resource_id: + if dataset.type == "Temp": + return "not-admin" + else: + resource_id = get_resource_id(dataset, trait_id) + + try: + user_id = g.user_session.user_id.encode('utf-8') + except: + user_id = g.user_session.user_id + + if user_id in Redis.smembers("super_users"): + return "owner" + + resource_info = get_resource_info(resource_id) + if resource_info: + if user_id == resource_info['owner_id']: + return "owner" + else: + return check_admin(resource_id) + + return "not-admin" -- cgit v1.2.3