about summary refs log tree commit diff
path: root/uploader/phenotypes/models.py
diff options
context:
space:
mode:
Diffstat (limited to 'uploader/phenotypes/models.py')
-rw-r--r--uploader/phenotypes/models.py133
1 files changed, 104 insertions, 29 deletions
diff --git a/uploader/phenotypes/models.py b/uploader/phenotypes/models.py
index 04abcc9..3d656d2 100644
--- a/uploader/phenotypes/models.py
+++ b/uploader/phenotypes/models.py
@@ -1,4 +1,6 @@
 """Database and utility functions for phenotypes."""
+import time
+import random
 import logging
 import tempfile
 from pathlib import Path
@@ -85,24 +87,43 @@ def phenotype_publication_data(conn, phenotype_id) -> Optional[dict]:
         return dict(res)
 
 
-def dataset_phenotypes(conn: Connection,
-                       population_id: int,
-                       dataset_id: int,
-                       offset: int = 0,
-                       limit: Optional[int] = None) -> tuple[dict, ...]:
+def dataset_phenotypes(# pylint: disable=[too-many-arguments, too-many-positional-arguments]
+        conn: Connection,
+        population_id: int,
+        dataset_id: int,
+        offset: int = 0,
+        limit: Optional[int] = None,
+        xref_ids: tuple[int, ...] = tuple()
+) -> tuple[dict, ...]:
     """Fetch the actual phenotypes."""
-    _query = (
-        "SELECT pheno.*, pxr.Id AS xref_id, pxr.InbredSetId, ist.InbredSetCode "
+    _narrow_by_ids = (
+            f" AND pxr.Id IN ({', '.join(['%s'] * len(xref_ids))})"
+            if len(xref_ids) > 0 else "")
+    _narrow_by_limit = (
+        f" LIMIT {limit} OFFSET {offset}" if bool(limit) else "")
+    _pub_query = (
+        "SELECT pub.* "
+        "FROM PublishXRef AS pxr "
+        "INNER JOIN  Publication AS pub ON pxr.PublicationId=pub.Id "
+        "WHERE pxr.InbredSetId=%s") + _narrow_by_ids
+    _pheno_query = ((
+        "SELECT pheno.*, pxr.Id AS xref_id, pxr.InbredSetId, pxr.PublicationId, "
+        "ist.InbredSetCode "
         "FROM Phenotype AS pheno "
         "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId "
         "INNER JOIN PublishFreeze AS pf ON pxr.InbredSetId=pf.InbredSetId "
         "INNER JOIN InbredSet AS ist ON pf.InbredSetId=ist.Id "
-        "WHERE pxr.InbredSetId=%s AND pf.Id=%s") + (
-            f" LIMIT {limit} OFFSET {offset}" if bool(limit) else "")
+        "WHERE pxr.InbredSetId=%s AND pf.Id=%s") +
+                    _narrow_by_ids +
+                    _narrow_by_limit)
     with conn.cursor(cursorclass=DictCursor) as cursor:
-        cursor.execute(_query, (population_id, dataset_id))
+        cursor.execute(_pub_query, (population_id,) + xref_ids)
         debug_query(cursor, logger)
-        return tuple(dict(row) for row in cursor.fetchall())
+        _pubs = {row["Id"]: dict(row) for row in cursor.fetchall()}
+        cursor.execute(_pheno_query, (population_id, dataset_id) + xref_ids)
+        debug_query(cursor, logger)
+        return tuple({**dict(row), "publication": _pubs[row["PublicationId"]]}
+                     for row in cursor.fetchall())
 
 
 def __phenotype_se__(cursor: BaseCursor, xref_id, dataids_and_strainids):
@@ -536,6 +557,11 @@ def quick_save_phenotypes_data(
         return _count
 
 
+def __sleep_random__():
+    """Sleep a random amount of time chosen from 0.05s to 1s in increments of 0.05"""
+    time.sleep(random.choice(tuple(i / 20.0 for i in range(1, 21))))
+
+
 def delete_phenotypes_data(
         cursor: BaseCursor,
         data_ids: tuple[int, ...]
@@ -544,17 +570,42 @@ def delete_phenotypes_data(
     if len(data_ids) == 0:
         return (0, 0, 0)
 
-    _paramstr = ", ".join(["%s"] * len(data_ids))
-    cursor.execute(f"DELETE FROM PublishData WHERE Id IN ({_paramstr})",
-                   data_ids)
-    _dcount = cursor.rowcount
-
-    cursor.execute(f"DELETE FROM PublishSE WHERE DataId IN ({_paramstr})",
-                   data_ids)
-    _secount = cursor.rowcount
-    cursor.execute(f"DELETE FROM NStrain WHERE DataId IN ({_paramstr})",
-                   data_ids)
-    _ncount = cursor.rowcount
+    # Loop to handle big deletes i.e. ≥ 10000 rows
+    _dcount, _secount, _ncount = (0, 0, 0)# Count total rows deleted
+    while True:
+        _paramstr = ", ".join(["%s"] * len(data_ids))
+        cursor.execute(
+            "DELETE FROM PublishData "
+            f"WHERE Id IN ({_paramstr}) "
+            "ORDER BY Id ASC, StrainId ASC "# Make deletions deterministic
+            "LIMIT 1000",
+            data_ids)
+        _dcount_curr = cursor.rowcount
+        _dcount += _dcount_curr
+
+        cursor.execute(
+            "DELETE FROM PublishSE "
+            f"WHERE DataId IN ({_paramstr}) "
+            "ORDER BY DataId ASC, StrainId ASC "# Make deletions deterministic
+            "LIMIT 1000",
+            data_ids)
+        _secount_curr = cursor.rowcount
+        _secount += _secount_curr
+
+        cursor.execute(
+            "DELETE FROM NStrain "
+            f"WHERE DataId IN ({_paramstr}) "
+            "ORDER BY DataId ASC, StrainId ASC "# Make deletions deterministic
+            "LIMIT 1000",
+            data_ids)
+        _ncount_curr = cursor.rowcount
+        _ncount += _ncount_curr
+        __sleep_random__()
+
+        if all((_dcount_curr == 0, _secount_curr == 0, _ncount_curr == 0)):
+            # end loop if there are no more rows to delete.
+            break
+
     return (_dcount, _secount, _ncount)
 
 
@@ -583,17 +634,41 @@ def delete_phenotypes(
     def __delete_phenos__(cursor: BaseCursor, pheno_ids: tuple[int, ...]) -> int:
         """Delete data from the `Phenotype` table."""
         _paramstr = ", ".join(["%s"] * len(pheno_ids))
-        cursor.execute("DELETE FROM Phenotype "
-                       f"WHERE Id IN ({_paramstr})",
-                       pheno_ids)
+
+        _pcount = 0
+        while True:
+            cursor.execute(
+                "DELETE FROM Phenotype "
+                f"WHERE Id IN ({_paramstr}) "
+                "ORDER BY Id "
+                "LIMIT 1000",
+                pheno_ids)
+            _pcount_curr = cursor.rowcount
+            _pcount += _pcount_curr
+            __sleep_random__()
+            if _pcount_curr == 0:
+                break
+
         return cursor.rowcount
 
     def __delete_xrefs__(cursor: BaseCursor) -> int:
         _paramstr = ", ".join(["%s"] * len(xref_ids))
-        cursor.execute("DELETE FROM PublishXRef "
-                       f"WHERE InbredSetId=%s AND Id IN ({_paramstr})",
-                       (population_id,) + xref_ids)
-        return cursor.rowcount
+
+        _xcount = 0
+        while True:
+            cursor.execute(
+                "DELETE FROM PublishXRef "
+                f"WHERE InbredSetId=%s AND Id IN ({_paramstr}) "
+                "ORDER BY Id "
+                "LIMIT 10000",
+                (population_id,) + xref_ids)
+            _xcount_curr = cursor.rowcount
+            _xcount += _xcount_curr
+            __sleep_random__()
+            if _xcount_curr == 0:
+                break
+
+        return _xcount
 
     def __with_cursor__(cursor):
         _phenoids, _pubids, _dataids = reduce(