aboutsummaryrefslogtreecommitdiff
path: root/uploader/phenotypes/models.py
diff options
context:
space:
mode:
Diffstat (limited to 'uploader/phenotypes/models.py')
-rw-r--r--uploader/phenotypes/models.py109
1 files changed, 108 insertions, 1 deletions
diff --git a/uploader/phenotypes/models.py b/uploader/phenotypes/models.py
index 4ba0d08..eb5a189 100644
--- a/uploader/phenotypes/models.py
+++ b/uploader/phenotypes/models.py
@@ -1,8 +1,9 @@
"""Database and utility functions for phenotypes."""
from typing import Optional
+from functools import reduce
import MySQLdb as mdb
-from MySQLdb.cursors import DictCursor
+from MySQLdb.cursors import Cursor, DictCursor
from uploader.db_utils import debug_query
@@ -70,6 +71,112 @@ def dataset_phenotypes(conn: mdb.Connection,
return tuple(dict(row) for row in cursor.fetchall())
+def __phenotype_se__(cursor: Cursor,
+ species_id: int,
+ population_id: int,
+ dataset_id: int,
+ xref_id: str) -> dict:
+ """Fetch standard-error values (if they exist) for a phenotype."""
+ _sequery = (
+ "SELECT pxr.Id AS xref_id, pxr.DataId, pse.error, nst.count "
+ "FROM Phenotype AS pheno "
+ "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId "
+ "INNER JOIN PublishSE AS pse ON pxr.DataId=pse.DataId "
+ "INNER JOIN NStrain AS nst ON pse.DataId=nst.DataId "
+ "INNER JOIN Strain AS str ON nst.StrainId=str.Id "
+ "INNER JOIN StrainXRef AS sxr ON str.Id=sxr.StrainId "
+ "INNER JOIN PublishFreeze AS pf ON sxr.InbredSetId=pf.InbredSetId "
+ "INNER JOIN InbredSet AS iset ON pf.InbredSetId=iset.InbredSetId "
+ "WHERE (str.SpeciesId, pxr.InbredSetId, pf.Id, pxr.Id)=(%s, %s, %s, %s)")
+ cursor.execute(_sequery,
+ (species_id, population_id, dataset_id, xref_id))
+ return {row["xref_id"]: dict(row) for row in cursor.fetchall()}
+
+def __organise_by_phenotype__(pheno, row):
+ """Organise disparate data rows into phenotype 'objects'."""
+ _pheno = pheno.get(row["Id"])
+ return {
+ **pheno,
+ row["Id"]: {
+ "Id": row["Id"],
+ "Pre_publication_description": row["Pre_publication_description"],
+ "Post_publication_description": row["Post_publication_description"],
+ "Original_description": row["Original_description"],
+ "Units": row["Units"],
+ "Pre_publication_abbreviation": row["Pre_publication_abbreviation"],
+ "Post_publication_abbreviation": row["Post_publication_abbreviation"],
+ "data": {
+ #TOD0: organise these by DataId and StrainId
+ **(_pheno["data"] if bool(_pheno) else {}),
+ row["pxr.Id"]: {
+ "xref_id": row["pxr.Id"],
+ "DataId": row["DataId"],
+ "mean": row["mean"],
+ "Locus": row["Locus"],
+ "LRS": row["LRS"],
+ "additive": row["additive"],
+ "Sequence": row["Sequence"],
+ "comments": row["comments"],
+ "value": row["value"],
+ "StrainName": row["Name"],
+ "StrainName2": row["Name2"],
+ "StrainSymbol": row["Symbol"],
+ "StrainAlias": row["Alias"]
+ }
+ }
+ }
+ }
+
+
+def __merge_pheno_data_and_se__(data, sedata) -> dict:
+ """Merge phenotype data with the standard errors."""
+ return {
+ key: {**value, **sedata.get(key, {})}
+ for key, value in data.items()
+ }
+
+
+def phenotype_by_id(
+ conn: mdb.Connection,
+ species_id: int,
+ population_id: int,
+ dataset_id: int,
+ xref_id
+) -> Optional[dict]:
+ """Fetch a specific phenotype."""
+ _dataquery = ("SELECT pheno.*, pxr.*, pd.*, str.*, iset.InbredSetCode "
+ "FROM Phenotype AS pheno "
+ "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId "
+ "INNER JOIN PublishData AS pd ON pxr.DataId=pd.Id "
+ "INNER JOIN Strain AS str ON pd.StrainId=str.Id "
+ "INNER JOIN StrainXRef AS sxr ON str.Id=sxr.StrainId "
+ "INNER JOIN PublishFreeze AS pf ON sxr.InbredSetId=pf.InbredSetId "
+ "INNER JOIN InbredSet AS iset ON pf.InbredSetId=iset.InbredSetId "
+ "WHERE "
+ "(str.SpeciesId, pxr.InbredSetId, pf.Id, pxr.Id)=(%s, %s, %s, %s)")
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ cursor.execute(_dataquery,
+ (species_id, population_id, dataset_id, xref_id))
+ _pheno: dict = reduce(__organise_by_phenotype__, cursor.fetchall(), {})
+ if bool(_pheno) and len(_pheno.keys()) == 1:
+ _pheno = tuple(_pheno.values())[0]
+ return {
+ **_pheno,
+ "data": tuple(__merge_pheno_data_and_se__(
+ _pheno["data"],
+ __phenotype_se__(cursor,
+ species_id,
+ population_id,
+ dataset_id,
+ xref_id)).items())
+ }
+ if bool(_pheno) and len(_pheno.keys()) > 1:
+ raise Exception(
+ "We found more than one phenotype with the same identifier!")
+
+ return None
+
+
def phenotypes_data(conn: mdb.Connection,
population_id: int,
dataset_id: int,