Diffstat (limited to 'uploader/phenotypes/models.py')
-rw-r--r--  uploader/phenotypes/models.py  396
1 file changed, 396 insertions, 0 deletions
diff --git a/uploader/phenotypes/models.py b/uploader/phenotypes/models.py
new file mode 100644
index 0000000..c2aeebf
--- /dev/null
+++ b/uploader/phenotypes/models.py
@@ -0,0 +1,396 @@
+"""Database and utility functions for phenotypes."""
+import logging
+import tempfile
+from pathlib import Path
+from functools import reduce
+from datetime import datetime
+from typing import Optional, Iterable
+
+import MySQLdb as mdb
+from MySQLdb.cursors import Cursor, DictCursor
+
+from functional_tools import take
+from gn_libs.mysqldb import debug_query
+
+logger = logging.getLogger(__name__)
+
+
+__PHENO_DATA_TABLES__ = {
+ "PublishData": {
+ "table": "PublishData", "valueCol": "value", "DataIdCol": "Id"},
+ "PublishSE": {
+ "table": "PublishSE", "valueCol": "error", "DataIdCol": "DataId"},
+ "NStrain": {
+ "table": "NStrain", "valueCol": "count", "DataIdCol": "DataId"}
+}
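+
+# A usage sketch of the mapping above (values per the table definitions):
+#     __PHENO_DATA_TABLES__["PublishSE"]["valueCol"]  # => "error"
+# i.e. PublishSE stores its values in the `error` column, keyed by the
+# (`DataId`, `StrainId`) pair.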
+
+
+def datasets_by_population(
+ conn: mdb.Connection,
+ species_id: int,
+ population_id: int
+) -> tuple[dict, ...]:
+ """Retrieve all of a population's phenotype studies."""
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ cursor.execute(
+ "SELECT s.SpeciesId, pf.* FROM Species AS s "
+ "INNER JOIN InbredSet AS iset ON s.Id=iset.SpeciesId "
+ "INNER JOIN PublishFreeze AS pf ON iset.Id=pf.InbredSetId "
+ "WHERE s.Id=%s AND iset.Id=%s;",
+ (species_id, population_id))
+ return tuple(dict(row) for row in cursor.fetchall())
+
+
+def dataset_by_id(conn: mdb.Connection,
+ species_id: int,
+ population_id: int,
+                  dataset_id: int) -> Optional[dict]:
+    """Fetch dataset details by identifier."""
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ cursor.execute(
+ "SELECT Species.SpeciesId, PublishFreeze.* FROM Species "
+ "INNER JOIN InbredSet ON Species.Id=InbredSet.SpeciesId "
+ "INNER JOIN PublishFreeze ON InbredSet.Id=PublishFreeze.InbredSetId "
+ "WHERE Species.Id=%s AND InbredSet.Id=%s AND PublishFreeze.Id=%s",
+ (species_id, population_id, dataset_id))
+        _res = cursor.fetchone()
+        return dict(_res) if _res else None
+
+
+def phenotypes_count(conn: mdb.Connection,
+ population_id: int,
+ dataset_id: int) -> int:
+ """Count the number of phenotypes in the dataset."""
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ cursor.execute(
+ "SELECT COUNT(*) AS total_phenos FROM Phenotype AS pheno "
+ "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId "
+ "INNER JOIN PublishFreeze AS pf ON pxr.InbredSetId=pf.InbredSetId "
+ "WHERE pxr.InbredSetId=%s AND pf.Id=%s",
+ (population_id, dataset_id))
+ return int(cursor.fetchone()["total_phenos"])
+
+
+def phenotype_publication_data(conn: mdb.Connection,
+                               phenotype_id: int) -> Optional[dict]:
+ """Retrieve the publication data for a phenotype if it exists."""
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ cursor.execute(
+ "SELECT DISTINCT pxr.PhenotypeId, pub.* FROM PublishXRef AS pxr "
+ "INNER JOIN Publication as pub ON pxr.PublicationId=pub.Id "
+ "WHERE pxr.PhenotypeId=%s",
+ (phenotype_id,))
+ res = cursor.fetchone()
+ if res is None:
+ return res
+ return dict(res)
+
+
+def dataset_phenotypes(conn: mdb.Connection,
+ population_id: int,
+ dataset_id: int,
+ offset: int = 0,
+ limit: Optional[int] = None) -> tuple[dict, ...]:
+ """Fetch the actual phenotypes."""
+ _query = (
+ "SELECT pheno.*, pxr.Id AS xref_id, pxr.InbredSetId, ist.InbredSetCode FROM Phenotype AS pheno "
+ "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId "
+ "INNER JOIN PublishFreeze AS pf ON pxr.InbredSetId=pf.InbredSetId "
+ "INNER JOIN InbredSet AS ist ON pf.InbredSetId=ist.Id "
+ "WHERE pxr.InbredSetId=%s AND pf.Id=%s") + (
+ f" LIMIT {limit} OFFSET {offset}" if bool(limit) else "")
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ cursor.execute(_query, (population_id, dataset_id))
+ debug_query(cursor, logger)
+ return tuple(dict(row) for row in cursor.fetchall())
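+
+# A minimal pagination sketch for the function above (hypothetical IDs;
+# `conn` is an open MySQLdb connection):
+#     page = dataset_phenotypes(conn, population_id=1, dataset_id=1,
+#                               offset=0, limit=50)
+# With `limit=None` (the default) the whole dataset is fetched and
+# `offset` is ignored.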
+
+
+def __phenotype_se__(cursor: Cursor, xref_id, dataids_and_strainids):
+ """Fetch standard-error values (if they exist) for a phenotype."""
+ paramstr = ", ".join(["(%s, %s)"] * len(dataids_and_strainids))
+ flat = tuple(item for sublist in dataids_and_strainids for item in sublist)
+ cursor.execute("SELECT * FROM PublishSE WHERE (DataId, StrainId) IN "
+ f"({paramstr})",
+ flat)
+ debug_query(cursor, logger)
+ _se = {
+ (row["DataId"], row["StrainId"]): {
+ "DataId": row["DataId"],
+ "StrainId": row["StrainId"],
+ "error": row["error"]
+ }
+ for row in cursor.fetchall()
+ }
+
+ cursor.execute("SELECT * FROM NStrain WHERE (DataId, StrainId) IN "
+ f"({paramstr})",
+ flat)
+ debug_query(cursor, logger)
+ _n = {
+ (row["DataId"], row["StrainId"]): {
+ "DataId": row["DataId"],
+ "StrainId": row["StrainId"],
+ "count": row["count"]
+ }
+ for row in cursor.fetchall()
+ }
+
+    keys = set(_se.keys()) | set(_n.keys())
+    return {
+        key: {"xref_id": xref_id, **_se.get(key, {}), **_n.get(key, {})}
+        for key in keys
+    }
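+
+# __phenotype_se__ returns a dict keyed by (DataId, StrainId) pairs, e.g.
+# (a sketch with made-up values):
+#     {(17, 4): {"xref_id": 10001, "DataId": 17, "StrainId": 4,
+#                "error": 0.31, "count": 6}}
+# A key present only in PublishSE has no "count"; one present only in
+# NStrain has no "error".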
+
+
+def __organise_by_phenotype__(pheno, row):
+ """Organise disparate data rows into phenotype 'objects'."""
+ _pheno = pheno.get(row["Id"])
+ return {
+ **pheno,
+ row["Id"]: {
+ "Id": row["Id"],
+ "Pre_publication_description": row["Pre_publication_description"],
+ "Post_publication_description": row["Post_publication_description"],
+ "Original_description": row["Original_description"],
+ "Units": row["Units"],
+ "Pre_publication_abbreviation": row["Pre_publication_abbreviation"],
+ "Post_publication_abbreviation": row["Post_publication_abbreviation"],
+ "xref_id": row["pxr.Id"],
+ "DataId": row["DataId"],
+ "data": {
+ **(_pheno["data"] if bool(_pheno) else {}),
+ (row["DataId"], row["StrainId"]): {
+ "DataId": row["DataId"],
+ "StrainId": row["StrainId"],
+ "mean": row["mean"],
+ "Locus": row["Locus"],
+ "LRS": row["LRS"],
+ "additive": row["additive"],
+ "Sequence": row["Sequence"],
+ "comments": row["comments"],
+ "value": row["value"],
+ "StrainName": row["Name"],
+ "StrainName2": row["Name2"],
+ "StrainSymbol": row["Symbol"],
+ "StrainAlias": row["Alias"]
+ }
+ }
+ }
+ }
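+
+# __organise_by_phenotype__ is meant to be folded over DictCursor rows:
+#     phenos = reduce(__organise_by_phenotype__, cursor.fetchall(), {})
+# giving {phenotype_id: {..., "data": {(DataId, StrainId): {...}, ...}}}.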
+
+
+def __merge_pheno_data_and_se__(data, sedata) -> dict:
+ """Merge phenotype data with the standard errors."""
+ return {
+ key: {**value, **sedata.get(key, {})}
+ for key, value in data.items()
+ }
+
+
+def phenotype_by_id(
+ conn: mdb.Connection,
+ species_id: int,
+ population_id: int,
+ dataset_id: int,
+ xref_id
+) -> Optional[dict]:
+ """Fetch a specific phenotype."""
+ _dataquery = ("SELECT pheno.*, pxr.*, pd.*, str.*, iset.InbredSetCode "
+ "FROM Phenotype AS pheno "
+ "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId "
+ "INNER JOIN PublishData AS pd ON pxr.DataId=pd.Id "
+ "INNER JOIN Strain AS str ON pd.StrainId=str.Id "
+ "INNER JOIN StrainXRef AS sxr ON str.Id=sxr.StrainId "
+ "INNER JOIN PublishFreeze AS pf ON sxr.InbredSetId=pf.InbredSetId "
+ "INNER JOIN InbredSet AS iset ON pf.InbredSetId=iset.InbredSetId "
+ "WHERE "
+ "(str.SpeciesId, pxr.InbredSetId, pf.Id, pxr.Id)=(%s, %s, %s, %s)")
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ cursor.execute(_dataquery,
+ (species_id, population_id, dataset_id, xref_id))
+ _pheno: dict = reduce(__organise_by_phenotype__, cursor.fetchall(), {})
+ if bool(_pheno) and len(_pheno.keys()) == 1:
+ _pheno = tuple(_pheno.values())[0]
+ return {
+ **_pheno,
+ "data": tuple(__merge_pheno_data_and_se__(
+ _pheno["data"],
+ __phenotype_se__(
+ cursor, xref_id, tuple(_pheno["data"].keys()))
+ ).values())
+ }
+ if bool(_pheno) and len(_pheno.keys()) > 1:
+ raise Exception(
+ "We found more than one phenotype with the same identifier!")
+
+ return None
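+
+# On success, phenotype_by_id returns the phenotype dict with "data"
+# collapsed to a tuple of per-strain measurements (value, error, count,
+# strain names, ...); a sketch with hypothetical IDs:
+#     pheno = phenotype_by_id(conn, species_id=1, population_id=1,
+#                             dataset_id=1, xref_id=10001)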
+
+
+def phenotypes_data(conn: mdb.Connection,
+ population_id: int,
+ dataset_id: int,
+ offset: int = 0,
+ limit: Optional[int] = None) -> tuple[dict, ...]:
+ """Fetch the data for the phenotypes."""
+    # Join path: Phenotype -> PublishXRef -> PublishData -> Strain
+    #            -> StrainXRef -> PublishFreeze
+ _query = ("SELECT pheno.*, pxr.*, pd.*, str.*, iset.InbredSetCode "
+ "FROM Phenotype AS pheno "
+ "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId "
+ "INNER JOIN PublishData AS pd ON pxr.DataId=pd.Id "
+ "INNER JOIN Strain AS str ON pd.StrainId=str.Id "
+ "INNER JOIN StrainXRef AS sxr ON str.Id=sxr.StrainId "
+ "INNER JOIN PublishFreeze AS pf ON sxr.InbredSetId=pf.InbredSetId "
+ "INNER JOIN InbredSet AS iset ON pf.InbredSetId=iset.InbredSetId "
+ "WHERE pxr.InbredSetId=%s AND pf.Id=%s") + (
+ f" LIMIT {limit} OFFSET {offset}" if bool(limit) else "")
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ cursor.execute(_query, (population_id, dataset_id))
+ debug_query(cursor, logger)
+ return tuple(dict(row) for row in cursor.fetchall())
+
+
+def save_new_dataset(cursor: Cursor,
+ population_id: int,
+ dataset_name: str,
+ dataset_fullname: str,
+ dataset_shortname: str) -> dict:
+ """Create a new phenotype dataset."""
+ params = {
+ "population_id": population_id,
+ "dataset_name": dataset_name,
+ "dataset_fullname": dataset_fullname,
+ "dataset_shortname": dataset_shortname,
+ "created": datetime.now().date().isoformat(),
+ "public": 2,
+ "confidentiality": 0,
+ "users": None
+ }
+ cursor.execute(
+ "INSERT INTO PublishFreeze(Name, FullName, ShortName, CreateTime, "
+ "public, InbredSetId, confidentiality, AuthorisedUsers) "
+ "VALUES(%(dataset_name)s, %(dataset_fullname)s, %(dataset_shortname)s, "
+ "%(created)s, %(public)s, %(population_id)s, %(confidentiality)s, "
+ "%(users)s)",
+ params)
+ debug_query(cursor, logger)
+ return {**params, "Id": cursor.lastrowid}
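+
+# save_new_dataset takes a live cursor rather than a connection, leaving
+# transaction control to the caller; a sketch (hypothetical names):
+#     with conn.cursor(cursorclass=DictCursor) as cursor:
+#         dset = save_new_dataset(
+#             cursor, 1, "BXDPublish", "BXD Published Phenotypes",
+#             "BXDPublish")
+#     conn.commit()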
+
+
+def phenotypes_data_by_ids(
+ conn: mdb.Connection,
+        inbred_pheno_xref: tuple[dict[str, int], ...]
+) -> tuple[dict, ...]:
+    """Fetch all phenotype data, filtered by the `inbred_pheno_xref` rows."""
+ _paramstr = ",".join(["(%s, %s, %s)"] * len(inbred_pheno_xref))
+ _query = ("SELECT "
+ "pub.PubMed_ID, pheno.*, pxr.*, pd.*, str.*, iset.InbredSetCode "
+ "FROM Publication AS pub "
+ "RIGHT JOIN PublishXRef AS pxr0 ON pub.Id=pxr0.PublicationId "
+ "INNER JOIN Phenotype AS pheno ON pxr0.PhenotypeId=pheno.id "
+ "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId "
+ "INNER JOIN PublishData AS pd ON pxr.DataId=pd.Id "
+ "INNER JOIN Strain AS str ON pd.StrainId=str.Id "
+ "INNER JOIN StrainXRef AS sxr ON str.Id=sxr.StrainId "
+ "INNER JOIN PublishFreeze AS pf ON sxr.InbredSetId=pf.InbredSetId "
+ "INNER JOIN InbredSet AS iset ON pf.InbredSetId=iset.InbredSetId "
+ f"WHERE (pxr.InbredSetId, pheno.Id, pxr.Id) IN ({_paramstr}) "
+ "ORDER BY pheno.Id")
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ cursor.execute(_query, tuple(item for row in inbred_pheno_xref
+ for item in (row["population_id"],
+ row["phenoid"],
+ row["xref_id"])))
+ debug_query(cursor, logger)
+ return tuple(
+ reduce(__organise_by_phenotype__, cursor.fetchall(), {}).values())
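+
+# `inbred_pheno_xref` is expected to be a sequence of mappings like (a
+# sketch with made-up IDs):
+#     ({"population_id": 1, "phenoid": 35, "xref_id": 10001},)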
+
+
+def create_new_phenotypes(conn: mdb.Connection,
+ phenotypes: Iterable[dict]) -> tuple[dict, ...]:
+ """Add entirely new phenotypes to the database."""
+ _phenos = tuple()
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ while True:
+ batch = take(phenotypes, 1000)
+ if len(batch) == 0:
+ break
+
+ cursor.executemany(
+ ("INSERT INTO "
+ "Phenotype(Pre_publication_description, Original_description, Units, Authorized_Users) "
+ "VALUES (%s, %s, %s, 'robwilliams')"),
+ tuple((row["id"], row["description"], row["units"])
+ for row in batch))
+ paramstr = ", ".join(["%s"] * len(batch))
+ cursor.execute(
+ "SELECT * FROM Phenotype WHERE Pre_publication_description IN "
+ f"({paramstr})",
+ tuple(item["id"] for item in batch))
+ _phenos = _phenos + tuple({
+ "phenotype_id": row["Id"],
+ "id": row["Pre_publication_description"],
+ "description": row["Original_description"],
+ "units": row["Units"]
+ } for row in cursor.fetchall())
+
+ return _phenos
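+
+# Each item in `phenotypes` needs "id", "description" and "units" keys;
+# a sketch with made-up values (note the iterator: `take` consumes it in
+# batches):
+#     create_new_phenotypes(conn, iter([
+#         {"id": "BXD_10001", "description": "Body weight", "units": "g"}]))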
+
+
+def save_phenotypes_data(
+ conn: mdb.Connection,
+ table: str,
+ data: Iterable[dict]
+) -> int:
+ """Save new phenotypes data into the database."""
+ _table_details = __PHENO_DATA_TABLES__[table]
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ _count = 0
+ while True:
+ batch = take(data, 100000)
+            if len(batch) == 0:
+                logger.debug("Data exhausted; no more batches to save.")
+                break
+
+ logger.debug("Saving batch of %s items.", len(batch))
+ cursor.executemany(
+ (f"INSERT INTO {_table_details['table']}"
+ f"({_table_details['DataIdCol']}, StrainId, {_table_details['valueCol']}) "
+ "VALUES "
+ f"(%(data_id)s, %(sample_id)s, %(value)s) "),
+ tuple(batch))
+ debug_query(cursor, logger)
+ _count = _count + len(batch)
+
+ logger.debug("Saved a total of %s data rows", _count)
+ return _count
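+
+# Data items must carry "data_id", "sample_id" and "value" keys to match
+# the %(...)s placeholders above; a sketch with made-up values:
+#     save_phenotypes_data(conn, "PublishData", iter([
+#         {"data_id": 17, "sample_id": 4, "value": 14.2}]))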
+
+
+def quick_save_phenotypes_data(
+ conn: mdb.Connection,
+ table: str,
+ dataitems: Iterable[dict],
+ tmpdir: Path
+) -> int:
+ """Save data items to the database, but using """
+ _table_details = __PHENO_DATA_TABLES__[table]
+ with (tempfile.NamedTemporaryFile(
+ prefix=f"{table}_data", mode="wt", dir=tmpdir) as tmpfile,
+ conn.cursor(cursorclass=DictCursor) as cursor):
+ _count = 0
+ logger.debug("Write data rows to text file.")
+ for row in dataitems:
+ tmpfile.write(
+ f'{row["data_id"]}\t{row["sample_id"]}\t{row["value"]}\n')
+ _count = _count + 1
+ tmpfile.flush()
+
+ logger.debug("Load text file into database (table: %s)",
+ _table_details["table"])
+ cursor.execute(
+ f"LOAD DATA LOCAL INFILE '{tmpfile.name}' "
+ f"INTO TABLE {_table_details['table']} "
+ "("
+ f"{_table_details['DataIdCol']}, "
+ "StrainId, "
+ f"{_table_details['valueCol']}"
+ ")")
+ debug_query(cursor, logger)
+ return _count
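+
+# Note: LOAD DATA LOCAL INFILE requires `local_infile` to be enabled on
+# both the server and the client connection (with mysqlclient, e.g.
+# mdb.connect(..., local_infile=1)), and the temporary file under
+# `tmpdir` must be readable by the MySQL client process.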