diff options
Diffstat (limited to 'uploader/phenotypes/models.py')
| -rw-r--r-- | uploader/phenotypes/models.py | 176 |
1 files changed, 157 insertions, 19 deletions
diff --git a/uploader/phenotypes/models.py b/uploader/phenotypes/models.py index a22497c..3946a0f 100644 --- a/uploader/phenotypes/models.py +++ b/uploader/phenotypes/models.py @@ -1,4 +1,6 @@ """Database and utility functions for phenotypes.""" +import time +import random import logging import tempfile from pathlib import Path @@ -6,8 +8,8 @@ from functools import reduce from datetime import datetime from typing import Union, Optional, Iterable -import MySQLdb as mdb -from MySQLdb.cursors import Cursor, DictCursor +from MySQLdb.connections import Connection +from MySQLdb.cursors import Cursor, DictCursor, BaseCursor from gn_libs.mysqldb import debug_query @@ -27,7 +29,7 @@ __PHENO_DATA_TABLES__ = { def datasets_by_population( - conn: mdb.Connection, + conn: Connection, species_id: int, population_id: int ) -> tuple[dict, ...]: @@ -42,7 +44,7 @@ def datasets_by_population( return tuple(dict(row) for row in cursor.fetchall()) -def dataset_by_id(conn: mdb.Connection, +def dataset_by_id(conn: Connection, species_id: int, population_id: int, dataset_id: int) -> dict: @@ -57,7 +59,7 @@ def dataset_by_id(conn: mdb.Connection, return dict(cursor.fetchone()) -def phenotypes_count(conn: mdb.Connection, +def phenotypes_count(conn: Connection, population_id: int, dataset_id: int) -> int: """Count the number of phenotypes in the dataset.""" @@ -85,11 +87,14 @@ def phenotype_publication_data(conn, phenotype_id) -> Optional[dict]: return dict(res) -def dataset_phenotypes(conn: mdb.Connection, - population_id: int, - dataset_id: int, - offset: int = 0, - limit: Optional[int] = None) -> tuple[dict, ...]: +def dataset_phenotypes(# pylint: disable=[too-many-arguments, too-many-positional-arguments] + conn: Connection, + population_id: int, + dataset_id: int, + offset: int = 0, + limit: Optional[int] = None, + xref_ids: tuple[int, ...] = tuple() +) -> tuple[dict, ...]: """Fetch the actual phenotypes.""" _query = ( "SELECT pheno.*, pxr.Id AS xref_id, pxr.InbredSetId, ist.InbredSetCode " @@ -98,14 +103,16 @@ def dataset_phenotypes(conn: mdb.Connection, "INNER JOIN PublishFreeze AS pf ON pxr.InbredSetId=pf.InbredSetId " "INNER JOIN InbredSet AS ist ON pf.InbredSetId=ist.Id " "WHERE pxr.InbredSetId=%s AND pf.Id=%s") + ( + f" AND pxr.Id IN ({', '.join(['%s'] * len(xref_ids))})" + if len(xref_ids) > 0 else "") + ( f" LIMIT {limit} OFFSET {offset}" if bool(limit) else "") with conn.cursor(cursorclass=DictCursor) as cursor: - cursor.execute(_query, (population_id, dataset_id)) + cursor.execute(_query, (population_id, dataset_id) + xref_ids) debug_query(cursor, logger) return tuple(dict(row) for row in cursor.fetchall()) -def __phenotype_se__(cursor: Cursor, xref_id, dataids_and_strainids): +def __phenotype_se__(cursor: BaseCursor, xref_id, dataids_and_strainids): """Fetch standard-error values (if they exist) for a phenotype.""" paramstr = ", ".join(["(%s, %s)"] * len(dataids_and_strainids)) flat = tuple(item for sublist in dataids_and_strainids for item in sublist) @@ -187,7 +194,7 @@ def __merge_pheno_data_and_se__(data, sedata) -> dict: def phenotype_by_id( - conn: mdb.Connection, + conn: Connection, species_id: int, population_id: int, dataset_id: int, @@ -225,7 +232,7 @@ def phenotype_by_id( return None -def phenotypes_data(conn: mdb.Connection, +def phenotypes_data(conn: Connection, population_id: int, dataset_id: int, offset: int = 0, @@ -249,7 +256,7 @@ def phenotypes_data(conn: mdb.Connection, def phenotypes_vector_data(# pylint: disable=[too-many-arguments, too-many-positional-arguments] - conn: mdb.Connection, + conn: Connection, species_id: int, population_id: int, xref_ids: tuple[int, ...] = tuple(), @@ -301,7 +308,7 @@ def phenotypes_vector_data(# pylint: disable=[too-many-arguments, too-many-posit return reduce(__organise__, cursor.fetchall(), {}) -def save_new_dataset(cursor: Cursor, +def save_new_dataset(cursor: BaseCursor, population_id: int, dataset_name: str, dataset_fullname: str, @@ -346,7 +353,7 @@ def __pre_process_phenotype_data__(row): def create_new_phenotypes(# pylint: disable=[too-many-locals] - conn: mdb.Connection, + conn: Connection, population_id: int, publication_id: int, phenotypes: Iterable[dict] @@ -474,7 +481,7 @@ def create_new_phenotypes(# pylint: disable=[too-many-locals] def save_phenotypes_data( - conn: mdb.Connection, + conn: Connection, table: str, data: Iterable[dict] ) -> int: @@ -504,7 +511,7 @@ def save_phenotypes_data( def quick_save_phenotypes_data( - conn: mdb.Connection, + conn: Connection, table: str, dataitems: Iterable[dict], tmpdir: Path @@ -534,3 +541,134 @@ def quick_save_phenotypes_data( ")") debug_query(cursor, logger) return _count + + +def __sleep_random__(): + """Sleep a random amount of time chosen from 0.05s to 1s in increments of 0.05""" + time.sleep(random.choice(tuple(i / 20.0 for i in range(1, 21)))) + + +def delete_phenotypes_data( + cursor: BaseCursor, + data_ids: tuple[int, ...] +) -> tuple[int, int, int]: + """Delete numeric data for phenotypes with the given data IDs.""" + if len(data_ids) == 0: + return (0, 0, 0) + + # Loop to handle big deletes i.e. ≥ 10000 rows + _dcount, _secount, _ncount = (0, 0, 0)# Count total rows deleted + while True: + _paramstr = ", ".join(["%s"] * len(data_ids)) + cursor.execute( + "DELETE FROM PublishData " + f"WHERE Id IN ({_paramstr}) " + "ORDER BY Id ASC, StrainId ASC "# Make deletions deterministic + "LIMIT 1000", + data_ids) + _dcount_curr = cursor.rowcount + _dcount += _dcount_curr + + cursor.execute( + "DELETE FROM PublishSE " + f"WHERE DataId IN ({_paramstr}) " + "ORDER BY DataId ASC, StrainId ASC "# Make deletions deterministic + "LIMIT 1000", + data_ids) + _secount_curr = cursor.rowcount + _secount += _secount_curr + + cursor.execute( + "DELETE FROM NStrain " + f"WHERE DataId IN ({_paramstr}) " + "ORDER BY DataId ASC, StrainId ASC "# Make deletions deterministic + "LIMIT 1000", + data_ids) + _ncount_curr = cursor.rowcount + _ncount += _ncount_curr + __sleep_random__() + + if all((_dcount_curr == 0, _secount_curr == 0, _ncount_curr == 0)): + # end loop if there are no more rows to delete. + break + + return (_dcount, _secount, _ncount) + + +def __linked_ids__( + cursor: BaseCursor, + population_id: int, + xref_ids: tuple[int, ...] +) -> tuple[tuple[int, int, int], ...]: + """Retrieve `DataId` values from `PublishXRef` table.""" + _paramstr = ", ".join(["%s"] * len(xref_ids)) + cursor.execute("SELECT PhenotypeId, PublicationId, DataId " + "FROM PublishXRef " + f"WHERE InbredSetId=%s AND Id IN ({_paramstr})", + (population_id,) + xref_ids) + return tuple( + (int(row["PhenotypeId"]), int(row["PublicationId"]), int(row["DataId"])) + for row in cursor.fetchall()) + + +def delete_phenotypes( + conn_or_cursor: Union[Connection, Cursor], + population_id: int, + xref_ids: tuple[int, ...] +) -> tuple[int, int, int, int]: + """Delete phenotypes and all their data.""" + def __delete_phenos__(cursor: BaseCursor, pheno_ids: tuple[int, ...]) -> int: + """Delete data from the `Phenotype` table.""" + _paramstr = ", ".join(["%s"] * len(pheno_ids)) + + _pcount = 0 + while True: + cursor.execute( + "DELETE FROM Phenotype " + f"WHERE Id IN ({_paramstr}) " + "ORDER BY Id " + "LIMIT 1000", + pheno_ids) + _pcount_curr = cursor.rowcount + _pcount += _pcount_curr + __sleep_random__() + if _pcount_curr == 0: + break + + return cursor.rowcount + + def __delete_xrefs__(cursor: BaseCursor) -> int: + _paramstr = ", ".join(["%s"] * len(xref_ids)) + + _xcount = 0 + while True: + cursor.execute( + "DELETE FROM PublishXRef " + f"WHERE InbredSetId=%s AND Id IN ({_paramstr}) " + "ORDER BY Id " + "LIMIT 10000", + (population_id,) + xref_ids) + _xcount_curr = cursor.rowcount + _xcount += _xcount_curr + __sleep_random__() + if _xcount_curr == 0: + break + + return _xcount + + def __with_cursor__(cursor): + _phenoids, _pubids, _dataids = reduce( + lambda acc, curr: (acc[0] + (curr[0],), + acc[1] + (curr[1],), + acc[2] + (curr[2],)), + __linked_ids__(cursor, population_id, xref_ids), + (tuple(), tuple(), tuple())) + __delete_phenos__(cursor, _phenoids) + return (__delete_xrefs__(cursor),) + delete_phenotypes_data( + cursor, _dataids) + + if isinstance(conn_or_cursor, BaseCursor): + return __with_cursor__(conn_or_cursor) + + with conn_or_cursor.cursor(cursorclass=DictCursor) as cursor: + return __with_cursor__(cursor) |
