Diffstat (limited to 'uploader/phenotypes/models.py')
-rw-r--r--  uploader/phenotypes/models.py  217
1 file changed, 193 insertions(+), 24 deletions(-)
diff --git a/uploader/phenotypes/models.py b/uploader/phenotypes/models.py
index 20b8e77..af06376 100644
--- a/uploader/phenotypes/models.py
+++ b/uploader/phenotypes/models.py
@@ -4,14 +4,15 @@ import tempfile
 from pathlib import Path
 from functools import reduce
 from datetime import datetime
-from typing import Optional, Iterable
+from typing import Union, Optional, Iterable
 
 import MySQLdb as mdb
 from MySQLdb.cursors import Cursor, DictCursor
 
-from functional_tools import take
 from gn_libs.mysqldb import debug_query
 
+from functional_tools import take
+
 logger = logging.getLogger(__name__)
 
 
@@ -91,7 +92,8 @@ def dataset_phenotypes(conn: mdb.Connection,
                        limit: Optional[int] = None) -> tuple[dict, ...]:
     """Fetch the actual phenotypes."""
     _query = (
-        "SELECT pheno.*, pxr.Id AS xref_id, pxr.InbredSetId, ist.InbredSetCode FROM Phenotype AS pheno "
+        "SELECT pheno.*, pxr.Id AS xref_id, pxr.InbredSetId, ist.InbredSetCode "
+        "FROM Phenotype AS pheno "
         "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId "
         "INNER JOIN PublishFreeze AS pf ON pxr.InbredSetId=pf.InbredSetId "
         "INNER JOIN InbredSet AS ist ON pf.InbredSetId=ist.Id "
@@ -217,7 +219,7 @@ def phenotype_by_id(
                 ).values())
             }
         if bool(_pheno) and len(_pheno.keys()) > 1:
-            raise Exception(
+            raise Exception(# pylint: disable=[broad-exception-raised]
                 "We found more than one phenotype with the same identifier!")
 
     return None
@@ -246,6 +248,59 @@ def phenotypes_data(conn: mdb.Connection,
         return tuple(dict(row) for row in cursor.fetchall())
 
 
+def phenotypes_vector_data(
+        conn: mdb.Connection,
+        species_id: int,
+        population_id: int,
+        xref_ids: tuple[int, ...] = tuple(),
+        offset: int = 0,
+        limit: Optional[int] = None
+) -> dict[tuple[int, int, int], dict[str, Union[int, float]]]:
+    """Retrieve the vector data values for traits in the database."""
+    _params = (species_id, population_id)
+    _query = ("SELECT "
+              "Species.Id AS SpeciesId, iset.Id AS InbredSetId, "
+              "pxr.Id AS xref_id, pdata.*, Strain.Id AS StrainId, "
+              "Strain.Name AS StrainName "
+              "FROM "
+              "Species INNER JOIN InbredSet AS iset "
+              "ON Species.Id=iset.SpeciesId "
+              "INNER JOIN PublishXRef AS pxr "
+              "ON iset.Id=pxr.InbredSetId "
+              "INNER JOIN PublishData AS pdata "
+              "ON pxr.DataId=pdata.Id "
+              "INNER JOIN Strain "
+              "ON pdata.StrainId=Strain.Id "
+              "WHERE Species.Id=%s AND iset.Id=%s")
+    if len(xref_ids) > 0:
+        _paramstr = ", ".join(["%s"] * len(xref_ids))
+        _query = _query + f" AND pxr.Id IN ({_paramstr})"
+        _params = _params + xref_ids
+
+    def __organise__(acc, row):
+        _rowid = (species_id, population_id, row["xref_id"])
+        _phenodata = {
+            **acc.get(
+                _rowid, {
+                    "species_id": species_id,
+                    "population_id": population_id,
+                    "xref_id": row["xref_id"]
+                }),
+            row["StrainName"]: row["value"]
+        }
+        return {
+            **acc,
+            _rowid: _phenodata
+        }
+
+    with conn.cursor(cursorclass=DictCursor) as cursor:
+        cursor.execute(
+            _query + (f" LIMIT {limit} OFFSET {offset}" if bool(limit) else ""),
+            _params)
+        debug_query(cursor, logger)
+        return reduce(__organise__, cursor.fetchall(), {})
+
+
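As the __organise__ helper above shows, phenotypes_vector_data keys its result by (species_id, population_id, xref_id) and adds one strain-name-to-value entry per data row. A minimal usage sketch, assuming an open MySQLdb connection; the credentials, database name and identifiers below are placeholders rather than values taken from this change:

    import MySQLdb

    # Placeholder connection details; substitute real credentials.
    conn = MySQLdb.connect(host="localhost", user="user",
                           passwd="password", db="dbname")
    data = phenotypes_vector_data(conn, species_id=1, population_id=1,
                                  xref_ids=(10001, 10002))
    for (species, population, xref), values in data.items():
        # Each value dict carries the three identifying keys plus one
        # "<StrainName>": <value> pair per strain returned by the query.
        strains = {key: val for key, val in values.items()
                   if key not in ("species_id", "population_id", "xref_id")}
        print(xref, strains)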
 def save_new_dataset(cursor: Cursor,
                      population_id: int,
                      dataset_name: str,
@@ -302,32 +357,146 @@ def phenotypes_data_by_ids(
             reduce(__organise_by_phenotype__, cursor.fetchall(), {}).values())
 
 
-def create_new_phenotypes(conn: mdb.Connection,
-                          phenotypes: Iterable[dict]) -> tuple[dict, ...]:
-    """Add entirely new phenotypes to the database."""
+def __pre_process_phenotype_data__(row):
+    _desc = row.get("description", "")
+    _pre_pub_desc = row.get("pre_publication_description", _desc)
+    _orig_desc = row.get("original_description", _desc)
+    _post_pub_desc = row.get("post_publication_description", _orig_desc)
+    _pre_pub_abbr = row.get("pre_publication_abbreviation", row["id"])
+    _post_pub_abbr = row.get("post_publication_abbreviation", _pre_pub_abbr)
+    return {
+        "pre_publication_description": _pre_pub_desc,
+        "post_publication_description": _post_pub_desc,
+        "original_description": _orig_desc,
+        "units": row["units"],
+        "pre_publication_abbreviation": _pre_pub_abbr,
+        "post_publication_abbreviation": _post_pub_abbr
+    }
+
+
+def create_new_phenotypes(# pylint: disable=[too-many-locals]
+        conn: mdb.Connection,
+        population_id: int,
+        publication_id: int,
+        phenotypes: Iterable[dict]
+) -> tuple[dict, ...]:
+    """Add entirely new phenotypes to the database. WARNING: Not thread-safe."""
     _phenos = tuple()
     with conn.cursor(cursorclass=DictCursor) as cursor:
+        def make_next_id(idcol, table):
+            cursor.execute(f"SELECT MAX({idcol}) AS last_id FROM {table}")
+            _last_id = int(cursor.fetchone()["last_id"])
+            def __next_id__():
+                _next_id = _last_id + 1
+                while True:
+                    yield _next_id
+                    _next_id = _next_id + 1
+
+            return __next_id__
+
+        ### Bottleneck: Everything below makes this function not         ###
+        ###   thread-safe because we have to retrieve the last IDs from  ###
+        ###   the database and increment those to compute the next IDs.  ###
+        ###   This is an unfortunate result from the current schema that ###
+        ###   has a cross-reference table that requires that a phenotype ###
+        ###   be linked to an existing publication, and have data IDs to ###
+        ###   link to that phenotype's data.                             ###
+        ###   The fact that the IDs are sequential also compounds the    ###
+        ###   bottleneck.                                                ###
+        ###
+        ###   For extra safety, ensure the following tables are locked   ###
+        ###   for `WRITE`:                                               ###
+        ###   - PublishXRef                                              ###
+        ###   - Phenotype                                                ###
+        __next_xref_id = make_next_id("Id", "PublishXRef")()
+        __next_pheno_id__ = make_next_id("Id", "Phenotype")()
+        __next_data_id__ = make_next_id("DataId", "PublishXRef")()
+
+        def __build_params_and_prepubabbrevs__(acc, row):
+            processed = __pre_process_phenotype_data__(row)
+            return (
+                acc[0] + ({
+                    **processed,
+                    "population_id": population_id,
+                    "publication_id": publication_id,
+                    "phenotype_id": next(__next_pheno_id__),
+                    "xref_id": next(__next_xref_id),
+                    "data_id": next(__next_data_id__)
+                },),
+                acc[1] + (processed["pre_publication_abbreviation"],))
         while True:
             batch = take(phenotypes, 1000)
             if len(batch) == 0:
                 break
 
+            params, abbrevs = reduce(__build_params_and_prepubabbrevs__,
+                                     batch,
+                                     (tuple(), tuple()))
+            # Check for uniqueness of all "Pre_publication_abbreviation" values
+            abbrevs_paramsstr = ", ".join(["%s"] * len(abbrevs))
+            _query = ("SELECT PublishXRef.PhenotypeId, Phenotype.* "
+                      "FROM PublishXRef "
+                      "INNER JOIN Phenotype "
+                      "ON PublishXRef.PhenotypeId=Phenotype.Id "
+                      "WHERE PublishXRef.InbredSetId=%s "
+                      "AND Phenotype.Pre_publication_abbreviation IN "
+                      f"({abbrevs_paramsstr})")
+            cursor.execute(_query,
+                           ((population_id,) + abbrevs))
+            existing = tuple(row["Pre_publication_abbreviation"]
+                             for row in cursor.fetchall())
+            if len(existing) > 0:
+                # Narrow this exception, perhaps?
+                raise Exception(# pylint: disable=[broad-exception-raised]
+                    "Found already existing phenotypes with the following "
+                    "'Pre-publication abbreviations':\n\t"
+                    + "\n\t".join(f"* {item}" for item in existing))
+
             cursor.executemany(
-                ("INSERT INTO "
-                 "Phenotype(Pre_publication_description, Original_description, Units, Authorized_Users) "
-                 "VALUES (%(id)s, %(description)s, %(units)s, 'robwilliams')"),
-                tuple(batch))
-            paramstr = ", ".join(["%s"] * len(batch))
-            cursor.execute(
-                "SELECT * FROM Phenotype WHERE Pre_publication_description IN "
-                f"({paramstr})",
-                tuple(item["id"] for item in batch))
-            _phenos = _phenos + tuple({
-                "phenotype_id": row["Id"],
-                "id": row["Pre_publication_description"],
-                "description": row["Original_description"],
-                "units": row["Units"]
-            } for row in cursor.fetchall())
+                (
+                    "INSERT INTO "
+                    "Phenotype("
+                    "Id, "
+                    "Pre_publication_description, "
+                    "Post_publication_description, "
+                    "Original_description, "
+                    "Units, "
+                    "Pre_publication_abbreviation, "
+                    "Post_publication_abbreviation, "
+                    "Authorized_Users"
+                    ")"
+                    "VALUES ("
+                    "%(phenotype_id)s, "
+                    "%(pre_publication_description)s, "
+                    "%(post_publication_description)s, "
+                    "%(original_description)s, "
+                    "%(units)s, "
+                    "%(pre_publication_abbreviation)s, "
+                    "%(post_publication_abbreviation)s, "
+                    "'robwilliams'"
+                    ")"),
+                params)
+            _comments = f"Created at {datetime.now().isoformat()}"
+            cursor.executemany(
+                ("INSERT INTO PublishXRef("
+                 "Id, "
+                 "InbredSetId, "
+                 "PhenotypeId, "
+                 "PublicationId, "
+                 "DataId, "
+                 "comments"
+                 ")"
+                 "VALUES("
+                 "%(xref_id)s, "
+                 "%(population_id)s, "
+                 "%(phenotype_id)s, "
+                 "%(publication_id)s, "
+                 "%(data_id)s, "
+                 f"'{_comments}'"
+                 ")"),
+                params)
+            _phenos = _phenos + params
 
     return _phenos
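The bottleneck comment in create_new_phenotypes above recommends holding WRITE locks on the two tables it reads MAX() IDs from and inserts into. A sketch of how a caller might apply those locks, assuming it owns the connection; the explicit LOCK TABLES / UNLOCK TABLES statements are an assumption and not part of this change:

    # Sketch only: serialise concurrent callers around the MAX()-based ID
    # generation by locking the tables the function touches.
    with conn.cursor() as cursor:
        cursor.execute("LOCK TABLES Phenotype WRITE, PublishXRef WRITE")
    try:
        new_phenos = create_new_phenotypes(
            conn, population_id, publication_id, phenotypes)
        conn.commit()  # commit the inserts before releasing the locks
    finally:
        with conn.cursor() as cursor:
            cursor.execute("UNLOCK TABLES")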
 
@@ -374,14 +543,14 @@ def quick_save_phenotypes_data(
             prefix=f"{table}_data", mode="wt", dir=tmpdir) as tmpfile,
           conn.cursor(cursorclass=DictCursor) as cursor):
         _count = 0
-        console.debug("Write data rows to text file.")
+        logger.debug("Write data rows to text file.")
         for row in dataitems:
             tmpfile.write(
                 f'{row["data_id"]}\t{row["sample_id"]}\t{row["value"]}\n')
             _count = _count + 1
         tmpfile.flush()
 
-        console.debug("Load text file into database (table: %s)",
+        logger.debug("Load text file into database (table: %s)",
                       _table_details["table"])
         cursor.execute(
             f"LOAD DATA LOCAL INFILE '{tmpfile.name}' "