aboutsummaryrefslogtreecommitdiff
path: root/wqflask/utility/authentication_tools.py
blob: 5d00d600987838d1b453db55ba00ec50a05af577 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import json
import requests

from flask import g
from wqflask.database import database_connection
from base import webqtlConfig

from utility.redis_tools import (get_redis_conn,
                                 get_resource_info,
                                 get_resource_id,
                                 add_resource)
from utility.tools import get_setting, GN_PROXY_URL

# Module-level Redis handle, obtained once when this module is imported.
# The functions below use it to read the "super_users" set.
Redis = get_redis_conn()

def check_resource_availability(dataset, user_id, trait_id=None):
    """Return the privileges ``user_id`` has on a dataset (or one of its traits).

    Resolution order:

    1. A dataset given as a plain string, or one whose ``type`` is "Temp",
       gets ``webqtlConfig.DEFAULT_PRIVILEGES``.
    2. If the dataset resolves to a resource id that has no stored resource
       info yet, a resource with default privileges is registered via
       ``add_new_resource``.
    3. Members of the Redis "super_users" set get
       ``webqtlConfig.SUPER_PRIVILEGES``.
    4. Otherwise the GN proxy's "available" endpoint is queried and its
       decoded JSON response is returned.
    5. If that query fails, the resource's 'default_mask' is returned, or
       ``webqtlConfig.DEFAULT_PRIVILEGES`` when no resource info is known.
    """
    # At least for now assume temporary entered traits are accessible
    if isinstance(dataset, str) or dataset.type == "Temp":
        return webqtlConfig.DEFAULT_PRIVILEGES

    resource_id = get_resource_id(dataset, trait_id)

    # Bound up front so the fallback below never hits an unbound name
    # when no resource id could be determined.
    resource_info = None

    # ZS: This should never be false, but it's technically possible if
    # a non-Temp dataset somehow had a type other than
    # Publish/ProbeSet/Geno
    if resource_id:
        resource_info = get_resource_info(resource_id)

        # If resource isn't already in redis, add it with default
        # privileges
        if not resource_info:
            resource_info = add_new_resource(dataset, trait_id)

    # Check if super-user - we should probably come up with some
    # way to integrate this into the proxy
    if user_id in Redis.smembers("super_users"):
        return webqtlConfig.SUPER_PRIVILEGES

    the_url = f"{GN_PROXY_URL}available?resource={resource_id}&user={user_id}"
    try:
        return json.loads(requests.get(the_url).content)
    except Exception:
        # Best effort: the proxy being unreachable or returning something
        # that isn't JSON must not break the page; use the default mask.
        if resource_info:
            return resource_info['default_mask']
        return webqtlConfig.DEFAULT_PRIVILEGES


def add_new_resource(dataset, trait_id=None):
    """Register a resource with default privileges for ``dataset``.

    The record's name, data and type depend on ``dataset.type``:

    * "Publish": name is "<group code>_<trait_id>", data holds both the
      dataset id and the trait id, type is 'dataset-publish'.
    * "Geno": name is the dataset name, data holds the dataset id, type
      is 'dataset-geno'.
    * anything else: same as "Geno" but with type 'dataset-probeset'.

    Returns whatever ``add_resource`` returns for the new record (it is
    called with ``update=False``).
    """
    dataset_type = dataset.type

    if dataset_type == "Publish":
        code = get_group_code(dataset)
        prefix = "" if code is None else code
        name = prefix + "_" + str(trait_id)
        data = {'dataset': dataset.id, 'trait': trait_id}
        resource_type = 'dataset-publish'
    else:
        # Geno and every remaining dataset type differ only in the
        # resource type label.
        name = dataset.name
        data = {'dataset': dataset.id}
        if dataset_type == "Geno":
            resource_type = 'dataset-geno'
        else:
            resource_type = 'dataset-probeset'

    new_resource = {
        'owner_id': "none",  # placeholder owner; no real owner is assigned here
        'default_mask': webqtlConfig.DEFAULT_PRIVILEGES,
        'group_masks': {},
        'name': name,
        'data': data,
        'type': resource_type,
    }

    return add_resource(new_resource, update=False)


def get_group_code(dataset):
    """Look up the InbredSetCode for the dataset's group.

    Queries the InbredSet table by ``dataset.group.name`` and returns the
    first column of the first matching row, or an empty string when no
    row matches.
    """
    query = "SELECT InbredSetCode FROM InbredSet WHERE Name=%s"
    with database_connection(get_setting("SQL_URI")) as conn:
        with conn.cursor() as cursor:
            cursor.execute(query, (dataset.group.name,))
            row = cursor.fetchone()
            return row[0] if row else ""


def check_admin(resource_id=None):
    """Return the current session user's admin privilege on a resource.

    The GN proxy's "available" endpoint is asked for the user's mask and
    its 'admin' entry is read. If that fails for any reason, the 'admin'
    entry of the resource's default mask (from ``get_resource_info``) is
    used instead; if no resource info exists either, "not-admin" is
    returned.

    When the admin entry is a list it is collapsed to a single value,
    checking for 'edit-admins' first and 'edit-access' second. Any other
    value (or a list containing neither) is returned unchanged.
    """
    the_url = GN_PROXY_URL + "available?resource={}&user={}".format(
        resource_id, g.user_session.user_id)
    try:
        response = json.loads(requests.get(the_url).content)['admin']
    except Exception:
        # Best effort: proxy unreachable, non-JSON reply or missing
        # 'admin' key all fall back to the stored default mask.
        resource_info = get_resource_info(resource_id)
        if not resource_info:
            # Nothing known about this resource: grant no admin rights
            # (same answer check_owner_or_admin gives in this case).
            return "not-admin"
        response = resource_info['default_mask']['admin']

    if isinstance(response, list):
        if 'edit-admins' in response:
            # Return the same hyphenated name that appears in the mask,
            # matching the 'edit-access' branch below.
            return 'edit-admins'
        if 'edit-access' in response:
            return 'edit-access'

    return response


def check_owner(dataset=None, trait_id=None, resource_id=None):
    """Return the resource id if the current session user owns it, else False.

    The resource is identified either directly by ``resource_id`` or, when
    that is not given, by resolving ``dataset``/``trait_id`` through
    ``get_resource_id``. Ownership means the session's user id equals the
    'owner_id' stored in the resource info.
    """
    if not resource_id:
        resource_id = get_resource_id(dataset, trait_id)
        if not resource_id:
            # No resource could be resolved, so there is nothing to own.
            return False

    info = get_resource_info(resource_id)
    is_owner = g.user_session.user_id == info['owner_id']
    return resource_id if is_owner else False


def check_owner_or_admin(dataset=None, trait_id=None, resource_id=None):
    """Classify the current session user's standing on a resource.

    Returns:
        "owner" if the user is in the Redis "super_users" set or is the
        resource's stored owner; otherwise the result of ``check_admin``
        for the resource; or "not-admin" when the dataset is missing or
        of type "Temp", or when no resource info exists.
    """
    if not resource_id:
        # Without a resource id or a dataset there is nothing to check;
        # answer with the least privilege instead of raising.
        if dataset is None or dataset.type == "Temp":
            return "not-admin"
        resource_id = get_resource_id(dataset, trait_id)

    user_id = g.user_session.user_id
    try:
        encoded_user_id = user_id.encode('utf-8')
    except (AttributeError, UnicodeError):
        # Not a str (e.g. already bytes) or not encodable: use as-is.
        encoded_user_id = user_id

    # Redis returns set members as bytes or str depending on how the
    # connection decodes responses, so accept either form of the id.
    super_users = Redis.smembers("super_users")
    if encoded_user_id in super_users or user_id in super_users:
        return "owner"

    resource_info = get_resource_info(resource_id)
    if resource_info:
        owner_id = resource_info['owner_id']
        # Compare against the raw id as well as the encoded one: on
        # Python 3 the encoded id is bytes and never equals a str owner id,
        # which is how check_owner compares it.
        if owner_id == user_id or owner_id == encoded_user_id:
            return "owner"
        return check_admin(resource_id)

    return "not-admin"