aboutsummaryrefslogtreecommitdiff
path: root/gn2/wqflask/resource_manager.py
blob: 55b9cbb32fa91ebac114f388bdfd477dd1d2e773 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import json
import redis

from flask import Blueprint
from flask import current_app
from flask import flash
from flask import g
from flask import redirect
from flask import render_template
from flask import request
from flask import url_for

from gn3.authentication import AdminRole
from gn3.authentication import DataRole
from gn3.authentication import get_highest_user_access_role

from typing import Dict

from gn2.wqflask.decorators import edit_admins_access_required
from gn2.wqflask.decorators import login_required


# Blueprint that groups the resource-management views defined in this
# module; every route below is registered on it.
resource_management = Blueprint('resource_management', __name__)


def add_extra_resource_metadata(conn: redis.Redis,
                                resource_id: str,
                                resource: Dict) -> Dict:
    """Embellish a resource dict with human readable metadata.

    If resource['owner_id'] is set, details about that user are added
    under resource['owner_details']. If the resource contains group
    masks, the name of each group is added to its mask. Note that
    resource['owner_id'] and the group mask keys are unique identifiers
    so they aren't human readable names.

    Args:
      - conn: A redis connection with the responses decoded.
      - resource_id: The unique identifier of the resource.
      - resource: A dict containing details(metadata) about a
        given resource. It is modified in place.

    Returns:
      The same dictionary, embellished with its resource id; the
      human readable names of the group masks; and the owner details
      if the owner was set. When the owner or a group cannot be found
      in redis, the corresponding details/name are set to None
      instead of raising.

    """
    resource["resource_id"] = resource_id

    # Embellish the resource information with owner details if the
    # owner is set. `or "none"` also covers an owner_id that is
    # present but null/empty, which would otherwise break `.lower()`.
    owner_id = (resource.get("owner_id") or "none").lower()
    if owner_id == "none":
        resource["owner_id"] = None
        resource["owner_details"] = None
    elif not (raw_user := conn.hget("users", owner_id)):
        # The owner id points at a user that is not in the "users"
        # hash: keep the id but report no details.
        resource["owner_details"] = None
    else:
        user_details = json.loads(raw_user)
        resource["owner_details"] = {
            "email_address": user_details.get("email_address"),
            "full_name": user_details.get("full_name"),
            "organization": user_details.get("organization"),
        }

    # Embellish the resources information with the group name if the
    # group masks are present
    for group_id, mask in (resource.get('group_masks') or {}).items():
        raw_group = conn.hget("groups", group_id)
        mask["group_name"] = (
            json.loads(raw_group).get('name') if raw_group else None)
    return resource


@resource_management.route("/resources/<resource_id>")
@login_required()
def view_resource(resource_id: str):
    """Render the management page of a single resource.

    Args:
      - resource_id: The unique identifier of the resource, taken
        from the URL.

    Returns:
      The rendered "admin/manage_resource.html" template, or a plain
      text message with a 404 status when the resource is not in the
      "resources" redis hash.

    """
    # The session record may hold its keys as bytes or as str; try the
    # bytes key first and fall back to the str key.
    user_id = (g.user_session.record.get(b"user_id",
                                         b"").decode("utf-8") or
               g.user_session.record.get("user_id", ""))
    redis_conn = redis.from_url(
        current_app.config["REDIS_URL"],
        decode_responses=True)
    # Abort early if the resource can't be found. This is a missing
    # resource, not an authentication failure, hence 404 and not 401.
    if not (resource := redis_conn.hget("resources", resource_id)):
        return f"Resource: {resource_id} Not Found!", 404

    return render_template(
        "admin/manage_resource.html",
        resource_info=(add_extra_resource_metadata(
            conn=redis_conn,
            resource_id=resource_id,
            resource=json.loads(resource))),
        access_role=get_highest_user_access_role(
            resource_id=resource_id,
            user_id=user_id,
            gn_proxy_url=current_app.config.get("GN2_PROXY")))


@resource_management.route("/resources/<resource_id>/make-public",
                           methods=('POST',))
@login_required()
def update_resource_publicity(resource_id: str):
    """Open a resource to the public, or close it, via its default mask.

    The "open_to_public" form field selects the new default mask:
    "True" grants view access to data and metadata, "False" removes
    it. Any other value leaves the resource untouched.

    Args:
      - resource_id: The unique identifier of the resource, taken
        from the URL.

    Returns:
      A redirect to the resource's management page, or a plain text
      message with a 404 status when the resource does not exist.

    """
    # NOTE(review): only a login is required here, whereas the
    # change-owner views in this module also apply
    # `edit_admins_access_required` -- confirm this is intended.
    redis_conn = redis.from_url(
        current_app.config["REDIS_URL"],
        decode_responses=True)
    # Abort early if the resource can't be found; json.loads(None)
    # would otherwise raise and surface as a server error.
    if not (resource := redis_conn.hget("resources", resource_id)):
        return f"Resource: {resource_id} Not Found!", 404
    resource_info = json.loads(resource)

    default_masks = {
        "True": {
            'data': DataRole.VIEW.value,
            'admin': AdminRole.NOT_ADMIN.value,
            'metadata': DataRole.VIEW.value,
        },
        "False": {
            'data': DataRole.NO_ACCESS.value,
            'admin': AdminRole.NOT_ADMIN.value,
            'metadata': DataRole.NO_ACCESS.value,
        },
    }
    # Only write back when the form carried a recognised value, so an
    # unchanged resource is not needlessly rewritten.
    if mask := default_masks.get(request.form.get("open_to_public")):
        resource_info['default_mask'] = mask
        redis_conn.hset("resources", resource_id,
                        json.dumps(resource_info))
    return redirect(url_for("resource_management.view_resource",
                            resource_id=resource_id))


@resource_management.route("/resources/<resource_id>/change-owner")
@edit_admins_access_required
@login_required()
def view_resource_owner(resource_id: str):
    """Render the page used to change the owner of a resource.

    Args:
      - resource_id: The unique identifier of the resource, taken
        from the URL and handed to the template.

    Returns:
      The rendered "admin/change_resource_owner.html" template.

    """
    template_name = "admin/change_resource_owner.html"
    return render_template(template_name, resource_id=resource_id)


@resource_management.route("/resources/<resource_id>/change-owner",
                           methods=('POST',))
@edit_admins_access_required
@login_required()
def change_owner(resource_id: str):
    """Set a new owner on a resource.

    The new owner's id is read from the "new_owner" form field. When
    the field is missing or empty nothing is changed.

    Args:
      - resource_id: The unique identifier of the resource, taken
        from the URL.

    Returns:
      A redirect to the resource's management page, or a plain text
      message with a 404 status when the resource does not exist.

    """
    if user_id := request.form.get("new_owner"):
        redis_conn = redis.from_url(
            current_app.config["REDIS_URL"],
            decode_responses=True)
        # Abort early if the resource can't be found; json.loads(None)
        # would otherwise raise and surface as a server error.
        if not (raw_resource := redis_conn.hget("resources",
                                                resource_id)):
            return f"Resource: {resource_id} Not Found!", 404
        resource = json.loads(raw_resource)
        resource["owner_id"] = user_id
        redis_conn.hset("resources", resource_id, json.dumps(resource))
        flash("The resource's owner has been changed.", "alert-info")
    return redirect(url_for("resource_management.view_resource",
                            resource_id=resource_id))


@resource_management.route("/<resource_id>/users/search", methods=('POST',))
@edit_admins_access_required
@login_required()
def search_user(resource_id: str):
    """Search the known users by name and/or e-mail address.

    The "user_name" and "user_email" form fields are each used as a
    case-sensitive substring query; a user matches when any non-empty
    query occurs in either their e-mail address or their full name.

    Args:
      - resource_id: The unique identifier of the resource, taken
        from the URL. It is not used by the search itself.

    Returns:
      A JSON encoded list of the matching user records.

    """
    # Read the queries once instead of once per user, dropping the
    # ones that were not submitted.
    queries = tuple(
        query for query in (request.form.get("user_name"),
                            request.form.get("user_email")) if query)
    results = {}
    if not queries:
        return json.dumps(tuple(results.values()))

    users = redis.from_url(
        current_app.config["REDIS_URL"],
        decode_responses=True).hgetall("users")
    for raw_user in users.values():
        user = json.loads(raw_user)
        # `or ""` guards against fields stored as null, which would
        # make the substring test raise.
        searchable = (user.get("email_address") or "",
                      user.get("full_name") or "")
        if any(query in field
               for query in queries for field in searchable):
            # Keyed by user id so that a user is only listed once.
            results[user.get("user_id", "")] = user
    return json.dumps(tuple(results.values()))