Diffstat (limited to 'uploader/phenotypes/models.py')
-rw-r--r--  uploader/phenotypes/models.py  217
1 file changed, 193 insertions(+), 24 deletions(-)
diff --git a/uploader/phenotypes/models.py b/uploader/phenotypes/models.py
index 20b8e77..af06376 100644
--- a/uploader/phenotypes/models.py
+++ b/uploader/phenotypes/models.py
@@ -4,14 +4,15 @@ import tempfile
 from pathlib import Path
 from functools import reduce
 from datetime import datetime
-from typing import Optional, Iterable
+from typing import Union, Optional, Iterable
 
 import MySQLdb as mdb
 from MySQLdb.cursors import Cursor, DictCursor
 
-from functional_tools import take
 from gn_libs.mysqldb import debug_query
 
+from functional_tools import take
+
 logger = logging.getLogger(__name__)
 
 
@@ -91,7 +92,8 @@ def dataset_phenotypes(conn: mdb.Connection,
                        limit: Optional[int] = None) -> tuple[dict, ...]:
     """Fetch the actual phenotypes."""
     _query = (
-        "SELECT pheno.*, pxr.Id AS xref_id, pxr.InbredSetId, ist.InbredSetCode FROM Phenotype AS pheno "
+        "SELECT pheno.*, pxr.Id AS xref_id, pxr.InbredSetId, ist.InbredSetCode "
+        "FROM Phenotype AS pheno "
         "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId "
         "INNER JOIN PublishFreeze AS pf ON pxr.InbredSetId=pf.InbredSetId "
         "INNER JOIN InbredSet AS ist ON pf.InbredSetId=ist.Id "
@@ -217,7 +219,7 @@ def phenotype_by_id(
             ).values())
         }
     if bool(_pheno) and len(_pheno.keys()) > 1:
-        raise Exception(
+        raise Exception(# pylint: disable=[broad-exception-raised]
            "We found more than one phenotype with the same identifier!")
 
     return None
@@ -246,6 +248,59 @@ def phenotypes_data(conn: mdb.Connection,
         return tuple(dict(row) for row in cursor.fetchall())
 
 
+def phenotypes_vector_data(
+        conn: mdb.Connection,
+        species_id: int,
+        population_id: int,
+        xref_ids: tuple[int, ...] = tuple(),
+        offset: int = 0,
+        limit: Optional[int] = None
+) -> dict[tuple[int, int, int], dict[str, Union[int, float]]]:
+    """Retrieve the vector data values for traits in the database."""
+    _params = (species_id, population_id)
+    _query = ("SELECT "
+              "Species.Id AS SpeciesId, iset.Id AS InbredSetId, "
+              "pxr.Id AS xref_id, pdata.*, Strain.Id AS StrainId, "
+              "Strain.Name AS StrainName "
+              "FROM "
+              "Species INNER JOIN InbredSet AS iset "
+              "ON Species.Id=iset.SpeciesId "
+              "INNER JOIN PublishXRef AS pxr "
+              "ON iset.Id=pxr.InbredSetId "
+              "INNER JOIN PublishData AS pdata "
+              "ON pxr.DataId=pdata.Id "
+              "INNER JOIN Strain "
+              "ON pdata.StrainId=Strain.Id "
+              "WHERE Species.Id=%s AND iset.Id=%s")
+    if len(xref_ids) > 0:
+        _paramstr = ", ".join(["%s"] * len(xref_ids))
+        _query = _query + f" AND pxr.Id IN ({_paramstr})"
+        _params = _params + xref_ids
+
+    def __organise__(acc, row):
+        _rowid = (species_id, population_id, row["xref_id"])
+        _phenodata = {
+            **acc.get(
+                _rowid, {
+                    "species_id": species_id,
+                    "population_id": population_id,
+                    "xref_id": row["xref_id"]
+                }),
+            row["StrainName"]: row["value"]
+        }
+        return {
+            **acc,
+            _rowid: _phenodata
+        }
+
+    with conn.cursor(cursorclass=DictCursor) as cursor:
+        cursor.execute(
+            _query + (f" LIMIT {limit} OFFSET {offset}" if bool(limit) else ""),
+            _params)
+        debug_query(cursor, logger)
+        return reduce(__organise__, cursor.fetchall(), {})
+
+
 def save_new_dataset(cursor: Cursor,
                      population_id: int,
                      dataset_name: str,
@@ -302,32 +357,146 @@ def phenotypes_data_by_ids(
         reduce(__organise_by_phenotype__, cursor.fetchall(), {}).values())
 
 
-def create_new_phenotypes(conn: mdb.Connection,
-                          phenotypes: Iterable[dict]) -> tuple[dict, ...]:
-    """Add entirely new phenotypes to the database."""
+def __pre_process_phenotype_data__(row):
+    _desc = row.get("description", "")
+    _pre_pub_desc = row.get("pre_publication_description", _desc)
+    _orig_desc = row.get("original_description", _desc)
+    _post_pub_desc = row.get("post_publication_description", _orig_desc)
+    _pre_pub_abbr = row.get("pre_publication_abbreviation", row["id"])
+    _post_pub_abbr = row.get("post_publication_abbreviation", _pre_pub_abbr)
+    return {
+        "pre_publication_description": _pre_pub_desc,
+        "post_publication_description": _post_pub_desc,
+        "original_description": _orig_desc,
+        "units": row["units"],
+        "pre_publication_abbreviation": _pre_pub_abbr,
+        "post_publication_abbreviation": _post_pub_abbr
+    }
+
+
+def create_new_phenotypes(# pylint: disable=[too-many-locals]
+        conn: mdb.Connection,
+        population_id: int,
+        publication_id: int,
+        phenotypes: Iterable[dict]
+) -> tuple[dict, ...]:
+    """Add entirely new phenotypes to the database. WARNING: Not thread-safe."""
     _phenos = tuple()
     with conn.cursor(cursorclass=DictCursor) as cursor:
+        def make_next_id(idcol, table):
+            cursor.execute(f"SELECT MAX({idcol}) AS last_id FROM {table}")
+            _last_id = int(cursor.fetchone()["last_id"])
+            def __next_id__():
+                _next_id = _last_id + 1
+                while True:
+                    yield _next_id
+                    _next_id = _next_id + 1
+
+            return __next_id__
+
+        ### Bottleneck: Everything below makes this function not       ###
+        ### thread-safe because we have to retrieve the last IDs from  ###
+        ### the database and increment those to compute the next IDs.  ###
+        ### This is an unfortunate result of the current schema, which ###
+        ### has a cross-reference table that requires that a phenotype ###
+        ### be linked to an existing publication, and have data IDs to ###
+        ### link to that phenotype's data.                             ###
+        ### The fact that the IDs are sequential also compounds the    ###
+        ### bottleneck.                                                ###
+        ###
+        ### For extra safety, ensure the following tables are locked   ###
+        ### for `WRITE`:                                               ###
+        ### - PublishXRef                                              ###
+        ### - Phenotype                                                ###
+        ### - PublishData                                              ###
+        __next_xref_id__ = make_next_id("Id", "PublishXRef")()
+        __next_pheno_id__ = make_next_id("Id", "Phenotype")()
+        __next_data_id__ = make_next_id("DataId", "PublishXRef")()
+
+        def __build_params_and_prepubabbrevs__(acc, row):
+            processed = __pre_process_phenotype_data__(row)
+            return (
+                acc[0] + ({
+                    **processed,
+                    "population_id": population_id,
+                    "publication_id": publication_id,
+                    "phenotype_id": next(__next_pheno_id__),
+                    "xref_id": next(__next_xref_id__),
+                    "data_id": next(__next_data_id__)
+                },),
+                acc[1] + (processed["pre_publication_abbreviation"],))
+
         while True:
             batch = take(phenotypes, 1000)
             if len(batch) == 0:
                 break
 
+            params, abbrevs = reduce(__build_params_and_prepubabbrevs__,
+                                     batch,
+                                     (tuple(), tuple()))
+            # Check for uniqueness of all "Pre_publication_abbreviation" values
+            abbrevs_paramsstr = ", ".join(["%s"] * len(abbrevs))
+            _query = ("SELECT PublishXRef.PhenotypeId, Phenotype.* "
+                      "FROM PublishXRef "
+                      "INNER JOIN Phenotype "
+                      "ON PublishXRef.PhenotypeId=Phenotype.Id "
+                      "WHERE PublishXRef.InbredSetId=%s "
+                      "AND Phenotype.Pre_publication_abbreviation IN "
+                      f"({abbrevs_paramsstr})")
+            cursor.execute(_query,
+                           ((population_id,) + abbrevs))
+            existing = tuple(row["Pre_publication_abbreviation"]
+                             for row in cursor.fetchall())
+            if len(existing) > 0:
+                # Narrow this exception, perhaps?
+                raise Exception(# pylint: disable=[broad-exception-raised]
+                    "Found already existing phenotypes with the following "
+                    "'Pre-publication abbreviations':\n\t" +
+                    "\n\t".join(f"* {item}" for item in existing))
+
             cursor.executemany(
-                ("INSERT INTO "
-                 "Phenotype(Pre_publication_description, Original_description, Units, Authorized_Users) "
-                 "VALUES (%(id)s, %(description)s, %(units)s, 'robwilliams')"),
-                tuple(batch))
-            paramstr = ", ".join(["%s"] * len(batch))
-            cursor.execute(
-                "SELECT * FROM Phenotype WHERE Pre_publication_description IN "
-                f"({paramstr})",
-                tuple(item["id"] for item in batch))
-            _phenos = _phenos + tuple({
-                "phenotype_id": row["Id"],
-                "id": row["Pre_publication_description"],
-                "description": row["Original_description"],
-                "units": row["Units"]
-            } for row in cursor.fetchall())
+                (
+                    "INSERT INTO "
+                    "Phenotype("
+                    "Id, "
+                    "Pre_publication_description, "
+                    "Post_publication_description, "
+                    "Original_description, "
+                    "Units, "
+                    "Pre_publication_abbreviation, "
+                    "Post_publication_abbreviation, "
+                    "Authorized_Users"
+                    ")"
+                    "VALUES ("
+                    "%(phenotype_id)s, "
+                    "%(pre_publication_description)s, "
+                    "%(post_publication_description)s, "
+                    "%(original_description)s, "
+                    "%(units)s, "
+                    "%(pre_publication_abbreviation)s, "
+                    "%(post_publication_abbreviation)s, "
+                    "'robwilliams'"
+                    ")"),
+                params)
+            _comments = f"Created at {datetime.now().isoformat()}"
+            cursor.executemany(
+                ("INSERT INTO PublishXRef("
+                 "Id, "
+                 "InbredSetId, "
+                 "PhenotypeId, "
+                 "PublicationId, "
+                 "DataId, "
+                 "comments"
+                 ")"
+                 "VALUES("
+                 "%(xref_id)s, "
+                 "%(population_id)s, "
+                 "%(phenotype_id)s, "
+                 "%(publication_id)s, "
+                 "%(data_id)s, "
+                 f"'{_comments}'"
+                 ")"),
+                params)
+            _phenos = _phenos + params
 
     return _phenos
 
@@ -374,14 +543,14 @@ def quick_save_phenotypes_data(
             prefix=f"{table}_data", mode="wt", dir=tmpdir) as tmpfile,
           conn.cursor(cursorclass=DictCursor) as cursor):
         _count = 0
-        console.debug("Write data rows to text file.")
+        logger.debug("Write data rows to text file.")
        for row in dataitems:
             tmpfile.write(
                 f'{row["data_id"]}\t{row["sample_id"]}\t{row["value"]}\n')
             _count = _count + 1
         tmpfile.flush()
 
-        console.debug("Load text file into database (table: %s)",
+        logger.debug("Load text file into database (table: %s)",
                       _table_details["table"])
         cursor.execute(
             f"LOAD DATA LOCAL INFILE '{tmpfile.name}' "
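
Note: the new phenotypes_vector_data returns a dict keyed by (species_id, population_id, xref_id) tuples, each value mapping strain names to their values. A minimal usage sketch follows; the connection settings and ID values are hypothetical, not taken from this change:

    import MySQLdb
    from uploader.phenotypes.models import phenotypes_vector_data

    # Hypothetical connection details, for illustration only.
    conn = MySQLdb.connect(host="localhost", user="gn_user",
                           passwd="gn_pass", db="gn_db")
    data = phenotypes_vector_data(conn, species_id=1, population_id=1,
                                  xref_ids=(10001, 10002))
    # Result shape, e.g.:
    # {(1, 1, 10001): {"species_id": 1, "population_id": 1,
    #                  "xref_id": 10001, "BXD1": 54.1, "BXD2": 49.0},
    #  ...}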
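The reworked create_new_phenotypes derives all Phenotype columns through __pre_process_phenotype_data__, so only "id" and "units" are strictly required in each input row; the description and abbreviation fields fall back as coded in that helper. An illustrative input (the field values are made up):

    from uploader.phenotypes.models import create_new_phenotypes

    rows = ({
        "id": "BW_10WK",    # fallback for pre_publication_abbreviation
        "units": "grams",
        "description": "Body weight at 10 weeks",
        # Optional: pre/post_publication_description,
        # original_description, pre/post_publication_abbreviation.
    },)
    # Pass an iterator, since take() consumes the input in batches.
    new_phenos = create_new_phenotypes(conn, population_id=1,
                                       publication_id=22,
                                       phenotypes=iter(rows))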
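The bottleneck comment asks callers to hold WRITE locks while the MAX()-based ID generation runs. A sketch of how a caller might honour that, reusing `rows` from the previous sketch (this wrapper is not part of the diff; the lock list mirrors the comment):

    # Illustrative only: WRITE locks prevent the MAX(...)-based next-ID
    # computation from racing with concurrent inserts.
    with conn.cursor() as cursor:
        cursor.execute("LOCK TABLES Phenotype WRITE, PublishXRef WRITE, "
                       "PublishData WRITE")
    try:
        new_phenos = create_new_phenotypes(conn, population_id=1,
                                           publication_id=22,
                                           phenotypes=iter(rows))
        conn.commit()
    finally:
        with conn.cursor() as cursor:
            cursor.execute("UNLOCK TABLES")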
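Finally, quick_save_phenotypes_data (touched in the last hunk) relies on LOAD DATA LOCAL INFILE, which must be permitted on both the server (local_infile=1) and the client. With the mysqlclient driver used here, the client side is a connect flag; credentials below are placeholders:

    # Enable LOCAL INFILE on the client connection.
    conn = MySQLdb.connect(host="localhost", user="gn_user",
                           passwd="gn_pass", db="gn_db",
                           local_infile=1)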
