about summary refs log tree commit diff
path: root/uploader/phenotypes/models.py
diff options
context:
space:
mode:
Diffstat (limited to 'uploader/phenotypes/models.py')
-rw-r--r--  uploader/phenotypes/models.py  176
1 files changed, 157 insertions, 19 deletions
diff --git a/uploader/phenotypes/models.py b/uploader/phenotypes/models.py
index a22497c..3946a0f 100644
--- a/uploader/phenotypes/models.py
+++ b/uploader/phenotypes/models.py
@@ -1,4 +1,6 @@
 """Database and utility functions for phenotypes."""
+import time
+import random
 import logging
 import tempfile
 from pathlib import Path
@@ -6,8 +8,8 @@ from functools import reduce
 from datetime import datetime
 from typing import Union, Optional, Iterable
 
-import MySQLdb as mdb
-from MySQLdb.cursors import Cursor, DictCursor
+from MySQLdb.connections import Connection
+from MySQLdb.cursors import Cursor, DictCursor, BaseCursor
 
 from gn_libs.mysqldb import debug_query
 
@@ -27,7 +29,7 @@ __PHENO_DATA_TABLES__ = {
 
 
 def datasets_by_population(
-        conn: mdb.Connection,
+        conn: Connection,
         species_id: int,
         population_id: int
 ) -> tuple[dict, ...]:
@@ -42,7 +44,7 @@ def datasets_by_population(
         return tuple(dict(row) for row in cursor.fetchall())
 
 
-def dataset_by_id(conn: mdb.Connection,
+def dataset_by_id(conn: Connection,
                   species_id: int,
                   population_id: int,
                   dataset_id: int) -> dict:
@@ -57,7 +59,7 @@ def dataset_by_id(conn: mdb.Connection,
         return dict(cursor.fetchone())
 
 
-def phenotypes_count(conn: mdb.Connection,
+def phenotypes_count(conn: Connection,
                      population_id: int,
                      dataset_id: int) -> int:
     """Count the number of phenotypes in the dataset."""
@@ -85,11 +87,14 @@ def phenotype_publication_data(conn, phenotype_id) -> Optional[dict]:
         return dict(res)
 
 
-def dataset_phenotypes(conn: mdb.Connection,
-                       population_id: int,
-                       dataset_id: int,
-                       offset: int = 0,
-                       limit: Optional[int] = None) -> tuple[dict, ...]:
+def dataset_phenotypes(# pylint: disable=[too-many-arguments, too-many-positional-arguments]
+        conn: Connection,
+        population_id: int,
+        dataset_id: int,
+        offset: int = 0,
+        limit: Optional[int] = None,
+        xref_ids: tuple[int, ...] = tuple()
+) -> tuple[dict, ...]:
     """Fetch the actual phenotypes."""
     _query = (
         "SELECT pheno.*, pxr.Id AS xref_id, pxr.InbredSetId, ist.InbredSetCode "
@@ -98,14 +103,16 @@ def dataset_phenotypes(conn: mdb.Connection,
         "INNER JOIN PublishFreeze AS pf ON pxr.InbredSetId=pf.InbredSetId "
         "INNER JOIN InbredSet AS ist ON pf.InbredSetId=ist.Id "
         "WHERE pxr.InbredSetId=%s AND pf.Id=%s") + (
+            f" AND pxr.Id IN ({', '.join(['%s'] * len(xref_ids))})"
+            if len(xref_ids) > 0 else "") + (
             f" LIMIT {limit} OFFSET {offset}" if bool(limit) else "")
     with conn.cursor(cursorclass=DictCursor) as cursor:
-        cursor.execute(_query, (population_id, dataset_id))
+        cursor.execute(_query, (population_id, dataset_id) + xref_ids)
         debug_query(cursor, logger)
         return tuple(dict(row) for row in cursor.fetchall())
 
 
-def __phenotype_se__(cursor: Cursor, xref_id, dataids_and_strainids):
+def __phenotype_se__(cursor: BaseCursor, xref_id, dataids_and_strainids):
     """Fetch standard-error values (if they exist) for a phenotype."""
     paramstr = ", ".join(["(%s, %s)"] * len(dataids_and_strainids))
     flat = tuple(item for sublist in dataids_and_strainids for item in sublist)
@@ -187,7 +194,7 @@ def __merge_pheno_data_and_se__(data, sedata) -> dict:
 
 
 def phenotype_by_id(
-        conn: mdb.Connection,
+        conn: Connection,
         species_id: int,
         population_id: int,
         dataset_id: int,
@@ -225,7 +232,7 @@ def phenotype_by_id(
     return None
 
 
-def phenotypes_data(conn: mdb.Connection,
+def phenotypes_data(conn: Connection,
                     population_id: int,
                     dataset_id: int,
                     offset: int = 0,
@@ -249,7 +256,7 @@ def phenotypes_data(conn: mdb.Connection,
 
 
 def phenotypes_vector_data(# pylint: disable=[too-many-arguments, too-many-positional-arguments]
-        conn: mdb.Connection,
+        conn: Connection,
         species_id: int,
         population_id: int,
         xref_ids: tuple[int, ...] = tuple(),
@@ -301,7 +308,7 @@ def phenotypes_vector_data(# pylint: disable=[too-many-arguments, too-many-posit
         return reduce(__organise__, cursor.fetchall(), {})
 
 
-def save_new_dataset(cursor: Cursor,
+def save_new_dataset(cursor: BaseCursor,
                      population_id: int,
                      dataset_name: str,
                      dataset_fullname: str,
@@ -346,7 +353,7 @@ def __pre_process_phenotype_data__(row):
 
 
 def create_new_phenotypes(# pylint: disable=[too-many-locals]
-        conn: mdb.Connection,
+        conn: Connection,
         population_id: int,
         publication_id: int,
         phenotypes: Iterable[dict]
@@ -474,7 +481,7 @@ def create_new_phenotypes(# pylint: disable=[too-many-locals]
 
 
 def save_phenotypes_data(
-        conn: mdb.Connection,
+        conn: Connection,
         table: str,
         data: Iterable[dict]
 ) -> int:
@@ -504,7 +511,7 @@ def save_phenotypes_data(
 
 
 def quick_save_phenotypes_data(
-        conn: mdb.Connection,
+        conn: Connection,
         table: str,
         dataitems: Iterable[dict],
         tmpdir: Path
@@ -534,3 +541,134 @@ def quick_save_phenotypes_data(
             ")")
         debug_query(cursor, logger)
         return _count
+
+
def __sleep_random__():
    """Pause for a random duration from 0.05s up to 1.0s, in 0.05s steps."""
    _choices = tuple(step / 20.0 for step in range(1, 21))
    time.sleep(random.choice(_choices))
+
+
def delete_phenotypes_data(
        cursor: BaseCursor,
        data_ids: tuple[int, ...]
) -> tuple[int, int, int]:
    """Delete the numeric measurements linked to the given data IDs.

    Rows are removed from `PublishData`, `PublishSE` and `NStrain` in
    batches of 1000 per table per pass, sleeping briefly between passes to
    ease pressure on the database during very large deletes.

    :param cursor: an open database cursor
    :param data_ids: `DataId` values whose measurements should be removed
    :return: 3-tuple of total rows deleted from `PublishData`, `PublishSE`
        and `NStrain`, respectively.
    """
    if not data_ids:
        return (0, 0, 0)

    _placeholders = ", ".join(["%s"] * len(data_ids))
    # (table, id-column) pairs holding the numeric phenotype data.
    _targets = (("PublishData", "Id"),
                ("PublishSE", "DataId"),
                ("NStrain", "DataId"))
    _totals = [0, 0, 0]
    # Keep looping so deletes of ≥ 1000 rows per table still complete.
    while True:
        _batch = [0, 0, 0]
        for _idx, (_table, _column) in enumerate(_targets):
            cursor.execute(
                f"DELETE FROM {_table} "
                f"WHERE {_column} IN ({_placeholders}) "
                # ORDER BY makes the batched deletions deterministic.
                f"ORDER BY {_column} ASC, StrainId ASC "
                "LIMIT 1000",
                data_ids)
            _batch[_idx] = cursor.rowcount
            _totals[_idx] += _batch[_idx]
        __sleep_random__()
        if not any(_batch):
            # No table had rows left to delete — we are done.
            break

    return (_totals[0], _totals[1], _totals[2])
+
+
def __linked_ids__(
        cursor: BaseCursor,
        population_id: int,
        xref_ids: tuple[int, ...]
) -> tuple[tuple[int, int, int], ...]:
    """Retrieve the IDs linked to cross-references in the `PublishXRef` table.

    :param cursor: an open database cursor yielding rows as dicts
        (e.g. `DictCursor`, as used by the caller)
    :param population_id: `InbredSetId` the cross-references belong to
    :param xref_ids: `PublishXRef.Id` values to look up
    :return: tuple of `(PhenotypeId, PublicationId, DataId)` integer triples
    """
    if not xref_ids:
        # Guard: an empty sequence would produce "IN ()", which is a SQL
        # syntax error — there is nothing to fetch anyway.
        return tuple()
    _paramstr = ", ".join(["%s"] * len(xref_ids))
    cursor.execute("SELECT PhenotypeId, PublicationId, DataId "
                   "FROM PublishXRef "
                   f"WHERE InbredSetId=%s AND Id IN ({_paramstr})",
                   (population_id,) + xref_ids)
    return tuple(
        (int(row["PhenotypeId"]), int(row["PublicationId"]), int(row["DataId"]))
        for row in cursor.fetchall())
+
+
def delete_phenotypes(
        conn_or_cursor: Union[Connection, Cursor],
        population_id: int,
        xref_ids: tuple[int, ...]
) -> tuple[int, int, int, int]:
    """Delete phenotypes and all their data.

    :param conn_or_cursor: an open database connection, or a cursor created
        from one (a cursor is used as-is; a connection gets a `DictCursor`)
    :param population_id: `InbredSetId` the cross-references belong to
    :param xref_ids: `PublishXRef.Id` values identifying the phenotypes
    :return: 4-tuple of rows deleted from `PublishXRef`, `PublishData`,
        `PublishSE` and `NStrain`, respectively.

    NOTE(review): linked `Publication` rows are collected but never deleted —
    presumably they can be shared by other phenotypes; confirm.
    """
    def __delete_phenos__(cursor: BaseCursor, pheno_ids: tuple[int, ...]) -> int:
        """Delete data from the `Phenotype` table. Return rows deleted."""
        if not pheno_ids:
            # Guard: "IN ()" is invalid SQL — and there is nothing to delete.
            return 0
        _paramstr = ", ".join(["%s"] * len(pheno_ids))

        _pcount = 0
        # Loop to handle big deletes i.e. ≥ 1000 rows.
        while True:
            cursor.execute(
                "DELETE FROM Phenotype "
                f"WHERE Id IN ({_paramstr}) "
                "ORDER BY Id "# Make deletions deterministic
                "LIMIT 1000",
                pheno_ids)
            _pcount_curr = cursor.rowcount
            _pcount += _pcount_curr
            __sleep_random__()# ease pressure on the DB between batches
            if _pcount_curr == 0:
                break

        # BUGFIX: previously returned `cursor.rowcount`, which is always 0
        # here — the loop only exits after a delete that removed no rows.
        return _pcount

    def __delete_xrefs__(cursor: BaseCursor) -> int:
        """Delete rows from the `PublishXRef` table. Return rows deleted."""
        _paramstr = ", ".join(["%s"] * len(xref_ids))

        _xcount = 0
        while True:
            cursor.execute(
                "DELETE FROM PublishXRef "
                f"WHERE InbredSetId=%s AND Id IN ({_paramstr}) "
                "ORDER BY Id "# Make deletions deterministic
                "LIMIT 10000",
                (population_id,) + xref_ids)
            _xcount_curr = cursor.rowcount
            _xcount += _xcount_curr
            __sleep_random__()
            if _xcount_curr == 0:
                break

        return _xcount

    def __with_cursor__(cursor):
        # Split the linked (PhenotypeId, PublicationId, DataId) triples into
        # three parallel tuples.
        _phenoids, _pubids, _dataids = reduce(
            lambda acc, curr: (acc[0] + (curr[0],),
                               acc[1] + (curr[1],),
                               acc[2] + (curr[2],)),
            __linked_ids__(cursor, population_id, xref_ids),
            (tuple(), tuple(), tuple()))
        __delete_phenos__(cursor, _phenoids)
        return (__delete_xrefs__(cursor),) + delete_phenotypes_data(
            cursor, _dataids)

    if not xref_ids:
        # Guard: an empty sequence would make __delete_xrefs__ emit the
        # invalid SQL "IN ()" — there is nothing to delete anyway.
        return (0, 0, 0, 0)

    if isinstance(conn_or_cursor, BaseCursor):
        return __with_cursor__(conn_or_cursor)

    with conn_or_cursor.cursor(cursorclass=DictCursor) as cursor:
        return __with_cursor__(cursor)