Diffstat (limited to 'uploader/phenotypes/models.py')
-rw-r--r--   uploader/phenotypes/models.py   495
1 file changed, 442 insertions(+), 53 deletions(-)
diff --git a/uploader/phenotypes/models.py b/uploader/phenotypes/models.py
index 4a229e6..3946a0f 100644
--- a/uploader/phenotypes/models.py
+++ b/uploader/phenotypes/models.py
@@ -1,16 +1,35 @@
 """Database and utility functions for phenotypes."""
-from typing import Optional
+import time
+import random
+import logging
+import tempfile
+from pathlib import Path
 from functools import reduce
 from datetime import datetime
+from typing import Union, Optional, Iterable
 
-import MySQLdb as mdb
-from MySQLdb.cursors import Cursor, DictCursor
-from flask import current_app as app
+from MySQLdb.connections import Connection
+from MySQLdb.cursors import Cursor, DictCursor, BaseCursor
 from gn_libs.mysqldb import debug_query
 
+from functional_tools import take
+
+logger = logging.getLogger(__name__)
+
+
+__PHENO_DATA_TABLES__ = {
+    "PublishData": {
+        "table": "PublishData", "valueCol": "value", "DataIdCol": "Id"},
+    "PublishSE": {
+        "table": "PublishSE", "valueCol": "error", "DataIdCol": "DataId"},
+    "NStrain": {
+        "table": "NStrain", "valueCol": "count", "DataIdCol": "DataId"}
+}
+
+
 def datasets_by_population(
-        conn: mdb.Connection,
+        conn: Connection,
         species_id: int,
         population_id: int
 ) -> tuple[dict, ...]:
@@ -25,22 +44,22 @@
         return tuple(dict(row) for row in cursor.fetchall())
 
 
-def dataset_by_id(conn: mdb.Connection,
+def dataset_by_id(conn: Connection,
                   species_id: int,
                   population_id: int,
                   dataset_id: int) -> dict:
     """Fetch dataset details by identifier"""
     with conn.cursor(cursorclass=DictCursor) as cursor:
         cursor.execute(
-            "SELECT s.SpeciesId, pf.* FROM Species AS s "
-            "INNER JOIN InbredSet AS iset ON s.Id=iset.SpeciesId "
-            "INNER JOIN PublishFreeze AS pf ON iset.Id=pf.InbredSetId "
-            "WHERE s.Id=%s AND iset.Id=%s AND pf.Id=%s",
+            "SELECT Species.SpeciesId, PublishFreeze.* FROM Species "
+            "INNER JOIN InbredSet ON Species.Id=InbredSet.SpeciesId "
+            "INNER JOIN PublishFreeze ON InbredSet.Id=PublishFreeze.InbredSetId "
+            "WHERE Species.Id=%s AND InbredSet.Id=%s AND PublishFreeze.Id=%s",
             (species_id, population_id, dataset_id))
         return dict(cursor.fetchone())
 
 
-def phenotypes_count(conn: mdb.Connection,
+def phenotypes_count(conn: Connection,
                      population_id: int,
                      dataset_id: int) -> int:
     """Count the number of phenotypes in the dataset."""
@@ -68,33 +87,39 @@ def phenotype_publication_data(conn, phenotype_id) -> Optional[dict]:
     return dict(res)
 
 
-def dataset_phenotypes(conn: mdb.Connection,
-                       population_id: int,
-                       dataset_id: int,
-                       offset: int = 0,
-                       limit: Optional[int] = None) -> tuple[dict, ...]:
+def dataset_phenotypes(# pylint: disable=[too-many-arguments, too-many-positional-arguments]
+        conn: Connection,
+        population_id: int,
+        dataset_id: int,
+        offset: int = 0,
+        limit: Optional[int] = None,
+        xref_ids: tuple[int, ...] = tuple()
+) -> tuple[dict, ...]:
     """Fetch the actual phenotypes."""
     _query = (
-        "SELECT pheno.*, pxr.Id AS xref_id, pxr.InbredSetId, ist.InbredSetCode FROM Phenotype AS pheno "
+        "SELECT pheno.*, pxr.Id AS xref_id, pxr.InbredSetId, ist.InbredSetCode "
+        "FROM Phenotype AS pheno "
         "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId "
        "INNER JOIN PublishFreeze AS pf ON pxr.InbredSetId=pf.InbredSetId "
         "INNER JOIN InbredSet AS ist ON pf.InbredSetId=ist.Id "
         "WHERE pxr.InbredSetId=%s AND pf.Id=%s") + (
+            f" AND pxr.Id IN ({', '.join(['%s'] * len(xref_ids))})"
+            if len(xref_ids) > 0 else "") + (
             f" LIMIT {limit} OFFSET {offset}" if bool(limit) else "")
     with conn.cursor(cursorclass=DictCursor) as cursor:
-        cursor.execute(_query, (population_id, dataset_id))
-        debug_query(cursor, app.logger)
+        cursor.execute(_query, (population_id, dataset_id) + xref_ids)
+        debug_query(cursor, logger)
         return tuple(dict(row) for row in cursor.fetchall())
 
 
-def __phenotype_se__(cursor: Cursor, xref_id, dataids_and_strainids):
+def __phenotype_se__(cursor: BaseCursor, xref_id, dataids_and_strainids):
     """Fetch standard-error values (if they exist) for a phenotype."""
     paramstr = ", ".join(["(%s, %s)"] * len(dataids_and_strainids))
     flat = tuple(item for sublist in dataids_and_strainids for item in sublist)
     cursor.execute("SELECT * FROM PublishSE WHERE (DataId, StrainId) IN "
                    f"({paramstr})",
                    flat)
-    debug_query(cursor, app.logger)
+    debug_query(cursor, logger)
     _se = {
         (row["DataId"], row["StrainId"]): {
             "DataId": row["DataId"],
@@ -107,7 +132,7 @@ def __phenotype_se__(cursor: Cursor, xref_id, dataids_and_strainids):
     cursor.execute("SELECT * FROM NStrain WHERE (DataId, StrainId) IN "
                    f"({paramstr})",
                    flat)
-    debug_query(cursor, app.logger)
+    debug_query(cursor, logger)
     _n = {
         (row["DataId"], row["StrainId"]): {
             "DataId": row["DataId"],
@@ -137,6 +162,7 @@ def __organise_by_phenotype__(pheno, row):
             "Pre_publication_abbreviation": row["Pre_publication_abbreviation"],
             "Post_publication_abbreviation": row["Post_publication_abbreviation"],
             "xref_id": row["pxr.Id"],
+            "DataId": row["DataId"],
             "data": {
                 **(_pheno["data"] if bool(_pheno) else {}),
                 (row["DataId"], row["StrainId"]): {
@@ -168,7 +194,7 @@ def __merge_pheno_data_and_se__(data, sedata) -> dict:
 
 
 def phenotype_by_id(
-        conn: mdb.Connection,
+        conn: Connection,
         species_id: int,
         population_id: int,
         dataset_id: int,
@@ -200,13 +226,13 @@ def phenotype_by_id(
             ).values())
         }
     if bool(_pheno) and len(_pheno.keys()) > 1:
-        raise Exception(
+        raise Exception(# pylint: disable=[broad-exception-raised]
             "We found more than one phenotype with the same identifier!")
 
     return None
 
 
-def phenotypes_data(conn: mdb.Connection,
+def phenotypes_data(conn: Connection,
                     population_id: int,
                     dataset_id: int,
                     offset: int = 0,
@@ -225,11 +251,64 @@ def phenotypes_data(conn: mdb.Connection,
             f" LIMIT {limit} OFFSET {offset}" if bool(limit) else "")
     with conn.cursor(cursorclass=DictCursor) as cursor:
         cursor.execute(_query, (population_id, dataset_id))
-        debug_query(cursor, app.logger)
+        debug_query(cursor, logger)
         return tuple(dict(row) for row in cursor.fetchall())
 
 
-def save_new_dataset(cursor: Cursor,
+def phenotypes_vector_data(# pylint: disable=[too-many-arguments, too-many-positional-arguments]
+        conn: Connection,
+        species_id: int,
+        population_id: int,
+        xref_ids: tuple[int, ...] = tuple(),
+        offset: int = 0,
+        limit: Optional[int] = None
+) -> dict[tuple[int, int, int], dict[str, Union[int, float]]]:
+    """Retrieve the vector data values for traits in the database."""
+    _params: tuple[int, ...] = (species_id, population_id)
+    _query = ("SELECT "
+              "Species.Id AS SpeciesId, iset.Id AS InbredSetId, "
+              "pxr.Id AS xref_id, pdata.*, Strain.Id AS StrainId, "
+              "Strain.Name AS StrainName "
+              "FROM "
+              "Species INNER JOIN InbredSet AS iset "
+              "ON Species.Id=iset.SpeciesId "
+              "INNER JOIN PublishXRef AS pxr "
+              "ON iset.Id=pxr.InbredSetId "
+              "INNER JOIN PublishData AS pdata "
+              "ON pxr.DataId=pdata.Id "
+              "INNER JOIN Strain "
+              "ON pdata.StrainId=Strain.Id "
+              "WHERE Species.Id=%s AND iset.Id=%s")
+    if len(xref_ids) > 0:
+        _paramstr = ", ".join(["%s"] * len(xref_ids))
+        _query = _query + f" AND pxr.Id IN ({_paramstr})"
+        _params = _params + xref_ids
+
+    def __organise__(acc, row):
+        _rowid = (species_id, population_id, row["xref_id"])
+        _phenodata = {
+            **acc.get(
+                _rowid, {
+                    "species_id": species_id,
+                    "population_id": population_id,
+                    "xref_id": row["xref_id"]
+                }),
+            row["StrainName"]: row["value"]
+        }
+        return {
+            **acc,
+            _rowid: _phenodata
+        }
+
+    with conn.cursor(cursorclass=DictCursor) as cursor:
+        cursor.execute(
+            _query + (f" LIMIT {limit} OFFSET {offset}" if bool(limit) else ""),
+            _params)
+        debug_query(cursor, logger)
+        return reduce(__organise__, cursor.fetchall(), {})
+
+
+def save_new_dataset(cursor: BaseCursor,
                      population_id: int,
                      dataset_name: str,
                      dataset_fullname: str,
@@ -252,34 +331,344 @@ def save_new_dataset(cursor: Cursor,
         "%(created)s, %(public)s, %(population_id)s, %(confidentiality)s, "
         "%(users)s)",
         params)
-    debug_query(cursor, app.logger)
+    debug_query(cursor, logger)
     return {**params, "Id": cursor.lastrowid}
 
 
-def phenotypes_data_by_ids(
-        conn: mdb.Connection,
-        inbred_pheno_xref: dict[str, int]
+def __pre_process_phenotype_data__(row):
+    _desc = row.get("description", "")
+    _pre_pub_desc = row.get("pre_publication_description", _desc)
+    _orig_desc = row.get("original_description", _desc)
+    _post_pub_desc = row.get("post_publication_description", _orig_desc)
+    _pre_pub_abbr = row.get("pre_publication_abbreviation", row["id"])
+    _post_pub_abbr = row.get("post_publication_abbreviation", _pre_pub_abbr)
+    return {
+        "pre_publication_description": _pre_pub_desc,
+        "post_publication_description": _post_pub_desc,
+        "original_description": _orig_desc,
+        "units": row["units"],
+        "pre_publication_abbreviation": _pre_pub_abbr,
+        "post_publication_abbreviation": _post_pub_abbr
+    }
+
+
+def create_new_phenotypes(# pylint: disable=[too-many-locals]
+        conn: Connection,
+        population_id: int,
+        publication_id: int,
+        phenotypes: Iterable[dict]
 ) -> tuple[dict, ...]:
-    """Fetch all phenotype data, filtered by the `inbred_pheno_xref` mapping."""
-    _paramstr = ",".join(["(%s, %s, %s)"] * len(inbred_pheno_xref))
-    _query = ("SELECT "
-              "pub.PubMed_ID, pheno.*, pxr.*, pd.*, str.*, iset.InbredSetCode "
-              "FROM Publication AS pub "
-              "RIGHT JOIN PublishXRef AS pxr0 ON pub.Id=pxr0.PublicationId "
-              "INNER JOIN Phenotype AS pheno ON pxr0.PhenotypeId=pheno.id "
-              "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId "
-              "INNER JOIN PublishData AS pd ON pxr.DataId=pd.Id "
-              "INNER JOIN Strain AS str ON pd.StrainId=str.Id "
-              "INNER JOIN StrainXRef AS sxr ON str.Id=sxr.StrainId "
-              "INNER JOIN PublishFreeze AS pf ON sxr.InbredSetId=pf.InbredSetId "
-              "INNER JOIN InbredSet AS iset ON pf.InbredSetId=iset.InbredSetId "
-              f"WHERE (pxr.InbredSetId, pheno.Id, pxr.Id) IN ({_paramstr}) "
-              "ORDER BY pheno.Id")
+    """Add entirely new phenotypes to the database. WARNING: Not thread-safe."""
+    _phenos: tuple[dict, ...] = tuple()
+    with conn.cursor(cursorclass=DictCursor) as cursor:
+        def make_next_id(idcol, table):
+            cursor.execute(f"SELECT MAX({idcol}) AS last_id FROM {table}")
+            _last_id = int(cursor.fetchone()["last_id"])
+            def __next_id__():
+                _next_id = _last_id + 1
+                while True:
+                    yield _next_id
+                    _next_id = _next_id + 1
+
+            return __next_id__
+
+        ### Bottleneck: Everything below makes this function not      ###
+        ### thread-safe because we have to retrieve the last IDs from ###
+        ### the database and increment those to compute the next IDs. ###
+        ### This is an unfortunate result from the current schema     ###
+        ### that has a cross-reference table that requires that a     ###
+        ### phenotype be linked to an existing publication, and have  ###
+        ### data IDs to link to that phenotype's data.                ###
+        ### The fact that the IDs are sequential also compounds the   ###
+        ### bottleneck.                                               ###
+        ###                                                           ###
+        ### For extra safety, ensure the following tables are locked  ###
+        ### for `WRITE`:                                              ###
+        ###   - PublishXRef                                           ###
+        ###   - Phenotype                                             ###
+        ###   - PublishXRef                                           ###
+        __next_xref_id = make_next_id("Id", "PublishXRef")()
+        __next_pheno_id__ = make_next_id("Id", "Phenotype")()
+        __next_data_id__ = make_next_id("DataId", "PublishXRef")()
+
+        def __build_params_and_prepubabbrevs__(acc, row):
+            processed = __pre_process_phenotype_data__(row)
+            return (
+                acc[0] + ({
+                    **processed,
+                    "population_id": population_id,
+                    "publication_id": publication_id,
+                    "phenotype_id": next(__next_pheno_id__),
+                    "xref_id": next(__next_xref_id),
+                    "data_id": next(__next_data_id__)
+                },),
+                acc[1] + (processed["pre_publication_abbreviation"],))
+
+        while True:
+            batch = take(phenotypes, 1000)
+            if len(batch) == 0:
+                break
+
+            params, abbrevs = reduce(# type: ignore[var-annotated]
+                __build_params_and_prepubabbrevs__,
+                batch,
+                (tuple(), tuple()))
+            # Check uniqueness for all "Pre_publication_abbreviation" values
+            abbrevs_paramsstr = ", ".join(["%s"] * len(abbrevs))
+            _query = ("SELECT PublishXRef.PhenotypeId, Phenotype.* "
+                      "FROM PublishXRef "
+                      "INNER JOIN Phenotype "
+                      "ON PublishXRef.PhenotypeId=Phenotype.Id "
+                      "WHERE PublishXRef.InbredSetId=%s "
+                      "AND Phenotype.Pre_publication_abbreviation IN "
+                      f"({abbrevs_paramsstr})")
+            cursor.execute(_query,
+                           ((population_id,) + abbrevs))
+            existing = tuple(row["Pre_publication_abbreviation"]
+                             for row in cursor.fetchall())
+            if len(existing) > 0:
+                # Narrow this exception, perhaps?
+                raise Exception(# pylint: disable=[broad-exception-raised]
+                    "Found already existing phenotypes with the following "
+                    "'Pre-publication abbreviations':\n\t" +
+                    "\n\t".join(f"* {item}" for item in existing))
+
+            cursor.executemany(
+                (
+                    "INSERT INTO "
+                    "Phenotype("
+                    "Id, "
+                    "Pre_publication_description, "
+                    "Post_publication_description, "
+                    "Original_description, "
+                    "Units, "
+                    "Pre_publication_abbreviation, "
+                    "Post_publication_abbreviation, "
+                    "Authorized_Users"
+                    ")"
+                    "VALUES ("
+                    "%(phenotype_id)s, "
+                    "%(pre_publication_description)s, "
+                    "%(post_publication_description)s, "
+                    "%(original_description)s, "
+                    "%(units)s, "
+                    "%(pre_publication_abbreviation)s, "
+                    "%(post_publication_abbreviation)s, "
+                    "'robwilliams'"
+                    ")"),
+                params)
+            _comments = f"Created at {datetime.now().isoformat()}"
+            cursor.executemany(
+                ("INSERT INTO PublishXRef("
+                 "Id, "
+                 "InbredSetId, "
+                 "PhenotypeId, "
+                 "PublicationId, "
+                 "DataId, "
+                 "comments"
+                 ")"
+                 "VALUES("
+                 "%(xref_id)s, "
+                 "%(population_id)s, "
+                 "%(phenotype_id)s, "
+                 "%(publication_id)s, "
+                 "%(data_id)s, "
+                 f"'{_comments}'"
+                 ")"),
+                params)
+            _phenos = _phenos + params
+
+    return _phenos
+
+
+def save_phenotypes_data(
+        conn: Connection,
+        table: str,
+        data: Iterable[dict]
+) -> int:
+    """Save new phenotypes data into the database."""
+    _table_details = __PHENO_DATA_TABLES__[table]
     with conn.cursor(cursorclass=DictCursor) as cursor:
-        cursor.execute(_query, tuple(item for row in inbred_pheno_xref
-                                     for item in (row["population_id"],
-                                                  row["phenoid"],
-                                                  row["xref_id"])))
-        debug_query(cursor, app.logger)
-        return tuple(
-            reduce(__organise_by_phenotype__, cursor.fetchall(), {}).values())
+        _count = 0
+        while True:
+            batch = take(data, 100000)
+            if len(batch) == 0:
+                logger.warning("Got an empty batch. This needs investigation.")
+                break
+
+            logger.debug("Saving batch of %s items.", len(batch))
+            cursor.executemany(
+                (f"INSERT INTO {_table_details['table']}"
+                 f"({_table_details['DataIdCol']}, StrainId, {_table_details['valueCol']}) "
+                 "VALUES "
+                 "(%(data_id)s, %(sample_id)s, %(value)s)"),
+                tuple(batch))
+            debug_query(cursor, logger)
+            _count = _count + len(batch)
+
+        logger.debug("Saved a total of %s data rows", _count)
+        return _count
+
+
+def quick_save_phenotypes_data(
+        conn: Connection,
+        table: str,
+        dataitems: Iterable[dict],
+        tmpdir: Path
+) -> int:
+    """Save data items to the database, using 'LOAD DATA LOCAL INFILE'."""
+    _table_details = __PHENO_DATA_TABLES__[table]
+    with (tempfile.NamedTemporaryFile(
+            prefix=f"{table}_data", mode="wt", dir=tmpdir) as tmpfile,
+          conn.cursor(cursorclass=DictCursor) as cursor):
+        _count = 0
+        logger.debug("Write data rows to text file.")
+        for row in dataitems:
+            tmpfile.write(
+                f'{row["data_id"]}\t{row["sample_id"]}\t{row["value"]}\n')
+            _count = _count + 1
+        tmpfile.flush()
+
+        logger.debug("Load text file into database (table: %s)",
+                     _table_details["table"])
+        cursor.execute(
+            f"LOAD DATA LOCAL INFILE '{tmpfile.name}' "
+            f"INTO TABLE {_table_details['table']} "
+            "("
+            f"{_table_details['DataIdCol']}, "
+            "StrainId, "
+            f"{_table_details['valueCol']}"
+            ")")
+        debug_query(cursor, logger)
+        return _count
+
+
+def __sleep_random__():
+    """Sleep a random amount of time, from 0.05s to 1s in increments of 0.05s."""
+    time.sleep(random.choice(tuple(i / 20.0 for i in range(1, 21))))
+
+
+def delete_phenotypes_data(
+        cursor: BaseCursor,
+        data_ids: tuple[int, ...]
+) -> tuple[int, int, int]:
+    """Delete numeric data for phenotypes with the given data IDs."""
+    if len(data_ids) == 0:
+        return (0, 0, 0)
+
+    # Loop to handle big deletes, i.e. ≥ 10000 rows
+    _dcount, _secount, _ncount = (0, 0, 0)  # Count total rows deleted
+    while True:
+        _paramstr = ", ".join(["%s"] * len(data_ids))
+        cursor.execute(
+            "DELETE FROM PublishData "
+            f"WHERE Id IN ({_paramstr}) "
+            "ORDER BY Id ASC, StrainId ASC "  # Make deletions deterministic
+            "LIMIT 1000",
+            data_ids)
+        _dcount_curr = cursor.rowcount
+        _dcount += _dcount_curr
+
+        cursor.execute(
+            "DELETE FROM PublishSE "
+            f"WHERE DataId IN ({_paramstr}) "
+            "ORDER BY DataId ASC, StrainId ASC "  # Make deletions deterministic
+            "LIMIT 1000",
+            data_ids)
+        _secount_curr = cursor.rowcount
+        _secount += _secount_curr
+
+        cursor.execute(
+            "DELETE FROM NStrain "
+            f"WHERE DataId IN ({_paramstr}) "
+            "ORDER BY DataId ASC, StrainId ASC "  # Make deletions deterministic
+            "LIMIT 1000",
+            data_ids)
+        _ncount_curr = cursor.rowcount
+        _ncount += _ncount_curr
+        __sleep_random__()
+
+        if all((_dcount_curr == 0, _secount_curr == 0, _ncount_curr == 0)):
+            # End loop if there are no more rows to delete.
+            break
+
+    return (_dcount, _secount, _ncount)
+
+
+def __linked_ids__(
+        cursor: BaseCursor,
+        population_id: int,
+        xref_ids: tuple[int, ...]
+) -> tuple[tuple[int, int, int], ...]:
+    """Retrieve `DataId` values from the `PublishXRef` table."""
+    _paramstr = ", ".join(["%s"] * len(xref_ids))
+    cursor.execute("SELECT PhenotypeId, PublicationId, DataId "
+                   "FROM PublishXRef "
+                   f"WHERE InbredSetId=%s AND Id IN ({_paramstr})",
+                   (population_id,) + xref_ids)
+    return tuple(
+        (int(row["PhenotypeId"]), int(row["PublicationId"]), int(row["DataId"]))
+        for row in cursor.fetchall())
+
+
+def delete_phenotypes(
+        conn_or_cursor: Union[Connection, Cursor],
+        population_id: int,
+        xref_ids: tuple[int, ...]
+) -> tuple[int, int, int, int]:
+    """Delete phenotypes and all their data."""
+    def __delete_phenos__(cursor: BaseCursor, pheno_ids: tuple[int, ...]) -> int:
+        """Delete data from the `Phenotype` table."""
+        _paramstr = ", ".join(["%s"] * len(pheno_ids))
+
+        _pcount = 0
+        while True:
+            cursor.execute(
+                "DELETE FROM Phenotype "
+                f"WHERE Id IN ({_paramstr}) "
+                "ORDER BY Id "
+                "LIMIT 1000",
+                pheno_ids)
+            _pcount_curr = cursor.rowcount
+            _pcount += _pcount_curr
+            __sleep_random__()
+            if _pcount_curr == 0:
+                break
+
+        return _pcount
+
+    def __delete_xrefs__(cursor: BaseCursor) -> int:
+        _paramstr = ", ".join(["%s"] * len(xref_ids))
+
+        _xcount = 0
+        while True:
+            cursor.execute(
+                "DELETE FROM PublishXRef "
+                f"WHERE InbredSetId=%s AND Id IN ({_paramstr}) "
+                "ORDER BY Id "
+                "LIMIT 10000",
+                (population_id,) + xref_ids)
+            _xcount_curr = cursor.rowcount
+            _xcount += _xcount_curr
+            __sleep_random__()
+            if _xcount_curr == 0:
+                break
+
+        return _xcount
+
+    def __with_cursor__(cursor):
+        _phenoids, _pubids, _dataids = reduce(
+            lambda acc, curr: (acc[0] + (curr[0],),
+                               acc[1] + (curr[1],),
+                               acc[2] + (curr[2],)),
+            __linked_ids__(cursor, population_id, xref_ids),
+            (tuple(), tuple(), tuple()))
+        __delete_phenos__(cursor, _phenoids)
+        return (__delete_xrefs__(cursor),) + delete_phenotypes_data(
            cursor, _dataids)
+
+    if isinstance(conn_or_cursor, BaseCursor):
+        return __with_cursor__(conn_or_cursor)
+
+    with conn_or_cursor.cursor(cursorclass=DictCursor) as cursor:
+        return __with_cursor__(cursor)
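
The comment block inside the new `create_new_phenotypes` asks callers to hold `WRITE` locks on the tables it reads `MAX(...)` IDs from before calling it. A minimal calling sketch under that assumption follows; it is illustrative rather than part of the commit, and the connection credentials and the sample phenotype row are hypothetical:

import MySQLdb

from uploader.phenotypes.models import create_new_phenotypes

# Hypothetical connection details -- substitute real credentials.
conn = MySQLdb.connect(db="db_webqtl", user="uploader", passwd="secret")
try:
    with conn.cursor() as cursor:
        # create_new_phenotypes() computes new IDs from MAX(Id)/MAX(DataId)
        # on these two tables and inserts into both, so no other writer may
        # run between the MAX() queries and the inserts.
        cursor.execute("LOCK TABLES Phenotype WRITE, PublishXRef WRITE")
    new_phenos = create_new_phenotypes(
        conn,
        population_id=1,      # hypothetical InbredSet.Id
        publication_id=22,    # hypothetical Publication.Id
        phenotypes=iter([{    # keys read by __pre_process_phenotype_data__()
            "id": "pheno_001",
            "description": "An example phenotype",
            "units": "grams",
        }]))
    conn.commit()
    print(f"Inserted {len(new_phenos)} phenotype(s)")
finally:
    with conn.cursor() as cursor:
        cursor.execute("UNLOCK TABLES")
    conn.close()

Note that `phenotypes` is passed as an iterator, since the batching loop drains it with `take(phenotypes, 1000)` until a batch comes back empty. Relatedly, `quick_save_phenotypes_data` relies on `LOAD DATA LOCAL INFILE`, which only works when the connection is opened with `local_infile=1` and the server permits `local_infile`.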
