diff options
Diffstat (limited to 'uploader/phenotypes')
| -rw-r--r-- | uploader/phenotypes/misc.py | 26 | ||||
| -rw-r--r-- | uploader/phenotypes/models.py | 358 | ||||
| -rw-r--r-- | uploader/phenotypes/views.py | 646 |
3 files changed, 911 insertions, 119 deletions
diff --git a/uploader/phenotypes/misc.py b/uploader/phenotypes/misc.py new file mode 100644 index 0000000..cbe3b7f --- /dev/null +++ b/uploader/phenotypes/misc.py @@ -0,0 +1,26 @@ +"""Miscellaneous functions handling phenotypes and phenotypes data.""" +import logging + +logger = logging.getLogger(__name__) + + +def phenotypes_data_differences( + filedata: tuple[dict, ...], dbdata: tuple[dict, ...] +) -> tuple[dict, ...]: + """Compute differences between file data and db data""" + diff = tuple() + for filerow, dbrow in zip( + sorted(filedata, key=lambda item: (item["phenotype_id"], item["xref_id"])), + sorted(dbdata, key=lambda item: (item["PhenotypeId"], item["xref_id"]))): + for samplename, value in filerow["data"].items(): + if value != dbrow["data"].get(samplename, {}).get("value"): + diff = diff + ({ + "PhenotypeId": filerow["phenotype_id"], + "xref_id": filerow["xref_id"], + "DataId": dbrow["DataId"], + "StrainId": dbrow["data"].get(samplename, {}).get("StrainId"), + "StrainName": samplename, + "value": value + },) + + return diff diff --git a/uploader/phenotypes/models.py b/uploader/phenotypes/models.py index 73b1cce..e40155f 100644 --- a/uploader/phenotypes/models.py +++ b/uploader/phenotypes/models.py @@ -1,14 +1,31 @@ """Database and utility functions for phenotypes.""" -from typing import Optional +import logging +import tempfile +from pathlib import Path from functools import reduce from datetime import datetime +from typing import Optional, Iterable import MySQLdb as mdb from MySQLdb.cursors import Cursor, DictCursor -from flask import current_app as app from gn_libs.mysqldb import debug_query +from functional_tools import take + +logger = logging.getLogger(__name__) + + +__PHENO_DATA_TABLES__ = { + "PublishData": { + "table": "PublishData", "valueCol": "value", "DataIdCol": "Id"}, + "PublishSE": { + "table": "PublishSE", "valueCol": "error", "DataIdCol": "DataId"}, + "NStrain": { + "table": "NStrain", "valueCol": "count", "DataIdCol": "DataId"} +} + + def datasets_by_population( conn: mdb.Connection, species_id: int, @@ -32,10 +49,10 @@ def dataset_by_id(conn: mdb.Connection, """Fetch dataset details by identifier""" with conn.cursor(cursorclass=DictCursor) as cursor: cursor.execute( - "SELECT s.SpeciesId, pf.* FROM Species AS s " - "INNER JOIN InbredSet AS iset ON s.Id=iset.SpeciesId " - "INNER JOIN PublishFreeze AS pf ON iset.Id=pf.InbredSetId " - "WHERE s.Id=%s AND iset.Id=%s AND pf.Id=%s", + "SELECT Species.SpeciesId, PublishFreeze.* FROM Species " + "INNER JOIN InbredSet ON Species.Id=InbredSet.SpeciesId " + "INNER JOIN PublishFreeze ON InbredSet.Id=PublishFreeze.InbredSetId " + "WHERE Species.Id=%s AND InbredSet.Id=%s AND PublishFreeze.Id=%s", (species_id, population_id, dataset_id)) return dict(cursor.fetchone()) @@ -54,6 +71,20 @@ def phenotypes_count(conn: mdb.Connection, return int(cursor.fetchone()["total_phenos"]) +def phenotype_publication_data(conn, phenotype_id) -> Optional[dict]: + """Retrieve the publication data for a phenotype if it exists.""" + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute( + "SELECT DISTINCT pxr.PhenotypeId, pub.* FROM PublishXRef AS pxr " + "INNER JOIN Publication as pub ON pxr.PublicationId=pub.Id " + "WHERE pxr.PhenotypeId=%s", + (phenotype_id,)) + res = cursor.fetchone() + if res is None: + return res + return dict(res) + + def dataset_phenotypes(conn: mdb.Connection, population_id: int, dataset_id: int, @@ -61,7 +92,8 @@ def dataset_phenotypes(conn: mdb.Connection, limit: Optional[int] = None) -> tuple[dict, ...]: """Fetch the actual phenotypes.""" _query = ( - "SELECT pheno.*, pxr.Id, ist.InbredSetCode FROM Phenotype AS pheno " + "SELECT pheno.*, pxr.Id AS xref_id, pxr.InbredSetId, ist.InbredSetCode " + "FROM Phenotype AS pheno " "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId " "INNER JOIN PublishFreeze AS pf ON pxr.InbredSetId=pf.InbredSetId " "INNER JOIN InbredSet AS ist ON pf.InbredSetId=ist.Id " @@ -69,35 +101,45 @@ def dataset_phenotypes(conn: mdb.Connection, f" LIMIT {limit} OFFSET {offset}" if bool(limit) else "") with conn.cursor(cursorclass=DictCursor) as cursor: cursor.execute(_query, (population_id, dataset_id)) - debug_query(cursor, app.logger) + debug_query(cursor, logger) return tuple(dict(row) for row in cursor.fetchall()) -def __phenotype_se__(cursor: Cursor, - species_id: int, - population_id: int, - dataset_id: int, - xref_id: str) -> dict: +def __phenotype_se__(cursor: Cursor, xref_id, dataids_and_strainids): """Fetch standard-error values (if they exist) for a phenotype.""" - _sequery = ( - "SELECT pxr.Id AS xref_id, pxr.DataId, str.Id AS StrainId, pse.error, nst.count " - "FROM Phenotype AS pheno " - "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId " - "INNER JOIN PublishSE AS pse ON pxr.DataId=pse.DataId " - "INNER JOIN NStrain AS nst ON pse.DataId=nst.DataId " - "INNER JOIN Strain AS str ON nst.StrainId=str.Id " - "INNER JOIN StrainXRef AS sxr ON str.Id=sxr.StrainId " - "INNER JOIN PublishFreeze AS pf ON sxr.InbredSetId=pf.InbredSetId " - "INNER JOIN InbredSet AS iset ON pf.InbredSetId=iset.InbredSetId " - "WHERE (str.SpeciesId, pxr.InbredSetId, pf.Id, pxr.Id)=(%s, %s, %s, %s)") - cursor.execute(_sequery, - (species_id, population_id, dataset_id, xref_id)) - return {(row["DataId"], row["StrainId"]): { - "xref_id": row["xref_id"], - "DataId": row["DataId"], - "error": row["error"], - "count": row["count"] - } for row in cursor.fetchall()} + paramstr = ", ".join(["(%s, %s)"] * len(dataids_and_strainids)) + flat = tuple(item for sublist in dataids_and_strainids for item in sublist) + cursor.execute("SELECT * FROM PublishSE WHERE (DataId, StrainId) IN " + f"({paramstr})", + flat) + debug_query(cursor, logger) + _se = { + (row["DataId"], row["StrainId"]): { + "DataId": row["DataId"], + "StrainId": row["StrainId"], + "error": row["error"] + } + for row in cursor.fetchall() + } + + cursor.execute("SELECT * FROM NStrain WHERE (DataId, StrainId) IN " + f"({paramstr})", + flat) + debug_query(cursor, logger) + _n = { + (row["DataId"], row["StrainId"]): { + "DataId": row["DataId"], + "StrainId": row["StrainId"], + "count": row["count"] + } + for row in cursor.fetchall() + } + + keys = set(tuple(_se.keys()) + tuple(_n.keys())) + return { + key: {"xref_id": xref_id, **_se.get(key,{}), **_n.get(key,{})} + for key in keys + } def __organise_by_phenotype__(pheno, row): """Organise disparate data rows into phenotype 'objects'.""" @@ -113,10 +155,12 @@ def __organise_by_phenotype__(pheno, row): "Pre_publication_abbreviation": row["Pre_publication_abbreviation"], "Post_publication_abbreviation": row["Post_publication_abbreviation"], "xref_id": row["pxr.Id"], + "DataId": row["DataId"], "data": { **(_pheno["data"] if bool(_pheno) else {}), (row["DataId"], row["StrainId"]): { "DataId": row["DataId"], + "StrainId": row["StrainId"], "mean": row["mean"], "Locus": row["Locus"], "LRS": row["LRS"], @@ -170,14 +214,12 @@ def phenotype_by_id( **_pheno, "data": tuple(__merge_pheno_data_and_se__( _pheno["data"], - __phenotype_se__(cursor, - species_id, - population_id, - dataset_id, - xref_id)).values()) + __phenotype_se__( + cursor, xref_id, tuple(_pheno["data"].keys())) + ).values()) } if bool(_pheno) and len(_pheno.keys()) > 1: - raise Exception( + raise Exception(# pylint: disable=[broad-exception-raised] "We found more than one phenotype with the same identifier!") return None @@ -202,7 +244,7 @@ def phenotypes_data(conn: mdb.Connection, f" LIMIT {limit} OFFSET {offset}" if bool(limit) else "") with conn.cursor(cursorclass=DictCursor) as cursor: cursor.execute(_query, (population_id, dataset_id)) - debug_query(cursor, app.logger) + debug_query(cursor, logger) return tuple(dict(row) for row in cursor.fetchall()) @@ -229,5 +271,241 @@ def save_new_dataset(cursor: Cursor, "%(created)s, %(public)s, %(population_id)s, %(confidentiality)s, " "%(users)s)", params) - debug_query(cursor, app.logger) + debug_query(cursor, logger) return {**params, "Id": cursor.lastrowid} + + +def phenotypes_data_by_ids( + conn: mdb.Connection, + inbred_pheno_xref: dict[str, int] +) -> tuple[dict, ...]: + """Fetch all phenotype data, filtered by the `inbred_pheno_xref` mapping.""" + _paramstr = ",".join(["(%s, %s, %s)"] * len(inbred_pheno_xref)) + _query = ("SELECT " + "pub.PubMed_ID, pheno.*, pxr.*, pd.*, str.*, iset.InbredSetCode " + "FROM Publication AS pub " + "RIGHT JOIN PublishXRef AS pxr0 ON pub.Id=pxr0.PublicationId " + "INNER JOIN Phenotype AS pheno ON pxr0.PhenotypeId=pheno.id " + "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId " + "INNER JOIN PublishData AS pd ON pxr.DataId=pd.Id " + "INNER JOIN Strain AS str ON pd.StrainId=str.Id " + "INNER JOIN StrainXRef AS sxr ON str.Id=sxr.StrainId " + "INNER JOIN PublishFreeze AS pf ON sxr.InbredSetId=pf.InbredSetId " + "INNER JOIN InbredSet AS iset ON pf.InbredSetId=iset.InbredSetId " + f"WHERE (pxr.InbredSetId, pheno.Id, pxr.Id) IN ({_paramstr}) " + "ORDER BY pheno.Id") + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute(_query, tuple(item for row in inbred_pheno_xref + for item in (row["population_id"], + row["phenoid"], + row["xref_id"]))) + debug_query(cursor, logger) + return tuple( + reduce(__organise_by_phenotype__, cursor.fetchall(), {}).values()) + + +def __pre_process_phenotype_data__(row): + _desc = row.get("description", "") + _pre_pub_desc = row.get("pre_publication_description", _desc) + _orig_desc = row.get("original_description", _desc) + _post_pub_desc = row.get("post_publication_description", _orig_desc) + _pre_pub_abbr = row.get("pre_publication_abbreviation", row["id"]) + _post_pub_abbr = row.get("post_publication_abbreviation", _pre_pub_abbr) + return { + "pre_publication_description": _pre_pub_desc, + "post_publication_description": _post_pub_desc, + "original_description": _orig_desc, + "units": row["units"], + "pre_publication_abbreviation": _pre_pub_abbr, + "post_publication_abbreviation": _post_pub_abbr + } + + +def create_new_phenotypes(# pylint: disable=[too-many-locals] + conn: mdb.Connection, + population_id: int, + publication_id: int, + phenotypes: Iterable[dict] +) -> tuple[dict, ...]: + """Add entirely new phenotypes to the database. WARNING: Not thread-safe.""" + _phenos = tuple() + with conn.cursor(cursorclass=DictCursor) as cursor: + def make_next_id(idcol, table): + cursor.execute(f"SELECT MAX({idcol}) AS last_id FROM {table}") + _last_id = int(cursor.fetchone()["last_id"]) + def __next_id__(): + _next_id = _last_id + 1 + while True: + yield _next_id + _next_id = _next_id + 1 + + return __next_id__ + + ### Bottleneck: Everything below makes this function not ### + ### thread-safe because we have to retrieve the last IDs from ### + ### the database and increment those to compute the next IDs. ### + ### This is an unfortunate result from the current schema that ### + ### has a cross-reference table that requires that a phenotype ### + ### be linked to an existing publication, and have data IDs to ### + ### link to that phenotype's data. ### + ### The fact that the IDs are sequential also compounds the ### + ### bottleneck. ### + ### + ### For extra safety, ensure the following tables are locked ### + ### for `WRITE`: ### + ### - PublishXRef ### + ### - Phenotype ### + ### - PublishXRef ### + __next_xref_id = make_next_id("Id", "PublishXRef")() + __next_pheno_id__ = make_next_id("Id", "Phenotype")() + __next_data_id__ = make_next_id("DataId", "PublishXRef")() + + def __build_params_and_prepubabbrevs__(acc, row): + processed = __pre_process_phenotype_data__(row) + return ( + acc[0] + ({ + **processed, + "population_id": population_id, + "publication_id": publication_id, + "phenotype_id": next(__next_pheno_id__), + "xref_id": next(__next_xref_id), + "data_id": next(__next_data_id__) + },), + acc[1] + (processed["pre_publication_abbreviation"],)) + while True: + batch = take(phenotypes, 1000) + if len(batch) == 0: + break + + params, abbrevs = reduce(__build_params_and_prepubabbrevs__, + batch, + (tuple(), tuple())) + # Check for uniqueness for all "Pre_publication_description" values + abbrevs_paramsstr = ", ".join(["%s"] * len(abbrevs)) + _query = ("SELECT PublishXRef.PhenotypeId, Phenotype.* " + "FROM PublishXRef " + "INNER JOIN Phenotype " + "ON PublishXRef.PhenotypeId=Phenotype.Id " + "WHERE PublishXRef.InbredSetId=%s " + "AND Phenotype.Pre_publication_abbreviation IN " + f"({abbrevs_paramsstr})") + cursor.execute(_query, + ((population_id,) + abbrevs)) + existing = tuple(row["Pre_publication_abbreviation"] + for row in cursor.fetchall()) + if len(existing) > 0: + # Narrow this exception, perhaps? + raise Exception(# pylint: disable=[broad-exception-raised] + "Found already existing phenotypes with the following " + "'Pre-publication abbreviations':\n\t" + "\n\t".join(f"* {item}" for item in existing)) + + cursor.executemany( + ( + "INSERT INTO " + "Phenotype(" + "Id, " + "Pre_publication_description, " + "Post_publication_description, " + "Original_description, " + "Units, " + "Pre_publication_abbreviation, " + "Post_publication_abbreviation, " + "Authorized_Users" + ")" + "VALUES (" + "%(phenotype_id)s, " + "%(pre_publication_description)s, " + "%(post_publication_description)s, " + "%(original_description)s, " + "%(units)s, " + "%(pre_publication_abbreviation)s, " + "%(post_publication_abbreviation)s, " + "'robwilliams'" + ")"), + params) + _comments = f"Created at {datetime.now().isoformat()}" + cursor.executemany( + ("INSERT INTO PublishXRef(" + "Id, " + "InbredSetId, " + "PhenotypeId, " + "PublicationId, " + "DataId, " + "comments" + ")" + "VALUES(" + "%(xref_id)s, " + "%(population_id)s, " + "%(phenotype_id)s, " + "%(publication_id)s, " + "%(data_id)s, " + f"'{_comments}'" + ")"), + params) + _phenos = _phenos + params + + return _phenos + + +def save_phenotypes_data( + conn: mdb.Connection, + table: str, + data: Iterable[dict] +) -> int: + """Save new phenotypes data into the database.""" + _table_details = __PHENO_DATA_TABLES__[table] + with conn.cursor(cursorclass=DictCursor) as cursor: + _count = 0 + while True: + batch = take(data, 100000) + if len(batch) == 0: + logger.warning("Got an empty batch. This needs investigation.") + break + + logger.debug("Saving batch of %s items.", len(batch)) + cursor.executemany( + (f"INSERT INTO {_table_details['table']}" + f"({_table_details['DataIdCol']}, StrainId, {_table_details['valueCol']}) " + "VALUES " + f"(%(data_id)s, %(sample_id)s, %(value)s) "), + tuple(batch)) + debug_query(cursor, logger) + _count = _count + len(batch) + + + logger.debug("Saved a total of %s data rows", _count) + return _count + + +def quick_save_phenotypes_data( + conn: mdb.Connection, + table: str, + dataitems: Iterable[dict], + tmpdir: Path +) -> int: + """Save data items to the database, but using """ + _table_details = __PHENO_DATA_TABLES__[table] + with (tempfile.NamedTemporaryFile( + prefix=f"{table}_data", mode="wt", dir=tmpdir) as tmpfile, + conn.cursor(cursorclass=DictCursor) as cursor): + _count = 0 + logger.debug("Write data rows to text file.") + for row in dataitems: + tmpfile.write( + f'{row["data_id"]}\t{row["sample_id"]}\t{row["value"]}\n') + _count = _count + 1 + tmpfile.flush() + + logger.debug("Load text file into database (table: %s)", + _table_details["table"]) + cursor.execute( + f"LOAD DATA LOCAL INFILE '{tmpfile.name}' " + f"INTO TABLE {_table_details['table']} " + "(" + f"{_table_details['DataIdCol']}, " + "StrainId, " + f"{_table_details['valueCol']}" + ")") + debug_query(cursor, logger) + return _count diff --git a/uploader/phenotypes/views.py b/uploader/phenotypes/views.py index 8ecd305..7002ccd 100644 --- a/uploader/phenotypes/views.py +++ b/uploader/phenotypes/views.py @@ -1,17 +1,26 @@ -"""Views handling ('classical') phenotypes.""" +"""Views handling ('classical') phenotypes."""# pylint: disable=[too-many-lines] import sys import uuid import json -import datetime +import logging +from typing import Any from pathlib import Path from zipfile import ZipFile from functools import wraps, reduce -from logging import INFO, ERROR, DEBUG, FATAL, CRITICAL, WARNING +from urllib.parse import urljoin, urlparse, ParseResult, urlunparse, urlencode + +import datetime from redis import Redis +from pymonad.either import Left from requests.models import Response from MySQLdb.cursors import DictCursor + +from gn_libs import sqlite3 +from gn_libs import jobs as gnlibs_jobs +from gn_libs.jobs.jobs import JobNotFound from gn_libs.mysqldb import database_connection + from flask import (flash, request, url_for, @@ -20,21 +29,25 @@ from flask import (flash, Blueprint, current_app as app) -# from r_qtl import r_qtl2 as rqtl2 from r_qtl import r_qtl2_qc as rqc from r_qtl import exceptions as rqe + from uploader import jobs +from uploader import session from uploader.files import save_file#, fullpath from uploader.ui import make_template_renderer from uploader.oauth2.client import oauth2_post +from uploader.oauth2.tokens import request_token from uploader.authorisation import require_login +from uploader.oauth2 import client as oauth2client +from uploader.route_utils import build_next_argument +from uploader.route_utils import generic_select_population +from uploader.datautils import safe_int, enumerate_sequence from uploader.species.models import all_species, species_by_id from uploader.monadic_requests import make_either_error_handler +from uploader.publications.models import fetch_publication_by_id from uploader.request_checks import with_species, with_population -from uploader.datautils import safe_int, order_by_family, enumerate_sequence -from uploader.population.models import (populations_by_species, - population_by_species_and_id) from uploader.input_validation import (encode_errors, decode_errors, is_valid_representative_name) @@ -47,9 +60,13 @@ from .models import (dataset_by_id, datasets_by_population, phenotype_publication_data) +logger = logging.getLogger(__name__) phenotypesbp = Blueprint("phenotypes", __name__) render_template = make_template_renderer("phenotypes") +_FAMILIES_WITH_SE_AND_N_ = ( + "Reference Populations (replicate average, SE, N)",) + @phenotypesbp.route("/phenotypes", methods=["GET"]) @require_login def index(): @@ -57,10 +74,16 @@ def index(): with database_connection(app.config["SQL_URI"]) as conn: if not bool(request.args.get("species_id")): return render_template("phenotypes/index.html", - species=order_by_family(all_species(conn)), + species=all_species(conn), activelink="phenotypes") - species = species_by_id(conn, request.args.get("species_id")) + species_id = request.args.get("species_id") + if species_id == "CREATE-SPECIES": + return redirect(url_for( + "species.create_species", + return_to="species.populations.phenotypes.select_population")) + + species = species_by_id(conn, species_id) if not bool(species): flash("No such species!", "alert-danger") return redirect(url_for("species.populations.phenotypes.index")) @@ -74,27 +97,14 @@ def index(): @with_species(redirect_uri="species.populations.phenotypes.index") def select_population(species: dict, **kwargs):# pylint: disable=[unused-argument] """Select the population for your phenotypes.""" - with database_connection(app.config["SQL_URI"]) as conn: - if not bool(request.args.get("population_id")): - return render_template("phenotypes/select-population.html", - species=species, - populations=order_by_family( - populations_by_species( - conn, species["SpeciesId"]), - order_key="FamilyOrder"), - activelink="phenotypes") - - population = population_by_species_and_id( - conn, species["SpeciesId"], int(request.args["population_id"])) - if not bool(population): - flash("No such population found!", "alert-danger") - return redirect(url_for( - "species.populations.phenotypes.select_population", - species_id=species["SpeciesId"])) - - return redirect(url_for("species.populations.phenotypes.list_datasets", - species_id=species["SpeciesId"], - population_id=population["Id"])) + return generic_select_population( + species, + "phenotypes/select-population.html", + request.args.get("population_id") or "", + "species.populations.phenotypes.select_population", + "species.populations.phenotypes.list_datasets", + "phenotypes", + "No such population found!") @@ -192,12 +202,10 @@ def view_dataset(# pylint: disable=[unused-argument] phenotype_count=phenotypes_count( conn, population["Id"], dataset["Id"]), phenotypes=enumerate_sequence( - dataset_phenotypes(conn, - population["Id"], - dataset["Id"], - offset=start_at, - limit=count), - start=start_at+1), + dataset_phenotypes( + conn, + population["Id"], + dataset["Id"])), start_from=start_at, count=count, activelink="view-dataset") @@ -231,22 +239,17 @@ def view_phenotype(# pylint: disable=[unused-argument] species=species, population=population, dataset=dataset, + xref_id=xref_id, phenotype=phenotype, - has_se=all(bool(item.get("error")) for item in phenotype["data"]), - publish_data={ - key.replace("_", " "): val - for key,val in - (phenotype_publication_data(conn, phenotype["Id"]) or {}).items() - if (key in ("PubMed_ID", "Authors", "Title", "Journal") - and val is not None - and val.strip() is not "") - }, - privileges=(privileges - ### For demo! Do not commit this part - + ("group:resource:edit-resource", - "group:resource:delete-resource",) - ### END: For demo! Do not commit this part - ), + has_se=any(bool(item.get("error")) for item in phenotype["data"]), + publication=(phenotype_publication_data(conn, phenotype["Id"]) or {}), + privileges=privileges, + next=build_next_argument( + uri="species.populations.phenotypes.view_phenotype", + species_id=species["SpeciesId"], + population_id=population["Id"], + dataset_id=dataset["Id"], + xref_id=xref_id), activelink="view-phenotype") def __fail__(error): @@ -353,10 +356,17 @@ def process_phenotypes_individual_files(error_uri): bundlepath = Path(app.config["UPLOAD_FOLDER"], f"{str(uuid.uuid4()).replace('-', '')}.zip") with ZipFile(bundlepath,mode="w") as zfile: - for rqtlkey, formkey in (("phenocovar", "phenotype-descriptions"), - ("pheno", "phenotype-data"), - ("phenose", "phenotype-se"), - ("phenonum", "phenotype-n")): + for rqtlkey, formkey, _type in ( + ("phenocovar", "phenotype-descriptions", "mandatory"), + ("pheno", "phenotype-data", "mandatory"), + ("phenose", "phenotype-se", "optional"), + ("phenonum", "phenotype-n", "optional")): + if _type == "optional" and not bool(form.get(formkey)): + continue # skip if an optional key does not exist. + + cdata[f"{rqtlkey}_transposed"] = ( + (form.get(f"{formkey}-transposed") or "off") == "on") + if form.get("resumable-upload", False): # Chunked upload of large files was used filedata = json.loads(form[formkey]) @@ -365,7 +375,7 @@ def process_phenotypes_individual_files(error_uri): arcname=filedata["original-name"]) cdata[rqtlkey] = cdata.get(rqtlkey, []) + [filedata["original-name"]] else: - # TODO: Check this path: fix any bugs. + # T0DO: Check this path: fix any bugs. _sentfile = request.files[formkey] if not bool(_sentfile): flash(f"Expected file ('{formkey}') was not provided.", @@ -379,6 +389,7 @@ def process_phenotypes_individual_files(error_uri): arcname=filepath.name) cdata[rqtlkey] = cdata.get(rqtlkey, []) + [filepath.name] + zfile.writestr("control_data.json", data=json.dumps(cdata, indent=2)) return bundlepath @@ -403,10 +414,7 @@ def add_phenotypes(species: dict, population: dict, dataset: dict, **kwargs):# p dataset_id=dataset["Id"])) _redisuri = app.config["REDIS_URL"] _sqluri = app.config["SQL_URI"] - with (Redis.from_url(_redisuri, decode_responses=True) as rconn, - # database_connection(_sqluri) as conn, - # conn.cursor(cursorclass=DictCursor) as cursor - ): + with Redis.from_url(_redisuri, decode_responses=True) as rconn: if request.method == "GET": today = datetime.date.today() return render_template( @@ -422,8 +430,7 @@ def add_phenotypes(species: dict, population: dict, dataset: dict, **kwargs):# p "December"), current_month=today.strftime("%B"), current_year=int(today.strftime("%Y")), - families_with_se_and_n=( - "Reference Populations (replicate average, SE, N)",), + families_with_se_and_n=_FAMILIES_WITH_SE_AND_N_, use_bundle=use_bundle, activelink="add-phenotypes") @@ -442,24 +449,20 @@ def add_phenotypes(species: dict, population: dict, dataset: dict, **kwargs):# p [sys.executable, "-m", "scripts.rqtl2.phenotypes_qc", _sqluri, _redisuri, _namespace, str(_jobid), str(species["SpeciesId"]), str(population["Id"]), - # str(dataset["Id"]), str(phenobundle), "--loglevel", - { - INFO: "INFO", - ERROR: "ERROR", - DEBUG: "DEBUG", - FATAL: "FATAL", - CRITICAL: "CRITICAL", - WARNING: "WARNING" - }[app.logger.getEffectiveLevel()], + logging.getLevelName( + app.logger.getEffectiveLevel() + ).lower(), "--redisexpiry", str(_ttl_seconds)], "phenotype_qc", _ttl_seconds, {"job-metadata": json.dumps({ "speciesid": species["SpeciesId"], "populationid": population["Id"], "datasetid": dataset["Id"], - "bundle": str(phenobundle.absolute())})}), + "bundle": str(phenobundle.absolute()), + **({"publicationid": request.form["publication-id"]} + if request.form.get("publication-id") else {})})}), _redisuri, f"{app.config['UPLOAD_FOLDER']}/job_errors") @@ -517,7 +520,7 @@ def job_status( @phenotypesbp.route( "<int:species_id>/populations/<int:population_id>/phenotypes/datasets" - "/<int:dataset_id>/review-job/<uuid:job_id>", + "/<int:dataset_id>/job/<uuid:job_id>/review", methods=["GET"]) @require_login @with_dataset( @@ -532,7 +535,8 @@ def review_job_data( **kwargs ):# pylint: disable=[unused-argument] """Review data one more time before entering it into the database.""" - with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn: + with (Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn, + database_connection(app.config["SQL_URI"]) as conn): try: job = jobs.job(rconn, jobs.jobsnamespace(), str(job_id)) except jobs.JobNotFound as _jnf: @@ -545,11 +549,12 @@ def review_job_data( filetype: (by_type.get(filetype, tuple()) + ({"filename": item[0], **item[1]},)) } - metadata = reduce(__metadata_by_type__, - (jobs.job_files_metadata( - rconn, jobs.jobsnamespace(), job['jobid']) - if job else {}).items(), - {}) + metadata: dict[str, Any] = reduce( + __metadata_by_type__, + (jobs.job_files_metadata( + rconn, jobs.jobsnamespace(), job['jobid']) + if job else {}).items(), + {}) def __desc__(filetype): match filetype: @@ -579,6 +584,7 @@ def review_job_data( filetype: __summarise__(filetype, meta) for filetype,meta in metadata.items() } + _job_metadata = json.loads(job["job-metadata"]) return render_template("phenotypes/review-job-data.html", species=species, population=population, @@ -586,4 +592,486 @@ def review_job_data( job_id=job_id, job=job, summary=summary, + publication=( + fetch_publication_by_id( + conn, int(_job_metadata["publicationid"])) + if _job_metadata.get("publicationid") + else None), activelink="add-phenotypes") + + +def load_phenotypes_success_handler(job): + """Handle loading new phenotypes into the database successfully.""" + return redirect(url_for( + "species.populations.phenotypes.load_data_success", + species_id=job["metadata"]["species_id"], + population_id=job["metadata"]["population_id"], + dataset_id=job["metadata"]["dataset_id"], + job_id=job["job_id"])) + + +@phenotypesbp.route( + "<int:species_id>/populations/<int:population_id>/phenotypes/datasets" + "/<int:dataset_id>/load-data-to-database", + methods=["POST"]) +@require_login +@with_dataset( + species_redirect_uri="species.populations.phenotypes.index", + population_redirect_uri="species.populations.phenotypes.select_population", + redirect_uri="species.populations.phenotypes.list_datasets") +def load_data_to_database( + species: dict, + population: dict, + dataset: dict, + **kwargs +):# pylint: disable=[unused-argument] + """Load the data from the given QC job into the database.""" + _jobs_db = app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"] + with (Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn, + sqlite3.connection(_jobs_db) as conn): + # T0DO: Maybe break the connection between the jobs here, pass: + # - the bundle name (rebuild the full path here.) + # - publication details, where separate + # - details about the files: e.g. total lines, etc + qc_job = jobs.job(rconn, jobs.jobsnamespace(), request.form["data-qc-job-id"]) + _meta = json.loads(qc_job["job-metadata"]) + _load_job_id = uuid.uuid4() + _loglevel = logging.getLevelName(app.logger.getEffectiveLevel()).lower() + command = [ + sys.executable, + "-u", + "-m", + "scripts.load_phenotypes_to_db", + app.config["SQL_URI"], + _jobs_db, + str(_load_job_id), + "--log-level", + _loglevel + ] + + def __handle_error__(resp): + return render_template("http-error.html", *resp.json()) + + def __handle_success__(load_job): + app.logger.debug("The phenotypes loading job: %s", load_job) + return redirect(url_for( + "background-jobs.job_status", job_id=load_job["job_id"])) + + + return request_token( + token_uri=urljoin(oauth2client.authserver_uri(), "auth/token"), + user_id=session.user_details()["user_id"] + ).then( + lambda token: gnlibs_jobs.initialise_job( + conn, + _load_job_id, + command, + "load-new-phenotypes-data", + extra_meta={ + "species_id": species["SpeciesId"], + "population_id": population["Id"], + "dataset_id": dataset["Id"], + "bundle_file": _meta["bundle"], + "publication_id": _meta["publicationid"], + "authserver": oauth2client.authserver_uri(), + "token": token["access_token"], + "success_handler": ( + "uploader.phenotypes.views" + ".load_phenotypes_success_handler") + }) + ).then( + lambda job: gnlibs_jobs.launch_job( + job, + _jobs_db, + Path(f"{app.config['UPLOAD_FOLDER']}/job_errors"), + worker_manager="gn_libs.jobs.launcher", + loglevel=_loglevel) + ).either(__handle_error__, __handle_success__) + + +def update_phenotype_metadata(conn, metadata: dict): + """Update a phenotype's basic metadata values.""" + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute("SELECT * FROM Phenotype WHERE Id=%(phenotype-id)s", + metadata) + res = { + **{ + _key: _val for _key,_val in { + key.lower().replace("_", "-"): value + for key, value in (cursor.fetchone() or {}).items() + }.items() + if _key in metadata.keys() + }, + "phenotype-id": metadata.get("phenotype-id") + } + if res == metadata: + return False + + cursor.execute( + "UPDATE Phenotype SET " + "Pre_publication_description=%(pre-publication-description)s, " + "Post_publication_description=%(post-publication-description)s, " + "Original_description=%(original-description)s, " + "Units=%(units)s, " + "Pre_publication_abbreviation=%(pre-publication-abbreviation)s, " + "Post_publication_abbreviation=%(post-publication-abbreviation)s " + "WHERE Id=%(phenotype-id)s", + metadata) + return cursor.rowcount + + +def update_phenotype_values(conn, values): + """Update a phenotype's data values.""" + with conn.cursor() as cursor: + cursor.executemany( + "UPDATE PublishData SET value=%(new)s " + "WHERE Id=%(data_id)s AND StrainId=%(strain_id)s", + tuple(item for item in values if item["new"] is not None)) + cursor.executemany( + "DELETE FROM PublishData " + "WHERE Id=%(data_id)s AND StrainId=%(strain_id)s", + tuple(item for item in values if item["new"] is None)) + return len(values) + return 0 + + +def update_phenotype_se(conn, serrs): + """Update a phenotype's standard-error values.""" + with conn.cursor() as cursor: + cursor.executemany( + "INSERT INTO PublishSE(DataId, StrainId, error) " + "VALUES(%(data_id)s, %(strain_id)s, %(new)s) " + "ON DUPLICATE KEY UPDATE error=VALUES(error)", + tuple(item for item in serrs if item["new"] is not None)) + cursor.executemany( + "DELETE FROM PublishSE " + "WHERE DataId=%(data_id)s AND StrainId=%(strain_id)s", + tuple(item for item in serrs if item["new"] is None)) + return len(serrs) + return 0 + + +def update_phenotype_n(conn, counts): + """Update a phenotype's strain counts.""" + with conn.cursor() as cursor: + cursor.executemany( + "INSERT INTO NStrain(DataId, StrainId, count) " + "VALUES(%(data_id)s, %(strain_id)s, %(new)s) " + "ON DUPLICATE KEY UPDATE count=VALUES(count)", + tuple(item for item in counts if item["new"] is not None)) + cursor.executemany( + "DELETE FROM NStrain " + "WHERE DataId=%(data_id)s AND StrainId=%(strain_id)s", + tuple(item for item in counts if item["new"] is None)) + return len(counts) + + return 0 + + +def update_phenotype_data(conn, data: dict): + """Update the numeric data for a phenotype.""" + def __organise_by_dataid_and_strainid__(acc, current): + _key, dataid, strainid = current[0].split("::") + _keysrc, _keytype = _key.split("-") + newkey = f"{dataid}::{strainid}" + newitem = acc.get(newkey, {}) + newitem[_keysrc] = newitem.get(_keysrc, {}) + newitem[_keysrc][_keytype] = current[1] + return {**acc, newkey: newitem} + + def __separate_items__(acc, row): + key, val = row + return ({ + **acc[0], + key: { + **val["value"], + "changed?": (not val["value"]["new"] == val["value"]["original"]) + } + }, { + **acc[1], + key: { + **val["se"], + "changed?": (not val["se"]["new"] == val["se"]["original"]) + } + },{ + **acc[2], + key: { + **val["n"], + "changed?": (not val["n"]["new"] == val["n"]["original"]) + } + }) + + values, serrs, counts = tuple( + tuple({ + "data_id": row[0].split("::")[0], + "strain_id": row[0].split("::")[1], + "new": row[1]["new"] + } for row in item) + for item in ( + filter(lambda val: val[1]["changed?"], item.items())# type: ignore[arg-type] + for item in reduce(# type: ignore[var-annotated] + __separate_items__, + reduce(__organise_by_dataid_and_strainid__, + data.items(), + {}).items(), + ({}, {}, {})))) + + return (update_phenotype_values(conn, values), + update_phenotype_se(conn, serrs), + update_phenotype_n(conn, counts)) + + +@phenotypesbp.route( + "<int:species_id>/populations/<int:population_id>/phenotypes/datasets" + "/<int:dataset_id>/phenotype/<int:xref_id>/edit", + methods=["GET", "POST"]) +@require_login +@with_dataset( + species_redirect_uri="species.populations.phenotypes.index", + population_redirect_uri="species.populations.phenotypes.select_population", + redirect_uri="species.populations.phenotypes.list_datasets") +def edit_phenotype_data(# pylint: disable=[unused-argument] + species: dict, + population: dict, + dataset: dict, + xref_id: int, + **kwargs +): + """Edit the data for a particular phenotype.""" + def __render__(**kwargs): + processed_kwargs = { + **kwargs, + "privileges": kwargs.get("privileges", tuple()) + } + return render_template( + "phenotypes/edit-phenotype.html", + species=species, + population=population, + dataset=dataset, + xref_id=xref_id, + families_with_se_and_n=_FAMILIES_WITH_SE_AND_N_, + **processed_kwargs, + activelink="edit-phenotype") + + with database_connection(app.config["SQL_URI"]) as conn: + if request.method == "GET": + def __fetch_phenotype__(privileges): + phenotype = phenotype_by_id(conn, + species["SpeciesId"], + population["Id"], + dataset["Id"], + xref_id) + if phenotype is None: + msg = ("Could not find the phenotype with cross-reference ID" + f" '{xref_id}' from dataset '{dataset['FullName']}' " + f" from the '{population['FullName']}' population of " + f" species '{species['FullName']}'.") + return Left({"privileges": privileges, "phenotype-error": msg}) + return {"privileges": privileges, "phenotype": phenotype} + + def __fetch_publication_data__(**kwargs): + pheno = kwargs["phenotype"] + return { + **kwargs, + "publication_data": phenotype_publication_data( + conn, pheno["Id"]) + } + + def __fail__(failure_object): + # process the object + return __render__(failure_object=failure_object) + + return oauth2_post( + "/auth/resource/phenotypes/individual/linked-resource", + json={ + "species_id": species["SpeciesId"], + "population_id": population["Id"], + "dataset_id": dataset["Id"], + "xref_id": xref_id + } + ).then( + lambda resource: tuple( + privilege["privilege_id"] for role in resource["roles"] + for privilege in role["privileges"]) + ).then( + __fetch_phenotype__ + ).then( + lambda args: __fetch_publication_data__(**args) + ).either(__fail__, lambda args: __render__(**args)) + + ## POST + _change = False + match request.form.get("submit", "invalid-action"): + case "update basic metadata": + _change = update_phenotype_metadata(conn, { + key: value.strip() if bool(value.strip()) else None + for key, value in request.form.items() + if key not in ("submit",) + }) + msg = "Basic metadata was updated successfully." + case "update data": + _update = update_phenotype_data(conn, { + key: value.strip() if bool(value.strip()) else None + for key, value in request.form.items() + if key not in ("submit",) + }) + msg = (f"{_update[0]} value rows, {_update[1]} standard-error " + f"rows and {_update[2]} 'N' rows were updated.") + _change = any(item != 0 for item in _update) + case "update publication": + flash("NOT IMPLEMENTED: Would update publication data.", "alert-success") + case _: + flash("Invalid phenotype editing action.", "alert-danger") + + if _change: + flash(msg, "alert-success") + return redirect(url_for( + "species.populations.phenotypes.view_phenotype", + species_id=species["SpeciesId"], + population_id=population["Id"], + dataset_id=dataset["Id"], + xref_id=xref_id)) + + flash("No change was made by the user.", "alert-info") + return redirect(url_for( + "species.populations.phenotypes.edit_phenotype_data", + species_id=species["SpeciesId"], + population_id=population["Id"], + dataset_id=dataset["Id"], + xref_id=xref_id)) + + +@phenotypesbp.route( + "<int:species_id>/populations/<int:population_id>/phenotypes/datasets" + "/<int:dataset_id>/load-data-success/<uuid:job_id>", + methods=["GET"]) +@require_login +@with_dataset( + species_redirect_uri="species.populations.phenotypes.index", + population_redirect_uri="species.populations.phenotypes.select_population", + redirect_uri="species.populations.phenotypes.list_datasets") +def load_data_success( + species: dict, + population: dict, + dataset: dict, + job_id: uuid.UUID, + **kwargs +):# pylint: disable=[unused-argument] + """Display success page if loading data to database was successful.""" + with (database_connection(app.config["SQL_URI"]) as conn, + sqlite3.connection(app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"]) + as jobsconn): + try: + gn2_uri = urlparse(app.config["GN2_SERVER_URL"]) + job = gnlibs_jobs.job(jobsconn, job_id, fulldetails=True) + app.logger.debug("THE JOB: %s", job) + _xref_ids = tuple( + str(item) for item + in json.loads(job["metadata"].get("xref_ids", "[]"))) + _publication = fetch_publication_by_id( + conn, int(job["metadata"].get("publication_id", "0"))) + _search_terms = (item for item in + (str(_publication["PubMed_ID"] or ""), + _publication["Authors"], + (_publication["Title"] or "")) + if item != "") + return render_template("phenotypes/load-phenotypes-success.html", + species=species, + population=population, + dataset=dataset, + job=job, + search_page_uri=urlunparse(ParseResult( + scheme=gn2_uri.scheme, + netloc=gn2_uri.netloc, + path="/search", + params="", + query=urlencode({ + "species": species["Name"], + "group": population["Name"], + "type": "Phenotypes", + "dataset": dataset["Name"], + "search_terms_or": ( + # Very long URLs will cause + # errors. + " ".join(_xref_ids) + if len(_xref_ids) <= 100 + else ""), + "search_terms_and": " ".join( + _search_terms).strip(), + "accession_id": "None", + "FormID": "searchResult" + }), + fragment=""))) + except JobNotFound as _jnf: + return render_template("jobs/job-not-found.html", job_id=job_id) + + +@phenotypesbp.route( + "<int:species_id>/populations/<int:population_id>/phenotypes/datasets" + "/<int:dataset_id>/recompute-means", + methods=["POST"]) +@require_login +@with_dataset( + species_redirect_uri="species.populations.phenotypes.index", + population_redirect_uri="species.populations.phenotypes.select_population", + redirect_uri="species.populations.phenotypes.list_datasets") +def recompute_means(# pylint: disable=[unused-argument] + species: dict, + population: dict, + dataset: dict, + **kwargs +): + """Compute/Recompute the means for phenotypes in a particular population.""" + _jobs_db = app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"] + _job_id = uuid.uuid4() + _xref_ids = tuple(int(item.split("_")[-1]) + for item in request.form.getlist("selected-phenotypes")) + + _loglevel = logging.getLevelName(app.logger.getEffectiveLevel()).lower() + command = [ + sys.executable, + "-u", + "-m", + "scripts.compute_phenotype_means", + app.config["SQL_URI"], + _jobs_db, + str(population["Id"]), + "--log-level", + _loglevel] + ( + ["--cross-ref-ids", ",".join(str(_id) for _id in _xref_ids)] + if len(_xref_ids) > 0 else + []) + logger.debug("%s.recompute_means: command (%s)", __name__, command) + + with sqlite3.connection(_jobs_db) as conn: + _job = gnlibs_jobs.launch_job( + gnlibs_jobs.initialise_job( + conn, + _job_id, + command, + "(re)compute-phenotype-means", + extra_meta={ + "species_id": species["SpeciesId"], + "population_id": population["Id"], + "dataset_id": dataset["Id"], + "success_handler": ( + "uploader.phenotypes.views." + "recompute_phenotype_means_success_handler") + }), + _jobs_db, + Path(f"{app.config['UPLOAD_FOLDER']}/job_errors"), + worker_manager="gn_libs.jobs.launcher", + loglevel=_loglevel) + return redirect(url_for("background-jobs.job_status", + job_id=_job["job_id"])) + + +def recompute_phenotype_means_success_handler(job): + """Handle loading new phenotypes into the database successfully.""" + flash("Means computed successfully!", "alert alert-success") + return redirect(url_for( + "species.populations.phenotypes.view_dataset", + species_id=job["metadata"]["species_id"], + population_id=job["metadata"]["population_id"], + dataset_id=job["metadata"]["dataset_id"], + job_id=job["job_id"])) |
