From ef6da7313f96390b9fecb126f9b7e9beb1afe034 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Mon, 15 Jan 2024 12:43:08 +0300 Subject: Process the `pheno` files. Process the data in the pheno files and insert it into the database. --- scripts/rqtl2/install_phenos.py | 139 +++++++++++++++++++++++++++++++++++----- 1 file changed, 124 insertions(+), 15 deletions(-) (limited to 'scripts/rqtl2') diff --git a/scripts/rqtl2/install_phenos.py b/scripts/rqtl2/install_phenos.py index b532dad..b3d445b 100644 --- a/scripts/rqtl2/install_phenos.py +++ b/scripts/rqtl2/install_phenos.py @@ -1,34 +1,147 @@ """Load pheno from R/qtl2 bundle into the database.""" import sys import logging -# import traceback +import traceback from pathlib import Path +from zipfile import ZipFile +from functools import reduce import MySQLdb as mdb +from MySQLdb.cursors import DictCursor from scripts.rqtl2.entry import build_main from scripts.cli_parser import init_cli_parser from scripts.rqtl2.cli_parser import add_common_arguments +from r_qtl import r_qtl2 as rqtl2 + +from quality_control.parsing import take stderr_handler = logging.StreamHandler(stream=sys.stderr) logger = logging.getLogger("install_phenos") logger.addHandler(stderr_handler) -def install_pheno_files(_dbconn: mdb.Connection, - _speciesid: int, - _populationid: int, - _platformid: int, - _studyid: int, - _datasetid: int, - _rqtl2bundle: Path) -> int: +def insert_probesets(dbconn: mdb.Connection, + platformid: int, + phenos: tuple[str, ...]) -> int: + """Insert any new phenos into the database.""" + with dbconn.cursor() as cursor: + cursor.executemany( + "INSERT INTO ProbeSet(ChipId, Name) VALUES(%(chipid)s, %(pheno)s) " + "ON DUPLICATE KEY UPDATE Name=Name", + tuple({"pheno": pheno, "chipid": platformid} for pheno in phenos)) + return cursor.rowcount + +def insert_probeset_data(dbconn: mdb.Connection, + speciesid: int, + phenos: tuple[dict, ...]) -> tuple[ + int, tuple[dict, ...]]: + """Insert numeric (averages) data into the database.""" + with dbconn.cursor(cursorclass=DictCursor) as cursor: + _other, samplenames = reduce(#type: ignore[var-annotated] + lambda acc, row: (#type: ignore[arg-type, return-value] + (acc[0] + (row,)), + (acc[1] + (row["id"],))), + phenos, + (tuple(), tuple())) + paramstr = ", ".join(["%s"] * len(samplenames)) + cursor.execute( + "SELECT Id, Name FROM Strain WHERE SpeciesId=%s " + f"AND Name IN ({paramstr})", + (speciesid,) + samplenames) + sampleids = {row["Name"]: row["Id"] for row in cursor.fetchall()} + cursor.execute("SELECT MAX(Id) AS lastid FROM ProbeSetData") + lastid = (cursor.fetchone() or {}).get("lastid", 0) + data = tuple( + {**row, "psdid": lastid + idx} + for idx, row in enumerate( + ({ + "sampleid": sampleids[innerrow["id"]], + "value": innerrow[pheno], + "pheno": pheno + } for innerrow in phenos for pheno in + (key for key in innerrow.keys() if key != "id")), + start=1)) + cursor.executemany( + "INSERT INTO ProbeSetData(Id, StrainId, value) " + "VALUES(%(psdid)s, %(sampleid)s, %(value)s)", + data) + return cursor.rowcount, tuple({ + "dataid": row["psdid"], + "pheno": row["pheno"] + } for row in data) + +def cross_reference_probeset_data(dbconn: mdb.Connection, + platformid: int, + datasetid: int, + phenos: tuple[str, ...], + dataids: tuple[dict, ...]) -> int: + """ + Cross-reference the data with the dataset and related ProbeSet phenotype. + """ + with dbconn.cursor(cursorclass=DictCursor) as cursor: + paramstr = ", ".join(["%s"] * len(phenos)) + cursor.execute("SELECT Id, Name FROM ProbeSet WHERE ChipId=%s " + f"AND Name IN ({paramstr})", + (platformid,) + phenos) + phenoids = {row["Name"]: row["Id"] for row in cursor.fetchall()} + cursor.executemany( + "INSERT INTO ProbeSetXRef(ProbeSetFreezeId, ProbeSetId, DataId) " + "VALUES(%(datasetid)s, %(phenoid)s, %(dataid)s)", + tuple({ + **row, + "datasetid": datasetid, + "phenoid": phenoids.get(row["pheno"]) + } for row in dataids)) + return cursor.rowcount + +def install_pheno_files(dbconn: mdb.Connection, + speciesid: int, + platformid: int, + datasetid: int, + rqtl2bundle: Path) -> int: """Load data in `pheno` files and other related files into the database.""" - logger.debug("WE ARE HERE!!!") - return 5 + with ZipFile(str(rqtl2bundle), "r") as zfile: + try: + rqtl2.validate_bundle(zfile) + logger.info("R/qtl2 bundle validated successfully.") + + cdata = rqtl2.control_data(zfile) + phenodata = rqtl2.file_data(zfile, "pheno", cdata) + phenorows = 0 + datarows = 0 + while True: + batch = tuple(take(phenodata, 5000)) + if len(batch) == 0: + logger.info("pheno data loading complete!") + logger.info("Inserted a total of %s new ProbeSets and %s " + "new data rows.", + phenorows, + datarows) + break + + phenos = tuple(key for key in batch[0].keys() if key != "id") + count = insert_probesets(dbconn, platformid, phenos) + phenorows += count + logger.info("Inserted %s new ProbeSets", count) + + count, dataids = insert_probeset_data(dbconn, speciesid, batch) + datarows += count + logger.info("Inserted %s new data rows", count) + + cross_reference_probeset_data( + dbconn, platformid, datasetid, phenos, dataids) + except rqtl2.InvalidFormat as _exc: + logger.error(traceback.format_exc()) + logger.debug("There is no pheno file in the R/qtl2 bundle.") + except Exception as _exc:# pylint: disable=[broad-except] + logger.error("Failing with the exception: %s", traceback.format_exc()) + return 3 + return 0 if __name__ == "__main__": def cli_args(): - """Process command-line arguments for install_genotypes""" + """Process command-line arguments for `install_phenos`""" parser = init_cli_parser( "install_genotypes", "Parse genotypes from R/qtl2 bundle into the database.") @@ -36,8 +149,6 @@ if __name__ == "__main__": parser.add_argument( "platformid", help="The platform from which the data was generated.") - parser.add_argument("studyid", - help="The study to which the data belongs.") parser = add_common_arguments(parser) @@ -47,9 +158,7 @@ if __name__ == "__main__": cli_args, lambda dbconn, args: install_pheno_files(dbconn, args.speciesid, - args.populationid, args.platformid, - args.studyid, args.datasetid, args.rqtl2bundle), logger, -- cgit v1.2.3