"""Handle data endpoints."""
import sys
import uuid
import json
from dataclasses import asdict
from typing import Any
from functools import partial
import redis
from MySQLdb.cursors import DictCursor
from authlib.integrations.flask_oauth2.errors import _HTTPException
from flask import request, jsonify, Response, Blueprint, current_app as app
from gn_auth import jobs
from gn_auth.commands import run_async_cmd
from gn_auth.auth.requests import request_json
from gn_auth.auth.errors import InvalidData, NotFoundError
from gn_auth.auth.authorisation.resources.groups.models import group_by_id
from ...db import sqlite3 as db
from ...db import mariadb as gn3db
from ...db.sqlite3 import with_db_connection
from ..checks import require_json
from ..users.models import user_resource_roles
from ..resources.checks import authorised_for
from ..resources.models import (
user_resources, public_resources, attach_resources_data)
from ...authentication.users import User
from ...authentication.oauth2.resource_server import require_oauth
from ..data.phenotypes import link_phenotype_data
from ..data.mrna import link_mrna_data, ungrouped_mrna_data
from ..data.genotypes import link_genotype_data, ungrouped_genotype_data
# Blueprint on which every endpoint in this module is registered via
# `@data.route(...)`.
data = Blueprint("data", __name__)
def build_trait_name(trait_fullname):
    """
    Break a trait's full name into its dataset and trait components.

    This is a copy of the `gn3.db.traits.build_trait_name` function.

    `trait_fullname` is split on "::"; the first piece is the dataset name,
    the second the trait name and, only when there are exactly three pieces,
    the third is the cell id (otherwise the cell id is the empty string).

    The dataset type is derived from the dataset name: the first of "Temp",
    "Geno" or "Publish" found in the name wins, and "ProbeSet" is used when
    none of them occurs.

    Returns a dict with the keys "db" (holding "dataset_name" and
    "dataset_type"), "trait_fullname", "trait_name" and "cellid".

    Raises `AssertionError` when the name has fewer than two "::"-separated
    pieces.
    """
    pieces = trait_fullname.split("::")
    assert len(pieces) >= 2, f"Name format error: '{trait_fullname}'"
    dset_name, trait_name = pieces[0], pieces[1]

    # Order matters: an earlier marker takes precedence over a later one.
    dset_type = next(
        (marker for marker in ("Temp", "Geno", "Publish")
         if marker in dset_name),
        "ProbeSet")

    cell_id = pieces[2] if len(pieces) == 3 else ""
    return {
        "db": {
            "dataset_name": dset_name,
            "dataset_type": dset_type},
        "trait_fullname": trait_fullname,
        "trait_name": trait_name,
        "cellid": cell_id
    }
@data.route("/species")
def list_species() -> Response:
    """List all available species information.

    Runs `SELECT * FROM Species` against the database configured under
    `SQL_URI` and returns every row as a JSON object inside a JSON array.
    """
    # The rule is written with a leading slash like every other route in this
    # module; a rule without one is only accepted when the blueprint is
    # mounted under a URL prefix.
    with (gn3db.database_connection(app.config["SQL_URI"]) as gn3conn,
          gn3conn.cursor(DictCursor) as cursor):
        cursor.execute("SELECT * FROM Species")
        return jsonify(tuple(dict(row) for row in cursor.fetchall()))
@data.route("/authorisation", methods=["POST"])
@require_json
def authorisation() -> Response:
    """Retrieve the authorisation level for datasets/traits for the user.

    The JSON body must be an object with a `traits` list of trait full names
    of the form `<dataset>::<trait>[::<cellid>]`.

    When a valid OAuth2 token is presented, the resources returned by
    `user_resources` for the token's user are considered and the privileges
    are collected from that user's roles on each resource they are authorised
    to view. When the token is missing, only the resources returned by
    `public_resources` are considered and the user is reported as an
    anonymous placeholder.

    Returns a JSON array with one object per requested trait, holding the
    keys "user", "trait_fullname", "trait_name", "dataset_name",
    "dataset_type", "resource_id" (null when the trait maps to no known
    resource) and "privileges".

    Raises `InvalidData` when the body carries no `traits` list. Any OAuth2
    error other than `missing_authorization` is re-raised.
    """
    # Access endpoint with something like:
    #   curl -X POST http://127.0.0.1:8080/api/oauth2/data/authorisation \
    #       -H "Content-Type: application/json" \
    #       -d '{"traits": ["HC_M2_0606_P::1442370_at", "BXDGeno::01.001.695",
    #           "BXDPublish::10001"]}'
    db_uri = app.config["AUTH_DB"]
    privileges = {}
    # Placeholder identity; replaced below when a token is presented.
    user = User(uuid.uuid4(), "anon@ymous.user", "Anonymous User")
    with db.connection(db_uri) as auth_conn:
        try:
            with require_oauth.acquire("profile group resource") as _token:
                user = _token.user
                resources = attach_resources_data(
                    auth_conn, user_resources(auth_conn, _token.user))
                resources_roles = user_resource_roles(auth_conn, _token.user)
                privileges = {
                    resource_id: tuple(
                        privilege.privilege_id
                        for roles in resources_roles[resource_id]
                        for privilege in roles.privileges)
                    for resource_id, is_authorised
                    in authorised_for(
                        auth_conn, _token.user,
                        ("group:resource:view-resource",), tuple(
                            resource.resource_id for resource in resources)).items()
                    if is_authorised
                }
        except _HTTPException as exc:
            err_msg = json.loads(exc.body)
            if err_msg["error"] == "missing_authorization":
                # No token at all: fall back to the public resources.
                resources = attach_resources_data(
                    auth_conn, public_resources(auth_conn))
            else:
                raise exc from None

        def __gen_key__(resource, data_item):
            """Build the lookup key for one data item of a resource."""
            category = resource.resource_category.resource_category_key.lower()
            if category == "phenotype":
                # Phenotype data is keyed down to the individual trait.
                return (
                    f"{category}::"
                    f"{data_item['dataset_name']}::{data_item['PublishXRefId']}")
            return f"{category}::{data_item['dataset_name']}"

        data_to_resource_map = {
            __gen_key__(resource, data_item): resource.resource_id
            for resource in resources
            for data_item in resource.resource_data
        }
        # Role-derived privileges take precedence over the public-read
        # default on resources that are both public and role-accessible.
        privileges = {
            **{
                resource.resource_id: ("system:resource:public-read",)
                for resource in resources if resource.public
            },
            **privileges}

        args = request.get_json()
        if not isinstance(args, dict) or not isinstance(
                args.get("traits"), list):
            raise InvalidData("Expected a list of 'traits' to be provided.")
        traits_names = args["traits"]

        # Maps the dataset types produced by `build_trait_name` to the names
        # used in the resource keys and in the response.
        translations = {
            "Temp": "Temp",
            "ProbeSet": "mRNA",
            "Geno": "Genotype",
            "Publish": "Phenotype"
        }

        def __trait_key__(trait):
            """Build the key used to find the resource a trait belongs to."""
            dataset_type = translations[trait["db"]["dataset_type"]].lower()
            dataset_name = trait["db"]["dataset_name"]
            if dataset_type == "phenotype":
                return f"{dataset_type}::{dataset_name}::{trait['trait_name']}"
            return f"{dataset_type}::{dataset_name}"

        # Used as the lookup key when a trait maps to no known resource.
        # NOTE(review): presumably chosen so it never matches a real resource
        # id -- confirm.
        unknown_resource_id = uuid.UUID("4afa415e-94cb-4189-b2c6-f9ce2b6a878d")
        user_details = asdict(user)

        def __trait_entry__(trait):
            """Build the response object for a single trait."""
            trait_key = __trait_key__(trait)
            return {
                "user": user_details,
                **{key: trait[key] for key in ("trait_fullname", "trait_name")},
                "dataset_name": trait["db"]["dataset_name"],
                "dataset_type": translations[trait["db"]["dataset_type"]],
                "resource_id": data_to_resource_map.get(trait_key),
                "privileges": privileges.get(
                    data_to_resource_map.get(trait_key, unknown_resource_id),
                    tuple()) + (
                        # Temporary traits do not exist in db: Set them
                        # as public-read
                        ("system:resource:public-read",)
                        if trait["db"]["dataset_type"] == "Temp"
                        else tuple())
            }

        return jsonify(tuple(
            __trait_entry__(build_trait_name(trait_fullname))
            for trait_fullname in traits_names))
def __search_mrna__():
    """Search for mRNA data via `ungrouped_mrna_data`.

    Reads `query` (default ""), `limit` (default 10000), `offset` (default 0)
    and the `selected` list from the request, and returns the search results
    as a JSON response.
    """
    search_args = {
        "search_query": __request_key__("query", ""),
        "limit": int(__request_key__("limit", 10000)),
        "offset": int(__request_key__("offset", 0)),
    }
    with gn3db.database_connection(app.config["SQL_URI"]) as gn3conn:
        search_args["selected"] = __request_key_list__("selected")
        # `with_db_connection` supplies the remaining connection argument.
        search_fn = partial(
            ungrouped_mrna_data, gn3conn=gn3conn, **search_args)
        return jsonify(with_db_connection(search_fn))
def __request_key__(key: str, default: Any = ""):
    """Fetch a single value for `key` from the current request.

    The value is looked up, in order of precedence, in:

    1. the data returned by `request_json()`,
    2. the URL query parameters (`request.args`),

    and `default` is returned when neither holds the key.
    """
    # One call to `request_json()` suffices; an empty/missing payload simply
    # contributes no keys.
    payload = request_json() or {}
    return payload.get(key, request.args.get(key, default))
def __request_key_list__(key: str, default: tuple[Any, ...] = tuple()):
    """Fetch the (possibly multi-valued) value for `key` from the request.

    Returns the first non-empty value out of:

    1. the value under `key` in the data returned by `request_json()`,
    2. all values of the URL query parameter `key`,
    3. `default`, converted to a list.
    """
    payload = request_json() or {}
    return (payload.get(key)
            or request.args.getlist(key)
            or list(default))
def __search_genotypes__():
    """Search for genotype data via `ungrouped_genotype_data`.

    Reads `query` (default ""), `limit` (default 10000), `offset` (default 0)
    and the `selected` list from the request, and returns the search results
    as a JSON response.
    """
    search_query = __request_key__("query", "")
    max_rows = int(__request_key__("limit", 10000))
    skip_rows = int(__request_key__("offset", 0))
    with gn3db.database_connection(app.config["SQL_URI"]) as gn3conn:
        already_selected = __request_key_list__("selected")
        # `with_db_connection` supplies the remaining connection argument.
        search_fn = partial(
            ungrouped_genotype_data,
            gn3conn=gn3conn,
            search_query=search_query,
            selected=already_selected,
            limit=max_rows,
            offset=skip_rows)
        results = with_db_connection(search_fn)
        return jsonify(results)
def __search_phenotypes__():
    """Queue an external phenotype search and report the job's details.

    Builds the command line for the `scripts.search_phenotypes` module from
    the request values and the app configuration, records a "queued" job in
    redis, and hands the command to `run_async_cmd`.

    Returns a JSON object with the keys "job_id", "command_id" and "command".
    """
    redisuri = app.config["REDIS_URI"]
    with redis.Redis.from_url(redisuri, decode_responses=True) as redisconn:
        job_id = uuid.uuid4()
        selected = __request_key__("selected_traits", [])
        # NOTE(review): the positional values come straight from the request;
        # confirm the script cannot mistake them for command-line options.
        command = [
            sys.executable, "-m", "scripts.search_phenotypes",
            __request_key__("species_name"),
            __request_key__("query"),
            str(job_id),
            f"--host={__request_key__('gn3_server_uri')}",
            f"--auth-db-uri={app.config['AUTH_DB']}",
            f"--gn3-db-uri={app.config['SQL_URI']}",
            f"--redis-uri={redisuri}",
            f"--per-page={__request_key__('per_page')}",
        ]
        if len(selected) > 0:
            # Only forwarded when something has actually been selected.
            command.append(f"--selected={json.dumps(selected)}")

        job_details = {
            "job_id": job_id,
            "command": command,
            "status": "queued",
            "search_results": tuple(),
        }
        jobs.create_job(redisconn, job_details)
        command_id = run_async_cmd(
            redisconn, app.config.get("REDIS_JOB_QUEUE"), command)
        return jsonify({
            "job_id": job_id,
            "command_id": command_id,
            "command": command
        })
@data.route("/search", methods=["GET", "POST"])
@require_oauth("profile group resource")
def search_unlinked_data():
    """Search for various unlinked data.

    Dispatches on the `dataset_type` value in the request data, which must be
    one of "mrna", "genotype" or "phenotype", and returns whatever the
    selected search function returns.

    Raises `InvalidData` when `dataset_type` is missing or not one of the
    supported values.
    """
    search_fns = {
        "mrna": __search_mrna__,
        "genotype": __search_genotypes__,
        "phenotype": __search_phenotypes__
    }
    dataset_type = (request_json() or {}).get("dataset_type")
    # The `isinstance` check keeps unhashable JSON values (lists, objects)
    # from blowing up the dictionary lookup.
    search_fn = (
        search_fns.get(dataset_type) if isinstance(dataset_type, str)
        else None)
    if search_fn is None:
        raise InvalidData(
            "Expected 'dataset_type' to be one of: "
            f"{', '.join(search_fns)}.")
    return search_fn()
@data.route("/search/phenotype/<uuid:job_id>", methods=["GET"])
def pheno_search_results(job_id: uuid.UUID) -> Response:
    """Get the search results from the external script

    Looks up the job identified by `job_id` in redis. The job's content is
    returned as JSON; if the lookup yields an error instead, `NotFoundError`
    is raised with the error's description.
    """
    def __raise_not_found__(error):
        raise NotFoundError(error["error_description"])

    redis_uri = app.config["REDIS_URI"]
    with redis.Redis.from_url(redis_uri, decode_responses=True) as redisconn:
        the_job = jobs.job(redisconn, job_id)
        return the_job.either(__raise_not_found__, jsonify)
@data.route("/link/genotype", methods=["POST"])
def link_genotypes() -> Response:
    """Link genotype data to group.

    The request data must provide a non-blank `species_name`, a `group_id`
    that is a valid UUID, and a non-empty `selected` collection of datasets.

    Returns the result of `link_genotype_data` as JSON.

    Raises `InvalidData` when any of the expected values is missing or
    malformed.

    NOTE(review): unlike `/search`, this route carries no `require_oauth`
    decorator here -- confirm access is checked elsewhere.
    """
    def __values__(form) -> dict[str, Any]:
        """Validate the request data and extract the linking arguments."""
        # `species_name` is required but not passed on to the linking call.
        if not str(form.get("species_name") or "").strip():
            raise InvalidData("Expected 'species_name' not provided.")
        if not bool(form.get("group_id")):
            raise InvalidData("Expected 'group_id' not provided.")
        try:
            group_id = uuid.UUID(form.get("group_id"))
        except (TypeError, ValueError, AttributeError) as err:
            # `uuid.UUID` raises `ValueError` for malformed strings and
            # `AttributeError`/`TypeError` for non-string values.
            raise InvalidData("Expected a UUID for 'group_id' value.") from err
        if not bool(form.get("selected")):
            raise InvalidData("Expected at least one dataset to be provided.")
        return {
            "group_id": group_id,
            "datasets": form.get("selected")
        }

    def __link__(conn: db.DbConnection, group_id: uuid.UUID, datasets: dict):
        return link_genotype_data(conn, group_by_id(conn, group_id), datasets)

    return jsonify(with_db_connection(
        partial(__link__, **__values__(request_json()))))
@data.route("/link/mrna", methods=["POST"])
def link_mrna() -> Response:
    """Link mrna data to group.

    The request data must provide a non-blank `species_name`, a `group_id`
    that is a valid UUID, and a non-empty `selected` collection of datasets.

    Returns the result of `link_mrna_data` as JSON.

    Raises `InvalidData` when any of the expected values is missing or
    malformed.

    NOTE(review): unlike `/search`, this route carries no `require_oauth`
    decorator here -- confirm access is checked elsewhere.
    """
    def __values__(form) -> dict[str, Any]:
        """Validate the request data and extract the linking arguments."""
        # `species_name` is required but not passed on to the linking call.
        if not str(form.get("species_name") or "").strip():
            raise InvalidData("Expected 'species_name' not provided.")
        if not bool(form.get("group_id")):
            raise InvalidData("Expected 'group_id' not provided.")
        try:
            group_id = uuid.UUID(form.get("group_id"))
        except (TypeError, ValueError, AttributeError) as err:
            # `uuid.UUID` raises `ValueError` for malformed strings and
            # `AttributeError`/`TypeError` for non-string values.
            raise InvalidData("Expected a UUID for 'group_id' value.") from err
        if not bool(form.get("selected")):
            raise InvalidData("Expected at least one dataset to be provided.")
        return {
            "group_id": group_id,
            "datasets": form.get("selected")
        }

    def __link__(conn: db.DbConnection, group_id: uuid.UUID, datasets: dict):
        return link_mrna_data(conn, group_by_id(conn, group_id), datasets)

    return jsonify(with_db_connection(
        partial(__link__, **__values__(request_json()))))
@data.route("/link/phenotype", methods=["POST"])
def link_phenotype() -> Response:
    """Link phenotype data to group.

    The request data must provide a non-blank `species_name`, a `group_id`
    that is a valid UUID, and a non-empty `selected` collection of traits.

    Returns the result of `link_phenotype_data` as JSON.

    Raises `InvalidData` when any of the expected values is missing or
    malformed.

    NOTE(review): unlike `/search`, this route carries no `require_oauth`
    decorator here -- confirm access is checked elsewhere.
    """
    def __values__(form):
        """Validate the request data and extract the linking arguments."""
        # `species_name` is required but not passed on to the linking call.
        if not str(form.get("species_name") or "").strip():
            raise InvalidData("Expected 'species_name' not provided.")
        if not bool(form.get("group_id")):
            raise InvalidData("Expected 'group_id' not provided.")
        try:
            group_id = uuid.UUID(form.get("group_id"))
        except (TypeError, ValueError, AttributeError) as err:
            # `uuid.UUID` raises `ValueError` for malformed strings and
            # `AttributeError`/`TypeError` for non-string values.
            raise InvalidData("Expected a UUID for 'group_id' value.") from err
        if not bool(form.get("selected")):
            raise InvalidData("Expected at least one dataset to be provided.")
        return {
            "group_id": group_id,
            "traits": form["selected"]
        }

    # Validate first so that a bad request never opens a database connection.
    values = __values__(request_json())
    with gn3db.database_connection(app.config["SQL_URI"]) as gn3conn:
        def __link__(conn: db.DbConnection, group_id: uuid.UUID,
                     traits: tuple[dict, ...]) -> dict:
            return link_phenotype_data(
                conn, gn3conn, group_by_id(conn, group_id), traits)

        return jsonify(with_db_connection(partial(__link__, **values)))