diff options
Diffstat (limited to 'wqflask/utility/authentication_tools.py')
-rw-r--r-- | wqflask/utility/authentication_tools.py | 150 |
1 files changed, 0 insertions, 150 deletions
diff --git a/wqflask/utility/authentication_tools.py b/wqflask/utility/authentication_tools.py deleted file mode 100644 index 5d00d600..00000000 --- a/wqflask/utility/authentication_tools.py +++ /dev/null @@ -1,150 +0,0 @@ -import json -import requests - -from flask import g -from wqflask.database import database_connection -from base import webqtlConfig - -from utility.redis_tools import (get_redis_conn, - get_resource_info, - get_resource_id, - add_resource) -from utility.tools import get_setting, GN_PROXY_URL - -Redis = get_redis_conn() - -def check_resource_availability(dataset, user_id, trait_id=None): - # At least for now assume temporary entered traits are accessible - if type(dataset) == str or dataset.type == "Temp": - return webqtlConfig.DEFAULT_PRIVILEGES - - resource_id = get_resource_id(dataset, trait_id) - - # ZS: This should never be false, but it's technically possible if - # a non-Temp dataset somehow had a type other than - # Publish/ProbeSet/Geno - if resource_id: - resource_info = get_resource_info(resource_id) - - # If resource isn't already in redis, add it with default - # privileges - if not resource_info: - resource_info = add_new_resource(dataset, trait_id) - - # Check if super-user - we should probably come up with some - # way to integrate this into the proxy - if user_id in Redis.smembers("super_users"): - return webqtlConfig.SUPER_PRIVILEGES - - response = None - the_url = f"{GN_PROXY_URL}available?resource={resource_id}&user={user_id}" - try: - response = json.loads(requests.get(the_url).content) - except: - response = resource_info['default_mask'] - - return response - - -def add_new_resource(dataset, trait_id=None): - resource_ob = { - 'owner_id': "none", # webqtlConfig.DEFAULT_OWNER_ID, - 'default_mask': webqtlConfig.DEFAULT_PRIVILEGES, - 'group_masks': {} - } - - if dataset.type == "Publish": - group_code = get_group_code(dataset) - if group_code is None: - group_code = "" - resource_ob['name'] = group_code + "_" + str(trait_id) - resource_ob['data'] = { - 'dataset': dataset.id, - 'trait': trait_id - } - resource_ob['type'] = 'dataset-publish' - elif dataset.type == "Geno": - resource_ob['name'] = dataset.name - resource_ob['data'] = { - 'dataset': dataset.id - } - resource_ob['type'] = 'dataset-geno' - else: - resource_ob['name'] = dataset.name - resource_ob['data'] = { - 'dataset': dataset.id - } - resource_ob['type'] = 'dataset-probeset' - - resource_info = add_resource(resource_ob, update=False) - - return resource_info - - -def get_group_code(dataset): - with database_connection(get_setting("SQL_URI")) as conn, conn.cursor() as cursor: - cursor.execute( - "SELECT InbredSetCode FROM InbredSet WHERE Name=%s", - (dataset.group.name,) - ) - if results := cursor.fetchone(): - return results[0] - return "" - - -def check_admin(resource_id=None): - the_url = GN_PROXY_URL + "available?resource={}&user={}".format( - resource_id, g.user_session.user_id) - try: - response = json.loads(requests.get(the_url).content)['admin'] - except: - resource_info = get_resource_info(resource_id) - response = resource_info['default_mask']['admin'] - - if type(response) is list: - if 'edit-admins' in response: - return 'edit_admins' - elif 'edit-access' in response: - return 'edit-access' - - return response - - -def check_owner(dataset=None, trait_id=None, resource_id=None): - if resource_id: - resource_info = get_resource_info(resource_id) - if g.user_session.user_id == resource_info['owner_id']: - return resource_id - else: - resource_id = get_resource_id(dataset, trait_id) - if resource_id: - resource_info = get_resource_info(resource_id) - if g.user_session.user_id == resource_info['owner_id']: - return resource_id - - return False - - -def check_owner_or_admin(dataset=None, trait_id=None, resource_id=None): - if not resource_id: - if dataset.type == "Temp": - return "not-admin" - else: - resource_id = get_resource_id(dataset, trait_id) - - try: - user_id = g.user_session.user_id.encode('utf-8') - except: - user_id = g.user_session.user_id - - if user_id in Redis.smembers("super_users"): - return "owner" - - resource_info = get_resource_info(resource_id) - if resource_info: - if user_id == resource_info['owner_id']: - return "owner" - else: - return check_admin(resource_id) - - return "not-admin" |