"""Database and utility functions for phenotypes.""" from typing import Optional from functools import reduce from datetime import datetime import MySQLdb as mdb from MySQLdb.cursors import Cursor, DictCursor from uploader.db_utils import debug_query def datasets_by_population( conn: mdb.Connection, species_id: int, population_id: int ) -> tuple[dict, ...]: """Retrieve all of a population's phenotype studies.""" with conn.cursor(cursorclass=DictCursor) as cursor: cursor.execute( "SELECT s.SpeciesId, pf.* FROM Species AS s " "INNER JOIN InbredSet AS iset ON s.Id=iset.SpeciesId " "INNER JOIN PublishFreeze AS pf ON iset.Id=pf.InbredSetId " "WHERE s.Id=%s AND iset.Id=%s;", (species_id, population_id)) return tuple(dict(row) for row in cursor.fetchall()) def dataset_by_id(conn: mdb.Connection, species_id: int, population_id: int, dataset_id: int) -> dict: """Fetch dataset details by identifier""" with conn.cursor(cursorclass=DictCursor) as cursor: cursor.execute( "SELECT s.SpeciesId, pf.* FROM Species AS s " "INNER JOIN InbredSet AS iset ON s.Id=iset.SpeciesId " "INNER JOIN PublishFreeze AS pf ON iset.Id=pf.InbredSetId " "WHERE s.Id=%s AND iset.Id=%s AND pf.Id=%s", (species_id, population_id, dataset_id)) return dict(cursor.fetchone()) def phenotypes_count(conn: mdb.Connection, population_id: int, dataset_id: int) -> int: """Count the number of phenotypes in the dataset.""" with conn.cursor(cursorclass=DictCursor) as cursor: cursor.execute( "SELECT COUNT(*) AS total_phenos FROM Phenotype AS pheno " "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId " "INNER JOIN PublishFreeze AS pf ON pxr.InbredSetId=pf.InbredSetId " "WHERE pxr.InbredSetId=%s AND pf.Id=%s", (population_id, dataset_id)) return int(cursor.fetchone()["total_phenos"]) def dataset_phenotypes(conn: mdb.Connection, population_id: int, dataset_id: int, offset: int = 0, limit: Optional[int] = None) -> tuple[dict, ...]: """Fetch the actual phenotypes.""" _query = ( "SELECT pheno.*, pxr.Id, ist.InbredSetCode FROM Phenotype AS pheno " "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId " "INNER JOIN PublishFreeze AS pf ON pxr.InbredSetId=pf.InbredSetId " "INNER JOIN InbredSet AS ist ON pf.InbredSetId=ist.Id " "WHERE pxr.InbredSetId=%s AND pf.Id=%s") + ( f" LIMIT {limit} OFFSET {offset}" if bool(limit) else "") with conn.cursor(cursorclass=DictCursor) as cursor: cursor.execute(_query, (population_id, dataset_id)) debug_query(cursor) return tuple(dict(row) for row in cursor.fetchall()) def __phenotype_se__(cursor: Cursor, species_id: int, population_id: int, dataset_id: int, xref_id: str) -> dict: """Fetch standard-error values (if they exist) for a phenotype.""" _sequery = ( "SELECT pxr.Id AS xref_id, pxr.DataId, str.Id AS StrainId, pse.error, nst.count " "FROM Phenotype AS pheno " "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId " "INNER JOIN PublishSE AS pse ON pxr.DataId=pse.DataId " "INNER JOIN NStrain AS nst ON pse.DataId=nst.DataId " "INNER JOIN Strain AS str ON nst.StrainId=str.Id " "INNER JOIN StrainXRef AS sxr ON str.Id=sxr.StrainId " "INNER JOIN PublishFreeze AS pf ON sxr.InbredSetId=pf.InbredSetId " "INNER JOIN InbredSet AS iset ON pf.InbredSetId=iset.InbredSetId " "WHERE (str.SpeciesId, pxr.InbredSetId, pf.Id, pxr.Id)=(%s, %s, %s, %s)") cursor.execute(_sequery, (species_id, population_id, dataset_id, xref_id)) return {(row["DataId"], row["StrainId"]): { "xref_id": row["xref_id"], "DataId": row["DataId"], "error": row["error"], "count": row["count"] } for row in cursor.fetchall()} def __organise_by_phenotype__(pheno, row): """Organise disparate data rows into phenotype 'objects'.""" _pheno = pheno.get(row["Id"]) return { **pheno, row["Id"]: { "Id": row["Id"], "Pre_publication_description": row["Pre_publication_description"], "Post_publication_description": row["Post_publication_description"], "Original_description": row["Original_description"], "Units": row["Units"], "Pre_publication_abbreviation": row["Pre_publication_abbreviation"], "Post_publication_abbreviation": row["Post_publication_abbreviation"], "xref_id": row["pxr.Id"], "data": { **(_pheno["data"] if bool(_pheno) else {}), (row["DataId"], row["StrainId"]): { "DataId": row["DataId"], "mean": row["mean"], "Locus": row["Locus"], "LRS": row["LRS"], "additive": row["additive"], "Sequence": row["Sequence"], "comments": row["comments"], "value": row["value"], "StrainName": row["Name"], "StrainName2": row["Name2"], "StrainSymbol": row["Symbol"], "StrainAlias": row["Alias"] } } } } def __merge_pheno_data_and_se__(data, sedata) -> dict: """Merge phenotype data with the standard errors.""" return { key: {**value, **sedata.get(key, {})} for key, value in data.items() } def phenotype_by_id( conn: mdb.Connection, species_id: int, population_id: int, dataset_id: int, xref_id ) -> Optional[dict]: """Fetch a specific phenotype.""" _dataquery = ("SELECT pheno.*, pxr.*, pd.*, str.*, iset.InbredSetCode " "FROM Phenotype AS pheno " "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId " "INNER JOIN PublishData AS pd ON pxr.DataId=pd.Id " "INNER JOIN Strain AS str ON pd.StrainId=str.Id " "INNER JOIN StrainXRef AS sxr ON str.Id=sxr.StrainId " "INNER JOIN PublishFreeze AS pf ON sxr.InbredSetId=pf.InbredSetId " "INNER JOIN InbredSet AS iset ON pf.InbredSetId=iset.InbredSetId " "WHERE " "(str.SpeciesId, pxr.InbredSetId, pf.Id, pxr.Id)=(%s, %s, %s, %s)") with conn.cursor(cursorclass=DictCursor) as cursor: cursor.execute(_dataquery, (species_id, population_id, dataset_id, xref_id)) _pheno: dict = reduce(__organise_by_phenotype__, cursor.fetchall(), {}) if bool(_pheno) and len(_pheno.keys()) == 1: _pheno = tuple(_pheno.values())[0] return { **_pheno, "data": tuple(__merge_pheno_data_and_se__( _pheno["data"], __phenotype_se__(cursor, species_id, population_id, dataset_id, xref_id)).values()) } if bool(_pheno) and len(_pheno.keys()) > 1: raise Exception( "We found more than one phenotype with the same identifier!") return None def phenotypes_data(conn: mdb.Connection, population_id: int, dataset_id: int, offset: int = 0, limit: Optional[int] = None) -> tuple[dict, ...]: """Fetch the data for the phenotypes.""" # — Phenotype -> PublishXRef -> PublishData -> Strain -> StrainXRef -> PublishFreeze _query = ("SELECT pheno.*, pxr.*, pd.*, str.*, iset.InbredSetCode " "FROM Phenotype AS pheno " "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId " "INNER JOIN PublishData AS pd ON pxr.DataId=pd.Id " "INNER JOIN Strain AS str ON pd.StrainId=str.Id " "INNER JOIN StrainXRef AS sxr ON str.Id=sxr.StrainId " "INNER JOIN PublishFreeze AS pf ON sxr.InbredSetId=pf.InbredSetId " "INNER JOIN InbredSet AS iset ON pf.InbredSetId=iset.InbredSetId " "WHERE pxr.InbredSetId=%s AND pf.Id=%s") + ( f" LIMIT {limit} OFFSET {offset}" if bool(limit) else "") with conn.cursor(cursorclass=DictCursor) as cursor: cursor.execute(_query, (population_id, dataset_id)) debug_query(cursor) return tuple(dict(row) for row in cursor.fetchall()) def save_new_dataset(cursor: Cursor, population_id: int, dataset_name: str, dataset_fullname: str, dataset_shortname: str) -> dict: """Create a new phenotype dataset.""" params = { "population_id": population_id, "dataset_name": dataset_name, "dataset_fullname": dataset_fullname, "dataset_shortname": dataset_shortname, "created": datetime.now().date().isoformat(), "public": 2, "confidentiality": 0, "users": None } cursor.execute( "INSERT INTO PublishFreeze(Name, FullName, ShortName, CreateTime, " "public, InbredSetId, confidentiality, AuthorisedUsers) " "VALUES(%(dataset_name)s, %(dataset_fullname)s, %(dataset_shortname)s, " "%(created)s, %(public)s, %(population_id)s, %(confidentiality)s, " "%(users)s)", params) debug_query(cursor) return {**params, "Id": cursor.lastrowid}