diff options
Diffstat (limited to 'uploader/phenotypes/models.py')
-rw-r--r-- | uploader/phenotypes/models.py | 168 |
1 file changed, 156 insertions, 12 deletions
diff --git a/uploader/phenotypes/models.py b/uploader/phenotypes/models.py index e1ec0c9..45fcd30 100644 --- a/uploader/phenotypes/models.py +++ b/uploader/phenotypes/models.py @@ -1,14 +1,30 @@ """Database and utility functions for phenotypes.""" -from typing import Optional +import logging +import tempfile +from pathlib import Path from functools import reduce from datetime import datetime +from typing import Optional, Iterable import MySQLdb as mdb from MySQLdb.cursors import Cursor, DictCursor -from flask import current_app as app +from functional_tools import take from gn_libs.mysqldb import debug_query +logger = logging.getLogger(__name__) + + +__PHENO_DATA_TABLES__ = { + "PublishData": { + "table": "PublishData", "valueCol": "value", "DataIdCol": "Id"}, + "PublishSE": { + "table": "PublishSE", "valueCol": "error", "DataIdCol": "DataId"}, + "NStrain": { + "table": "NStrain", "valueCol": "count", "DataIdCol": "DataId"} +} + + def datasets_by_population( conn: mdb.Connection, species_id: int, @@ -32,10 +48,10 @@ def dataset_by_id(conn: mdb.Connection, """Fetch dataset details by identifier""" with conn.cursor(cursorclass=DictCursor) as cursor: cursor.execute( - "SELECT s.SpeciesId, pf.* FROM Species AS s " - "INNER JOIN InbredSet AS iset ON s.Id=iset.SpeciesId " - "INNER JOIN PublishFreeze AS pf ON iset.Id=pf.InbredSetId " - "WHERE s.Id=%s AND iset.Id=%s AND pf.Id=%s", + "SELECT Species.SpeciesId, PublishFreeze.* FROM Species " + "INNER JOIN InbredSet ON Species.Id=InbredSet.SpeciesId " + "INNER JOIN PublishFreeze ON InbredSet.Id=PublishFreeze.InbredSetId " + "WHERE Species.Id=%s AND InbredSet.Id=%s AND PublishFreeze.Id=%s", (species_id, population_id, dataset_id)) return dict(cursor.fetchone()) @@ -75,7 +91,7 @@ def dataset_phenotypes(conn: mdb.Connection, limit: Optional[int] = None) -> tuple[dict, ...]: """Fetch the actual phenotypes.""" _query = ( - "SELECT pheno.*, pxr.Id AS xref_id, ist.InbredSetCode FROM Phenotype AS pheno " + "SELECT pheno.*, pxr.Id AS 
xref_id, pxr.InbredSetId, ist.InbredSetCode FROM Phenotype AS pheno " "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId " "INNER JOIN PublishFreeze AS pf ON pxr.InbredSetId=pf.InbredSetId " "INNER JOIN InbredSet AS ist ON pf.InbredSetId=ist.Id " @@ -83,7 +99,7 @@ def dataset_phenotypes(conn: mdb.Connection, f" LIMIT {limit} OFFSET {offset}" if bool(limit) else "") with conn.cursor(cursorclass=DictCursor) as cursor: cursor.execute(_query, (population_id, dataset_id)) - debug_query(cursor, app.logger) + debug_query(cursor, logger) return tuple(dict(row) for row in cursor.fetchall()) @@ -94,7 +110,7 @@ def __phenotype_se__(cursor: Cursor, xref_id, dataids_and_strainids): cursor.execute("SELECT * FROM PublishSE WHERE (DataId, StrainId) IN " f"({paramstr})", flat) - debug_query(cursor, app.logger) + debug_query(cursor, logger) _se = { (row["DataId"], row["StrainId"]): { "DataId": row["DataId"], @@ -107,7 +123,7 @@ def __phenotype_se__(cursor: Cursor, xref_id, dataids_and_strainids): cursor.execute("SELECT * FROM NStrain WHERE (DataId, StrainId) IN " f"({paramstr})", flat) - debug_query(cursor, app.logger) + debug_query(cursor, logger) _n = { (row["DataId"], row["StrainId"]): { "DataId": row["DataId"], @@ -137,6 +153,7 @@ def __organise_by_phenotype__(pheno, row): "Pre_publication_abbreviation": row["Pre_publication_abbreviation"], "Post_publication_abbreviation": row["Post_publication_abbreviation"], "xref_id": row["pxr.Id"], + "DataId": row["DataId"], "data": { **(_pheno["data"] if bool(_pheno) else {}), (row["DataId"], row["StrainId"]): { @@ -225,7 +242,7 @@ def phenotypes_data(conn: mdb.Connection, f" LIMIT {limit} OFFSET {offset}" if bool(limit) else "") with conn.cursor(cursorclass=DictCursor) as cursor: cursor.execute(_query, (population_id, dataset_id)) - debug_query(cursor, app.logger) + debug_query(cursor, logger) return tuple(dict(row) for row in cursor.fetchall()) @@ -252,5 +269,132 @@ def save_new_dataset(cursor: Cursor, "%(created)s, %(public)s, 
def phenotypes_data_by_ids(
        conn: mdb.Connection,
        inbred_pheno_xref: tuple[dict, ...]
) -> tuple[dict, ...]:
    """Fetch phenotype data for specific (population, phenotype, xref) rows.

    :param conn: an open connection to the MySQL database.
    :param inbred_pheno_xref: dicts, each with the keys "population_id",
        "phenoid" and "xref_id", selecting the rows to fetch data for.
        (The previous annotation `dict[str, int]` was wrong: the code takes
        `len()` of this argument and iterates dict rows out of it.)
    :returns: the matching phenotypes, organised by phenotype, with their data.
    """
    _paramstr = ",".join(["(%s, %s, %s)"] * len(inbred_pheno_xref))
    _query = ("SELECT "
              "pub.PubMed_ID, pheno.*, pxr.*, pd.*, str.*, iset.InbredSetCode "
              "FROM Publication AS pub "
              "RIGHT JOIN PublishXRef AS pxr0 ON pub.Id=pxr0.PublicationId "
              "INNER JOIN Phenotype AS pheno ON pxr0.PhenotypeId=pheno.id "
              "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId "
              "INNER JOIN PublishData AS pd ON pxr.DataId=pd.Id "
              "INNER JOIN Strain AS str ON pd.StrainId=str.Id "
              "INNER JOIN StrainXRef AS sxr ON str.Id=sxr.StrainId "
              "INNER JOIN PublishFreeze AS pf ON sxr.InbredSetId=pf.InbredSetId "
              # NOTE(review): every other query in this file joins InbredSet
              # on its `Id` column (e.g. `... ON pf.InbredSetId=ist.Id`);
              # `iset.InbredSetId` here looks like a typo — confirm against
              # the schema before relying on this query.
              "INNER JOIN InbredSet AS iset ON pf.InbredSetId=iset.InbredSetId "
              f"WHERE (pxr.InbredSetId, pheno.Id, pxr.Id) IN ({_paramstr}) "
              "ORDER BY pheno.Id")
    with conn.cursor(cursorclass=DictCursor) as cursor:
        cursor.execute(_query, tuple(item for row in inbred_pheno_xref
                                     for item in (row["population_id"],
                                                  row["phenoid"],
                                                  row["xref_id"])))
        debug_query(cursor, logger)
        return tuple(
            reduce(__organise_by_phenotype__, cursor.fetchall(), {}).values())


def create_new_phenotypes(conn: mdb.Connection,
                          phenotypes: Iterable[dict]) -> tuple[dict, ...]:
    """Add entirely new phenotypes to the database.

    :param conn: an open connection to the MySQL database.
    :param phenotypes: dicts, each with the keys "id", "description" and
        "units", describing one new phenotype.
    :returns: details of the newly created `Phenotype` rows, including each
        row's auto-generated "phenotype_id".
    """
    # `take` consumes from the front of an iterator; wrap in `iter()` so a
    # caller passing a list cannot cause an endless loop of identical batches.
    phenotypes = iter(phenotypes)
    _phenos: tuple[dict, ...] = tuple()
    with conn.cursor(cursorclass=DictCursor) as cursor:
        while True:
            # Insert in batches to bound memory use and statement size.
            batch = take(phenotypes, 1000)
            if len(batch) == 0:
                break

            cursor.executemany(
                ("INSERT INTO "
                 "Phenotype("
                 "Pre_publication_description, Post_publication_description, "
                 "Original_description, Units, Pre_publication_abbreviation, "
                 "Post_publication_abbreviation, Authorized_Users"
                 ")"
                 "VALUES (%s, %s, %s, %s, %s, %s, 'robwilliams')"),
                tuple((row["id"], row["description"], row["description"],
                       row["units"], row["id"], row["id"])
                      for row in batch))
            # Select the rows back to recover their auto-increment `Id`s;
            # the caller's "id" was stored in Pre_publication_description.
            paramstr = ", ".join(["%s"] * len(batch))
            cursor.execute(
                "SELECT * FROM Phenotype WHERE Pre_publication_description IN "
                f"({paramstr})",
                tuple(item["id"] for item in batch))
            _phenos = _phenos + tuple({
                "phenotype_id": row["Id"],
                "id": row["Pre_publication_description"],
                "description": row["Original_description"],
                "units": row["Units"]
            } for row in cursor.fetchall())

    return _phenos


def save_phenotypes_data(
        conn: mdb.Connection,
        table: str,
        data: Iterable[dict]
) -> int:
    """Save new phenotypes data into the database, batch by batch.

    :param conn: an open connection to the MySQL database.
    :param table: one of the keys of `__PHENO_DATA_TABLES__`
        ("PublishData", "PublishSE" or "NStrain").
    :param data: dicts with the keys "data_id", "sample_id" and "value".
    :returns: the total number of rows inserted.
    """
    _table_details = __PHENO_DATA_TABLES__[table]
    # `take` consumes from the front of an iterator; wrap in `iter()` so a
    # caller passing a list cannot cause an endless loop of identical batches.
    data = iter(data)
    with conn.cursor(cursorclass=DictCursor) as cursor:
        _count = 0
        while True:
            batch = take(data, 100000)
            if len(batch) == 0:
                # NOTE: this also fires on normal exhaustion of `data`, not
                # only on a genuinely empty input.
                logger.warning("Got an empty batch. This needs investigation.")
                break

            logger.debug("Saving batch of %s items.", len(batch))
            # Table and column names come from the __PHENO_DATA_TABLES__
            # whitelist above, never from user input, so the f-string
            # interpolation here is safe; the values are parameterised.
            cursor.executemany(
                (f"INSERT INTO {_table_details['table']}"
                 f"({_table_details['DataIdCol']}, StrainId, {_table_details['valueCol']}) "
                 "VALUES "
                 f"(%(data_id)s, %(sample_id)s, %(value)s) "),
                tuple(batch))
            debug_query(cursor, logger)
            _count = _count + len(batch)

    logger.debug("Saved a total of %s data rows", _count)
    return _count


def quick_save_phenotypes_data(
        conn: mdb.Connection,
        table: str,
        dataitems: Iterable[dict],
        tmpdir: Path
) -> int:
    """Save data items to the database using 'LOAD DATA LOCAL INFILE'.

    Faster than row-by-row inserts for large volumes: the rows are first
    written to a tab-separated temporary file, which MySQL then bulk-loads.

    :param conn: an open connection to the MySQL database (must allow
        LOAD DATA LOCAL INFILE).
    :param table: one of the keys of `__PHENO_DATA_TABLES__`.
    :param dataitems: dicts with the keys "data_id", "sample_id" and "value".
    :param tmpdir: directory in which to create the temporary file.
    :returns: the number of rows written (and loaded).
    """
    _table_details = __PHENO_DATA_TABLES__[table]
    with (tempfile.NamedTemporaryFile(
            prefix=f"{table}_data", mode="wt", dir=tmpdir) as tmpfile,
          conn.cursor(cursorclass=DictCursor) as cursor):
        _count = 0
        logger.debug("Write data rows to text file.")
        for row in dataitems:
            tmpfile.write(
                f'{row["data_id"]}\t{row["sample_id"]}\t{row["value"]}\n')
            _count = _count + 1
        # Ensure all buffered rows hit the disk before MySQL reads the file.
        tmpfile.flush()

        logger.debug("Load text file into database (table: %s)",
                     _table_details["table"])
        # Table/column names come from the __PHENO_DATA_TABLES__ whitelist
        # and the path from NamedTemporaryFile — not user input.
        cursor.execute(
            f"LOAD DATA LOCAL INFILE '{tmpfile.name}' "
            f"INTO TABLE {_table_details['table']} "
            "("
            f"{_table_details['DataIdCol']}, "
            "StrainId, "
            f"{_table_details['valueCol']}"
            ")")
        debug_query(cursor, logger)
        return _count