import json import redis from flask import Blueprint from flask import current_app from flask import flash from flask import g from flask import redirect from flask import render_template from flask import request from flask import url_for from gn3.authentication import AdminRole from gn3.authentication import DataRole from gn3.authentication import get_highest_user_access_role from typing import Dict from gn2.wqflask.decorators import edit_admins_access_required from gn2.wqflask.decorators import login_required resource_management = Blueprint('resource_management', __name__) def add_extra_resource_metadata(conn: redis.Redis, resource_id: str, resource: Dict) -> Dict: """If resource['owner_id'] exists, add metadata about that user. Also, if the resource contains group masks, add the group name into the resource dict. Note that resource['owner_id'] and the group masks are unique identifiers so they aren't human readable names. Args: - conn: A redis connection with the responses decoded. - resource_id: The unique identifier of the resource. - resource: A dict containing details(metadata) about a given resource. Returns: An embellished dictionary with its resource id; the human readable names of the group masks; and the owner id if it was set. """ resource["resource_id"] = resource_id # Embellish the resource information with owner details if the # owner is set if (owner_id := resource.get("owner_id", "none").lower()) == "none": resource["owner_id"] = None resource["owner_details"] = None else: user_details = json.loads(conn.hget("users", owner_id)) resource["owner_details"] = { "email_address": user_details.get("email_address"), "full_name": user_details.get("full_name"), "organization": user_details.get("organization"), } # Embellish the resources information with the group name if the # group masks are present if groups := resource.get('group_masks', {}): for group_id in groups.keys(): resource['group_masks'][group_id]["group_name"] = ( json.loads(conn.hget("groups", group_id)).get('name')) return resource @resource_management.route("/resources/<resource_id>") @login_required() def view_resource(resource_id: str): user_id = (g.user_session.record.get(b"user_id", b"").decode("utf-8") or g.user_session.record.get("user_id", "")) redis_conn = redis.from_url( current_app.config["REDIS_URL"], decode_responses=True) # Abort early if the resource can't be found if not (resource := redis_conn.hget("resources", resource_id)): return f"Resource: {resource_id} Not Found!", 401 return render_template( "admin/manage_resource.html", resource_info=(add_extra_resource_metadata( conn=redis_conn, resource_id=resource_id, resource=json.loads(resource))), access_role=get_highest_user_access_role( resource_id=resource_id, user_id=user_id, gn_proxy_url=current_app.config.get("GN2_PROXY"))) @resource_management.route("/resources/<resource_id>/make-public", methods=('POST',)) @login_required() def update_resource_publicity(resource_id: str): redis_conn = redis.from_url( current_app.config["REDIS_URL"], decode_responses=True) resource_info = json.loads(redis_conn.hget("resources", resource_id)) if (is_open_to_public := request .form .to_dict() .get("open_to_public")) == "True": resource_info['default_mask'] = { 'data': DataRole.VIEW.value, 'admin': AdminRole.NOT_ADMIN.value, 'metadata': DataRole.VIEW.value, } elif is_open_to_public == "False": resource_info['default_mask'] = { 'data': DataRole.NO_ACCESS.value, 'admin': AdminRole.NOT_ADMIN.value, 'metadata': DataRole.NO_ACCESS.value, } redis_conn.hset("resources", resource_id, json.dumps(resource_info)) return redirect(url_for("resource_management.view_resource", resource_id=resource_id)) @resource_management.route("/resources/<resource_id>/change-owner") @edit_admins_access_required @login_required() def view_resource_owner(resource_id: str): return render_template( "admin/change_resource_owner.html", resource_id=resource_id) @resource_management.route("/resources/<resource_id>/change-owner", methods=('POST',)) @edit_admins_access_required @login_required() def change_owner(resource_id: str): if user_id := request.form.get("new_owner"): redis_conn = redis.from_url( current_app.config["REDIS_URL"], decode_responses=True) resource = json.loads(redis_conn.hget("resources", resource_id)) resource["owner_id"] = user_id redis_conn.hset("resources", resource_id, json.dumps(resource)) flash("The resource's owner has been changed.", "alert-info") return redirect(url_for("resource_management.view_resource", resource_id=resource_id)) @resource_management.route("<resource_id>/users/search", methods=('POST',)) @edit_admins_access_required @login_required() def search_user(resource_id: str): results = {} for user in (users := redis.from_url( current_app.config["REDIS_URL"], decode_responses=True).hgetall("users")): user = json.loads(users[user]) for q in (request.form.get("user_name"), request.form.get("user_email")): if q and (q in user.get("email_address", "") or q in user.get("full_name", "")): results[user.get("user_id", "")] = user return json.dumps(tuple(results.values()))