"""Handle data endpoints.""" import sys import uuid import json import logging from typing import Any from functools import reduce, partial import redis from MySQLdb.cursors import DictCursor from authlib.integrations.flask_oauth2.errors import _HTTPException from flask import request, jsonify, Response, Blueprint, current_app as app from gn_libs import mysqldb as gn3db from gn_libs import sqlite3 as db from gn_auth import jobs from gn_auth.commands import run_async_cmd from gn_auth.auth.requests import request_json from gn_auth.auth.errors import InvalidData, NotFoundError from gn_auth.auth.authorisation.resources.groups.models import group_by_id from gn_auth.auth.db.sqlite3 import with_db_connection # Replace this with gn_libs alternative from ..checks import require_json from ...authentication.users import User from ...authentication.oauth2.resource_server import require_oauth from .mrna import ( link_mrna_data, ungrouped_mrna_data, resources_by_datasets_and_traits as mrna_resources_by_datasets_and_traits) from .genotypes import ( link_genotype_data, ungrouped_genotype_data, resources_by_datasets_and_traits as geno_resources_by_datasets_and_traits) from .phenotypes import ( phenosbp, link_phenotype_data, pheno_traits_from_db, resources_by_datasets_and_traits as pheno_resources_by_datasets_and_traits) logger = logging.getLogger(__name__) data = Blueprint("data", __name__) data.register_blueprint(phenosbp, url_prefix="/phenotypes") @data.route("species") def list_species() -> Response: """List all available species information.""" with (gn3db.database_connection(app.config["SQL_URI"]) as gn3conn, gn3conn.cursor(DictCursor) as cursor): cursor.execute("SELECT * FROM Species") return jsonify(tuple(dict(row) for row in cursor.fetchall())) @data.route("/authorisation", methods=["POST"]) @require_json def authorisation() -> Response: """Retrieve the authorisation level for datasets/traits for the user.""" # Access endpoint with something like: # curl -X POST http://127.0.0.1:8081/auth/data/authorisation \ # -H "Content-Type: application/json" \ # -d '{"traits": ["HC_M2_0606_P::1442370_at", "BXDGeno::01.001.695", # "BXDPublish::10001"]}' def __organise_traits__(acc, curr): dset, _trt = curr key = "ProbeSet" if dset.endswith("Publish"): key = "Publish" elif dset.endswith("Geno"): key="Geno" elif dset.endswith("Temp"): key = "Temp" else: key = "ProbeSet" return { **acc, key: acc.get(key, tuple()) + (curr,) } _dset_traits: dict[str, tuple[tuple[str, str], ...]] = reduce( __organise_traits__, ( (dset.strip(), trt.strip()) for dset, trt in (trtstr.split("::") for trtstr in request_json().get("traits", []))), {key: tuple() for key in ("Publish", "ProbeSet", "Geno", "Temp")}) db_uri = app.config["AUTH_DB"] user = User(uuid.uuid4(), "anon@ymous.user", "Anonymous User") with (db.connection(db_uri) as authconn, db.cursor(authconn) as cursor): _all_resources = { _rrow["resource_id"]: _rrow for _rtypes in ( pheno_resources_by_datasets_and_traits( authconn, _dset_traits["Publish"]), geno_resources_by_datasets_and_traits( authconn, _dset_traits["Geno"]), mrna_resources_by_datasets_and_traits( authconn, _dset_traits["ProbeSet"])) for _rrow in _rtypes } if len(_all_resources.keys()) == 0: raise NotFoundError( "No resource(s) found for specified trait(s). Do(es) the " "trait(s) actually exist?") _resource_ids = tuple(_all_resources.keys()) def __explode_resource_data__(trait_fullname): _dset, _trt = trait_fullname.split("::") return { "dataset_name": _dset, "dataset_type": ( "Phenotype" if _dset.endswith("Publish") else ("Genotype" if _dset.endswith("Geno") else ("Temporary" if _dset.endswith("Temp") else "mRNA"))), "trait_name": _trt, "trait_fullname": trait_fullname } _paramstr = ", ".join(["?"] * len(_resource_ids)) try: with require_oauth.acquire("profile group resource") as _token: user = _token.user cursor.execute( "SELECT ur.resource_id, r.role_id, rp.privilege_id " "FROM user_roles AS ur " "INNER JOIN roles AS r ON ur.role_id=r.role_id " "INNER JOIN role_privileges AS rp ON r.role_id=rp.role_id " "WHERE ur.user_id = ? " f"AND ur.resource_id IN ({_paramstr})", (str(user.user_id),) + _resource_ids ) _privileges_by_resource: dict[str, tuple[str, ...]] = reduce( lambda acc, curr: { **acc, curr["resource_id"]: ( acc.get(curr["resource_id"], tuple()) + (curr["privilege_id"],)) }, cursor.fetchall(), {}) except _HTTPException as exc: err_msg = json.loads(exc.body) if err_msg["error"] == "missing_authorization": cursor.execute( "SELECT rsc.resource_id " "FROM resources AS rsc " "WHERE rsc.public = '1' " f"AND rsc.resource_id IN ({_paramstr}) ", _resource_ids) _privileges_by_resource = { row["resource_id"]: ('group:resource:view-resource',) for row in cursor.fetchall() } else: raise exc from None return jsonify({ "authorisation": [{ **resource, "resource_data": [ __explode_resource_data__(item) for item in resource["resource_data"]], "privileges": _privileges_by_resource.get(resource["resource_id"], tuple()) } for resource in _all_resources.values()] }) def __search_mrna__(): query = __request_key__("query", "") limit = int(__request_key__("limit", 10000)) offset = int(__request_key__("offset", 0)) with gn3db.database_connection(app.config["SQL_URI"]) as gn3conn: __ungrouped__ = partial( ungrouped_mrna_data, gn3conn=gn3conn, search_query=query, selected=__request_key_list__("selected"), limit=limit, offset=offset) return jsonify(with_db_connection(__ungrouped__)) def __request_key__(key: str, default: Any = ""): if bool(request_json()): return request_json().get(#type: ignore[union-attr] key, request.args.get(key, default)) return request.args.get(key, request_json().get(key, default)) def __request_key_list__(key: str, default: tuple[Any, ...] = tuple()): if bool(request_json()): return (request_json().get(key,[])#type: ignore[union-attr] or request.args.getlist(key) or request_json().get(key) or list(default)) return (request.args.getlist(key) or request_json().get(key) or list(default)) def __search_genotypes__(): query = __request_key__("query", "") limit = int(__request_key__("limit", 10000)) offset = int(__request_key__("offset", 0)) with gn3db.database_connection(app.config["SQL_URI"]) as gn3conn: __ungrouped__ = partial( ungrouped_genotype_data, gn3conn=gn3conn, search_query=query, selected=__request_key_list__("selected"), limit=limit, offset=offset) return jsonify(with_db_connection(__ungrouped__)) def __search_phenotypes__(): # launch the external process to search for phenotypes redisuri = app.config["REDIS_URI"] with redis.Redis.from_url(redisuri, decode_responses=True) as redisconn: job_id = uuid.uuid4() selected = __request_key__("selected_traits", []) command =[ sys.executable, "-m", "gn_auth.scripts.search_phenotypes", __request_key__("species_name"), __request_key__("query"), str(job_id), f"--host={__request_key__('gn3_server_uri')}", f"--auth-db-uri={app.config['AUTH_DB']}", f"--gn3-db-uri={app.config['SQL_URI']}", f"--redis-uri={redisuri}", f"--per-page={__request_key__('per_page')}"] +( [f"--selected={json.dumps(selected)}"] if len(selected) > 0 else []) jobs.create_job(redisconn, { "job_id": job_id, "command": command, "status": "queued", "search_results": tuple()}) return jsonify({ "job_id": job_id, "command_id": run_async_cmd( redisconn, app.config.get("REDIS_JOB_QUEUE"), command), "command": command }) @data.route("/search", methods=["GET", "POST"]) @require_oauth("profile group resource") def search_unlinked_data(): """Search for various unlinked data.""" dataset_type = request_json()["dataset_type"] search_fns = { "mrna": __search_mrna__, "genotype": __search_genotypes__, "phenotype": __search_phenotypes__ } return search_fns[dataset_type]() @data.route("/search/phenotype/", methods=["GET"]) def pheno_search_results(job_id: uuid.UUID) -> Response: """Get the search results from the external script""" def __search_error__(err): raise NotFoundError(err["error_description"]) redisuri = app.config["REDIS_URI"] with redis.Redis.from_url(redisuri, decode_responses=True) as redisconn: return jobs.job(redisconn, job_id).either( __search_error__, jsonify) @data.route("/link/genotype", methods=["POST"]) def link_genotypes() -> Response: """Link genotype data to group.""" def __values__(form) -> dict[str, Any]: if not bool(form.get("species_name", "").strip()): raise InvalidData("Expected 'species_name' not provided.") if not bool(form.get("group_id")): raise InvalidData("Expected 'group_id' not provided.",) try: _group_id = uuid.UUID(form.get("group_id")) except TypeError as terr: raise InvalidData("Expected a UUID for 'group_id' value.") from terr if not bool(form.get("selected")): raise InvalidData("Expected at least one dataset to be provided.") return { "group_id": uuid.UUID(form.get("group_id")), "datasets": form.get("selected") } def __link__(conn: db.DbConnection, group_id: uuid.UUID, datasets: dict): return link_genotype_data(conn, group_by_id(conn, group_id), datasets) return jsonify(with_db_connection( partial(__link__, **__values__(request_json())))) @data.route("/link/mrna", methods=["POST"]) def link_mrna() -> Response: """Link mrna data to group.""" def __values__(form) -> dict[str, Any]: if not bool(form.get("species_name", "").strip()): raise InvalidData("Expected 'species_name' not provided.") if not bool(form.get("group_id")): raise InvalidData("Expected 'group_id' not provided.",) try: _group_id = uuid.UUID(form.get("group_id")) except TypeError as terr: raise InvalidData("Expected a UUID for 'group_id' value.") from terr if not bool(form.get("selected")): raise InvalidData("Expected at least one dataset to be provided.") return { "group_id": uuid.UUID(form.get("group_id")), "datasets": form.get("selected") } def __link__(conn: db.DbConnection, group_id: uuid.UUID, datasets: dict): return link_mrna_data(conn, group_by_id(conn, group_id), datasets) return jsonify(with_db_connection( partial(__link__, **__values__(request_json())))) @data.route("/link/phenotype", methods=["POST"]) @require_oauth("profile group resource") def link_phenotype() -> Response: """Link phenotype data to group.""" def __values__(form): if not bool(form.get("species_name", "").strip()): raise InvalidData("Expected 'species_name' not provided.") if not bool(form.get("group_id")): raise InvalidData("Expected 'group_id' not provided.",) try: _group_id = uuid.UUID(form.get("group_id")) except TypeError as terr: raise InvalidData("Expected a UUID for 'group_id' value.") from terr if not bool(form.get("selected")): raise InvalidData("Expected at least one dataset to be provided.") return { "group_id": uuid.UUID(form["group_id"]), "traits": form["selected"], "using_raw_ids": bool(form.get("using-raw-ids") == "on") } with (require_oauth.acquire("profile group resource") as token, gn3db.database_connection(app.config["SQL_URI"]) as gn3conn): def __link__( conn: db.DbConnection, group_id: uuid.UUID, traits: tuple[dict, ...], using_raw_ids: bool = False ) -> dict: if using_raw_ids: return link_phenotype_data(conn, token.user, group_by_id(conn, group_id), traits) return link_phenotype_data(conn, token.user, group_by_id(conn, group_id), pheno_traits_from_db(gn3conn, traits)) return jsonify(with_db_connection( partial(__link__, **__values__(request_json()))))