author | Frederick Muriuki Muriithi | 2024-09-30 16:36:53 -0500
committer | Frederick Muriuki Muriithi | 2024-09-30 16:36:53 -0500
commit | 1696242aa80f489a8ed4e5a01a30a1fd813dd4f3 (patch)
tree | b4549590c2320dc70dee96a9a03c179f4683e8f9 /uploader/phenotypes/models.py
parent | 35419644dd9093630649093a4e2fba06c19ffb0c (diff)
download | gn-uploader-1696242aa80f489a8ed4e5a01a30a1fd813dd4f3.tar.gz
Initialise views for a specific phenotype
Each phenotype is independent of all others; phenotypes are put into
datasets mostly for easy corralling of those related to a specific
population. As such, the system will probably need to provide a way
to view (and possibly edit) each phenotype independently of all the
others.
This also fits in with the authorisation model.
Diffstat (limited to 'uploader/phenotypes/models.py')
-rw-r--r-- | uploader/phenotypes/models.py | 109
1 file changed, 108 insertions(+), 1 deletion(-)
diff --git a/uploader/phenotypes/models.py b/uploader/phenotypes/models.py
index 4ba0d08..eb5a189 100644
--- a/uploader/phenotypes/models.py
+++ b/uploader/phenotypes/models.py
@@ -1,8 +1,9 @@
 """Database and utility functions for phenotypes."""
 from typing import Optional
+from functools import reduce

 import MySQLdb as mdb
-from MySQLdb.cursors import DictCursor
+from MySQLdb.cursors import Cursor, DictCursor

 from uploader.db_utils import debug_query

@@ -70,6 +71,112 @@ def dataset_phenotypes(conn: mdb.Connection,
         return tuple(dict(row) for row in cursor.fetchall())


+def __phenotype_se__(cursor: Cursor,
+                     species_id: int,
+                     population_id: int,
+                     dataset_id: int,
+                     xref_id: str) -> dict:
+    """Fetch standard-error values (if they exist) for a phenotype."""
+    _sequery = (
+        "SELECT pxr.Id AS xref_id, pxr.DataId, pse.error, nst.count "
+        "FROM Phenotype AS pheno "
+        "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId "
+        "INNER JOIN PublishSE AS pse ON pxr.DataId=pse.DataId "
+        "INNER JOIN NStrain AS nst ON pse.DataId=nst.DataId "
+        "INNER JOIN Strain AS str ON nst.StrainId=str.Id "
+        "INNER JOIN StrainXRef AS sxr ON str.Id=sxr.StrainId "
+        "INNER JOIN PublishFreeze AS pf ON sxr.InbredSetId=pf.InbredSetId "
+        "INNER JOIN InbredSet AS iset ON pf.InbredSetId=iset.InbredSetId "
+        "WHERE (str.SpeciesId, pxr.InbredSetId, pf.Id, pxr.Id)=(%s, %s, %s, %s)")
+    cursor.execute(_sequery,
+                   (species_id, population_id, dataset_id, xref_id))
+    return {row["xref_id"]: dict(row) for row in cursor.fetchall()}
+
+def __organise_by_phenotype__(pheno, row):
+    """Organise disparate data rows into phenotype 'objects'."""
+    _pheno = pheno.get(row["Id"])
+    return {
+        **pheno,
+        row["Id"]: {
+            "Id": row["Id"],
+            "Pre_publication_description": row["Pre_publication_description"],
+            "Post_publication_description": row["Post_publication_description"],
+            "Original_description": row["Original_description"],
+            "Units": row["Units"],
+            "Pre_publication_abbreviation": row["Pre_publication_abbreviation"],
+            "Post_publication_abbreviation": row["Post_publication_abbreviation"],
+            "data": {
+                # TODO: organise these by DataId and StrainId
+                **(_pheno["data"] if bool(_pheno) else {}),
+                row["pxr.Id"]: {
+                    "xref_id": row["pxr.Id"],
+                    "DataId": row["DataId"],
+                    "mean": row["mean"],
+                    "Locus": row["Locus"],
+                    "LRS": row["LRS"],
+                    "additive": row["additive"],
+                    "Sequence": row["Sequence"],
+                    "comments": row["comments"],
+                    "value": row["value"],
+                    "StrainName": row["Name"],
+                    "StrainName2": row["Name2"],
+                    "StrainSymbol": row["Symbol"],
+                    "StrainAlias": row["Alias"]
+                }
+            }
+        }
+    }
+
+
+def __merge_pheno_data_and_se__(data, sedata) -> dict:
+    """Merge phenotype data with the standard errors."""
+    return {
+        key: {**value, **sedata.get(key, {})}
+        for key, value in data.items()
+    }
+
+
+def phenotype_by_id(
+        conn: mdb.Connection,
+        species_id: int,
+        population_id: int,
+        dataset_id: int,
+        xref_id
+) -> Optional[dict]:
+    """Fetch a specific phenotype."""
+    _dataquery = ("SELECT pheno.*, pxr.*, pd.*, str.*, iset.InbredSetCode "
+                  "FROM Phenotype AS pheno "
+                  "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId "
+                  "INNER JOIN PublishData AS pd ON pxr.DataId=pd.Id "
+                  "INNER JOIN Strain AS str ON pd.StrainId=str.Id "
+                  "INNER JOIN StrainXRef AS sxr ON str.Id=sxr.StrainId "
+                  "INNER JOIN PublishFreeze AS pf ON sxr.InbredSetId=pf.InbredSetId "
+                  "INNER JOIN InbredSet AS iset ON pf.InbredSetId=iset.InbredSetId "
+                  "WHERE "
+                  "(str.SpeciesId, pxr.InbredSetId, pf.Id, pxr.Id)=(%s, %s, %s, %s)")
+    with conn.cursor(cursorclass=DictCursor) as cursor:
+        cursor.execute(_dataquery,
+                       (species_id, population_id, dataset_id, xref_id))
+        _pheno: dict = reduce(__organise_by_phenotype__, cursor.fetchall(), {})
+        if bool(_pheno) and len(_pheno.keys()) == 1:
+            _pheno = tuple(_pheno.values())[0]
+            return {
+                **_pheno,
+                "data": tuple(__merge_pheno_data_and_se__(
+                    _pheno["data"],
+                    __phenotype_se__(cursor,
+                                     species_id,
+                                     population_id,
+                                     dataset_id,
+                                     xref_id)).items())
+            }
+        if bool(_pheno) and len(_pheno.keys()) > 1:
+            raise Exception(
+                "We found more than one phenotype with the same identifier!")
+
+    return None
+
+
 def phenotypes_data(conn: mdb.Connection,
                     population_id: int,
                     dataset_id: int,
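The new `phenotype_by_id` model function is what the phenotype-specific views mentioned in the commit message would sit on top of. The sketch below is not part of this commit; it only illustrates how such a view might call the function. The blueprint name, route path, template name, and the `database_connection` helper (assumed to be a context manager yielding a MySQLdb connection, as used elsewhere in gn-uploader) are assumptions, not code from the repository.

```python
from flask import Blueprint, render_template, abort

from uploader.phenotypes.models import phenotype_by_id
# Assumed helper: a context manager yielding a MySQLdb connection. If the
# actual helper lives elsewhere or takes a DB URI, adjust accordingly.
from uploader.db_utils import database_connection

# Hypothetical blueprint and URL structure, for illustration only.
phenotypesbp = Blueprint("phenotypes", __name__)


@phenotypesbp.route(
    "/species/<int:species_id>/populations/<int:population_id>"
    "/datasets/<int:dataset_id>/phenotypes/<int:xref_id>",
    methods=["GET"])
def view_phenotype(species_id: int,
                   population_id: int,
                   dataset_id: int,
                   xref_id: int):
    """Display a single phenotype, independent of the rest of its dataset."""
    with database_connection() as conn:
        phenotype = phenotype_by_id(
            conn, species_id, population_id, dataset_id, xref_id)
    if phenotype is None:
        # `phenotype_by_id` returns None when no row matches the identifiers.
        abort(404, description="No phenotype with the given identifiers.")
    # On success, `phenotype` carries the Phenotype columns plus a "data"
    # tuple of (xref_id, values) pairs, with standard errors merged in where
    # the PublishSE table has them.
    return render_template("phenotypes/view-phenotype.html",
                           phenotype=phenotype)
```

Keeping the SQL and row-wrangling inside `phenotype_by_id` lets a view like this stay a thin wrapper: it resolves identifiers from the URL, renders the result, and relies on the model's own single-match check to guard against duplicate identifiers.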