1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
|
import json
import redis
import requests
from flask import Blueprint
from flask import current_app
from flask import flash
from flask import g
from flask import redirect
from flask import render_template
from flask import request
from flask import url_for
from gn3.authentication import AdminRole
from gn3.authentication import DataRole
from gn3.authentication import get_user_membership
from gn3.authentication import get_highest_user_access_role
from typing import Dict, Tuple
from urllib.parse import urljoin
from gn2.wqflask.decorators import edit_access_required
from gn2.wqflask.decorators import edit_admins_access_required
from gn2.wqflask.decorators import login_required
resource_management = Blueprint('resource_management', __name__)
def add_extra_resource_metadata(conn: redis.Redis,
resource_id: str,
resource: Dict) -> Dict:
"""If resource['owner_id'] exists, add metadata about that user. Also,
if the resource contains group masks, add the group name into the
resource dict. Note that resource['owner_id'] and the group masks are
unique identifiers so they aren't human readable names.
Args:
- conn: A redis connection with the responses decoded.
- resource_id: The unique identifier of the resource.
- resource: A dict containing details(metadata) about a
given resource.
Returns:
An embellished dictionary with its resource id; the human
readable names of the group masks; and the owner id if it was set.
"""
resource["resource_id"] = resource_id
# Embellish the resource information with owner details if the
# owner is set
if (owner_id := resource.get("owner_id", "none").lower()) == "none":
resource["owner_id"] = None
resource["owner_details"] = None
else:
user_details = json.loads(conn.hget("users", owner_id))
resource["owner_details"] = {
"email_address": user_details.get("email_address"),
"full_name": user_details.get("full_name"),
"organization": user_details.get("organization"),
}
# Embellish the resources information with the group name if the
# group masks are present
if groups := resource.get('group_masks', {}):
for group_id in groups.keys():
resource['group_masks'][group_id]["group_name"] = (
json.loads(conn.hget("groups", group_id)).get('name'))
return resource
@resource_management.route("/resources/<resource_id>")
@login_required()
def view_resource(resource_id: str):
user_id = (g.user_session.record.get(b"user_id",
b"").decode("utf-8") or
g.user_session.record.get("user_id", ""))
redis_conn = redis.from_url(
current_app.config["REDIS_URL"],
decode_responses=True)
# Abort early if the resource can't be found
if not (resource := redis_conn.hget("resources", resource_id)):
return f"Resource: {resource_id} Not Found!", 401
return render_template(
"admin/manage_resource.html",
resource_info=(add_extra_resource_metadata(
conn=redis_conn,
resource_id=resource_id,
resource=json.loads(resource))),
access_role=get_highest_user_access_role(
resource_id=resource_id,
user_id=user_id,
gn_proxy_url=current_app.config.get("GN2_PROXY")))
@resource_management.route("/resources/<resource_id>/make-public",
methods=('POST',))
@edit_access_required
@login_required()
def update_resource_publicity(resource_id: str):
redis_conn = redis.from_url(
current_app.config["REDIS_URL"],
decode_responses=True)
resource_info = json.loads(redis_conn.hget("resources", resource_id))
if (is_open_to_public := request
.form
.to_dict()
.get("open_to_public")) == "True":
resource_info['default_mask'] = {
'data': DataRole.VIEW.value,
'admin': AdminRole.NOT_ADMIN.value,
'metadata': DataRole.VIEW.value,
}
elif is_open_to_public == "False":
resource_info['default_mask'] = {
'data': DataRole.NO_ACCESS.value,
'admin': AdminRole.NOT_ADMIN.value,
'metadata': DataRole.NO_ACCESS.value,
}
redis_conn.hset("resources", resource_id, json.dumps(resource_info))
return redirect(url_for("resource_management.view_resource",
resource_id=resource_id))
@resource_management.route("/resources/<resource_id>/change-owner")
@edit_admins_access_required
@login_required()
def view_resource_owner(resource_id: str):
return render_template(
"admin/change_resource_owner.html",
resource_id=resource_id)
@resource_management.route("/resources/<resource_id>/change-owner",
methods=('POST',))
@edit_admins_access_required
@login_required()
def change_owner(resource_id: str):
if user_id := request.form.get("new_owner"):
redis_conn = redis.from_url(
current_app.config["REDIS_URL"],
decode_responses=True)
resource = json.loads(redis_conn.hget("resources", resource_id))
resource["owner_id"] = user_id
redis_conn.hset("resources", resource_id, json.dumps(resource))
flash("The resource's owner has been changed.", "alert-info")
return redirect(url_for("resource_management.view_resource",
resource_id=resource_id))
@resource_management.route("<resource_id>/users/search", methods=('POST',))
@edit_admins_access_required
@login_required()
def search_user(resource_id: str):
results = {}
for user in (users := redis.from_url(
current_app.config["REDIS_URL"],
decode_responses=True).hgetall("users")):
user = json.loads(users[user])
for q in (request.form.get("user_name"),
request.form.get("user_email")):
if q and (q in user.get("email_address", "") or
q in user.get("full_name", "")):
results[user.get("user_id", "")] = user
return json.dumps(tuple(results.values()))
|