diff options
Diffstat (limited to 'uploader/phenotypes/models.py')
| -rw-r--r-- | uploader/phenotypes/models.py | 133 |
1 files changed, 104 insertions, 29 deletions
diff --git a/uploader/phenotypes/models.py b/uploader/phenotypes/models.py index 04abcc9..3d656d2 100644 --- a/uploader/phenotypes/models.py +++ b/uploader/phenotypes/models.py @@ -1,4 +1,6 @@ """Database and utility functions for phenotypes.""" +import time +import random import logging import tempfile from pathlib import Path @@ -85,24 +87,43 @@ def phenotype_publication_data(conn, phenotype_id) -> Optional[dict]: return dict(res) -def dataset_phenotypes(conn: Connection, - population_id: int, - dataset_id: int, - offset: int = 0, - limit: Optional[int] = None) -> tuple[dict, ...]: +def dataset_phenotypes(# pylint: disable=[too-many-arguments, too-many-positional-arguments] + conn: Connection, + population_id: int, + dataset_id: int, + offset: int = 0, + limit: Optional[int] = None, + xref_ids: tuple[int, ...] = tuple() +) -> tuple[dict, ...]: """Fetch the actual phenotypes.""" - _query = ( - "SELECT pheno.*, pxr.Id AS xref_id, pxr.InbredSetId, ist.InbredSetCode " + _narrow_by_ids = ( + f" AND pxr.Id IN ({', '.join(['%s'] * len(xref_ids))})" + if len(xref_ids) > 0 else "") + _narrow_by_limit = ( + f" LIMIT {limit} OFFSET {offset}" if bool(limit) else "") + _pub_query = ( + "SELECT pub.* " + "FROM PublishXRef AS pxr " + "INNER JOIN Publication AS pub ON pxr.PublicationId=pub.Id " + "WHERE pxr.InbredSetId=%s") + _narrow_by_ids + _pheno_query = (( + "SELECT pheno.*, pxr.Id AS xref_id, pxr.InbredSetId, pxr.PublicationId, " + "ist.InbredSetCode " "FROM Phenotype AS pheno " "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId " "INNER JOIN PublishFreeze AS pf ON pxr.InbredSetId=pf.InbredSetId " "INNER JOIN InbredSet AS ist ON pf.InbredSetId=ist.Id " - "WHERE pxr.InbredSetId=%s AND pf.Id=%s") + ( - f" LIMIT {limit} OFFSET {offset}" if bool(limit) else "") + "WHERE pxr.InbredSetId=%s AND pf.Id=%s") + + _narrow_by_ids + + _narrow_by_limit) with conn.cursor(cursorclass=DictCursor) as cursor: - cursor.execute(_query, (population_id, dataset_id)) + cursor.execute(_pub_query, (population_id,) + xref_ids) debug_query(cursor, logger) - return tuple(dict(row) for row in cursor.fetchall()) + _pubs = {row["Id"]: dict(row) for row in cursor.fetchall()} + cursor.execute(_pheno_query, (population_id, dataset_id) + xref_ids) + debug_query(cursor, logger) + return tuple({**dict(row), "publication": _pubs[row["PublicationId"]]} + for row in cursor.fetchall()) def __phenotype_se__(cursor: BaseCursor, xref_id, dataids_and_strainids): @@ -536,6 +557,11 @@ def quick_save_phenotypes_data( return _count +def __sleep_random__(): + """Sleep a random amount of time chosen from 0.05s to 1s in increments of 0.05""" + time.sleep(random.choice(tuple(i / 20.0 for i in range(1, 21)))) + + def delete_phenotypes_data( cursor: BaseCursor, data_ids: tuple[int, ...] @@ -544,17 +570,42 @@ def delete_phenotypes_data( if len(data_ids) == 0: return (0, 0, 0) - _paramstr = ", ".join(["%s"] * len(data_ids)) - cursor.execute(f"DELETE FROM PublishData WHERE Id IN ({_paramstr})", - data_ids) - _dcount = cursor.rowcount - - cursor.execute(f"DELETE FROM PublishSE WHERE DataId IN ({_paramstr})", - data_ids) - _secount = cursor.rowcount - cursor.execute(f"DELETE FROM NStrain WHERE DataId IN ({_paramstr})", - data_ids) - _ncount = cursor.rowcount + # Loop to handle big deletes i.e. ≥ 10000 rows + _dcount, _secount, _ncount = (0, 0, 0)# Count total rows deleted + while True: + _paramstr = ", ".join(["%s"] * len(data_ids)) + cursor.execute( + "DELETE FROM PublishData " + f"WHERE Id IN ({_paramstr}) " + "ORDER BY Id ASC, StrainId ASC "# Make deletions deterministic + "LIMIT 1000", + data_ids) + _dcount_curr = cursor.rowcount + _dcount += _dcount_curr + + cursor.execute( + "DELETE FROM PublishSE " + f"WHERE DataId IN ({_paramstr}) " + "ORDER BY DataId ASC, StrainId ASC "# Make deletions deterministic + "LIMIT 1000", + data_ids) + _secount_curr = cursor.rowcount + _secount += _secount_curr + + cursor.execute( + "DELETE FROM NStrain " + f"WHERE DataId IN ({_paramstr}) " + "ORDER BY DataId ASC, StrainId ASC "# Make deletions deterministic + "LIMIT 1000", + data_ids) + _ncount_curr = cursor.rowcount + _ncount += _ncount_curr + __sleep_random__() + + if all((_dcount_curr == 0, _secount_curr == 0, _ncount_curr == 0)): + # end loop if there are no more rows to delete. + break + return (_dcount, _secount, _ncount) @@ -583,17 +634,41 @@ def delete_phenotypes( def __delete_phenos__(cursor: BaseCursor, pheno_ids: tuple[int, ...]) -> int: """Delete data from the `Phenotype` table.""" _paramstr = ", ".join(["%s"] * len(pheno_ids)) - cursor.execute("DELETE FROM Phenotype " - f"WHERE Id IN ({_paramstr})", - pheno_ids) + + _pcount = 0 + while True: + cursor.execute( + "DELETE FROM Phenotype " + f"WHERE Id IN ({_paramstr}) " + "ORDER BY Id " + "LIMIT 1000", + pheno_ids) + _pcount_curr = cursor.rowcount + _pcount += _pcount_curr + __sleep_random__() + if _pcount_curr == 0: + break + return cursor.rowcount def __delete_xrefs__(cursor: BaseCursor) -> int: _paramstr = ", ".join(["%s"] * len(xref_ids)) - cursor.execute("DELETE FROM PublishXRef " - f"WHERE InbredSetId=%s AND Id IN ({_paramstr})", - (population_id,) + xref_ids) - return cursor.rowcount + + _xcount = 0 + while True: + cursor.execute( + "DELETE FROM PublishXRef " + f"WHERE InbredSetId=%s AND Id IN ({_paramstr}) " + "ORDER BY Id " + "LIMIT 10000", + (population_id,) + xref_ids) + _xcount_curr = cursor.rowcount + _xcount += _xcount_curr + __sleep_random__() + if _xcount_curr == 0: + break + + return _xcount def __with_cursor__(cursor): _phenoids, _pubids, _dataids = reduce( |
