"""Load pheno from R/qtl2 bundle into the database.""" import sys import argparse import traceback from zipfile import ZipFile from functools import reduce from logging import Logger, getLogger from redis import Redis import MySQLdb as mdb from MySQLdb.cursors import DictCursor from scripts.rqtl2.entry import build_main from scripts.rqtl2.cli_parser import add_common_arguments from scripts.cli_parser import init_cli_parser, add_global_data_arguments from r_qtl import r_qtl2 as rqtl2 from r_qtl import r_qtl2_qc as rqc from functional_tools import take __MODULE__ = "scripts.rqtl2.install_phenos" def insert_probesets(dbconn: mdb.Connection, platformid: int, phenos: tuple[str, ...]) -> int: """Insert any new phenos into the database.""" with dbconn.cursor() as cursor: cursor.executemany( "INSERT INTO ProbeSet(ChipId, Name) VALUES(%(chipid)s, %(pheno)s) " "ON DUPLICATE KEY UPDATE Name=Name", tuple({"pheno": pheno, "chipid": platformid} for pheno in phenos)) return cursor.rowcount def insert_probeset_data(dbconn: mdb.Connection, speciesid: int, phenos: tuple[dict, ...]) -> tuple[ int, tuple[dict, ...]]: """Insert numeric (averages) data into the database.""" with dbconn.cursor(cursorclass=DictCursor) as cursor: _other, samplenames = reduce(#type: ignore[var-annotated] lambda acc, row: (#type: ignore[arg-type, return-value] (acc[0] + (row,)), (acc[1] + (row["id"],))), phenos, (tuple(), tuple())) paramstr = ", ".join(["%s"] * len(samplenames)) cursor.execute( "SELECT Id, Name FROM Strain WHERE SpeciesId=%s " f"AND Name IN ({paramstr})", (speciesid,) + samplenames) sampleids = {row["Name"]: row["Id"] for row in cursor.fetchall()} cursor.execute("SELECT MAX(Id) AS lastid FROM ProbeSetData") lastid = (cursor.fetchone() or {}).get("lastid", 0) data = tuple( {**row, "psdid": lastid + idx} for idx, row in enumerate( (item for item in ({ "sampleid": sampleids[innerrow["id"]], "value": innerrow[pheno], "pheno": pheno } for innerrow in phenos for pheno in (key for key in innerrow.keys() if key != "id")) if item["value"] is not None), start=1)) cursor.executemany( "INSERT INTO ProbeSetData(Id, StrainId, value) " "VALUES(%(psdid)s, %(sampleid)s, %(value)s)", data) return cursor.rowcount, tuple({ "dataid": row["psdid"], "pheno": row["pheno"] } for row in data) def cross_reference_probeset_data(dbconn: mdb.Connection, platformid: int, datasetid: int, phenos: tuple[str, ...], dataids: tuple[dict, ...]) -> int: """ Cross-reference the data with the dataset and related ProbeSet phenotype. """ with dbconn.cursor(cursorclass=DictCursor) as cursor: paramstr = ", ".join(["%s"] * len(phenos)) cursor.execute("SELECT Id, Name FROM ProbeSet WHERE ChipId=%s " f"AND Name IN ({paramstr})", (platformid,) + phenos) phenoids = {row["Name"]: row["Id"] for row in cursor.fetchall()} cursor.executemany( "INSERT INTO ProbeSetXRef(ProbeSetFreezeId, ProbeSetId, DataId) " "VALUES(%(datasetid)s, %(phenoid)s, %(dataid)s)", tuple({ **row, "datasetid": datasetid, "phenoid": phenoids.get(row["pheno"]) } for row in dataids)) return cursor.rowcount def install_pheno_files(#pylint: disable=[too-many-locals] rconn: Redis,#pylint: disable=[unused-argument] dbconn: mdb.Connection, fullyqualifiedjobid: str,#pylint: disable=[unused-argument] args: argparse.Namespace, logger: Logger = getLogger()) -> int: """Load data in `pheno` files and other related files into the database.""" (speciesid, platformid, datasetid, rqtl2bundle) = ( args.speciesid, args.platformid, args.datasetid, args.rqtl2bundle) with ZipFile(str(rqtl2bundle), "r") as zfile: try: rqc.validate_bundle(zfile) logger.info("R/qtl2 bundle validated successfully.") cdata = rqtl2.control_data(zfile) phenodata = rqtl2.file_data(zfile, "pheno", cdata) phenorows = 0 datarows = 0 while True: batch = tuple(take(phenodata, 1000)) if len(batch) == 0: logger.info("pheno data loading complete!") logger.info("Inserted a total of %s new ProbeSets and %s " "new data rows.", phenorows, datarows) break phenos = tuple(key for key in batch[0].keys() if key != "id") count = insert_probesets(dbconn, platformid, phenos) phenorows += count logger.info("Inserted %s new ProbeSets", count) count, dataids = insert_probeset_data(dbconn, speciesid, batch) datarows += count logger.info("Inserted %s new data rows", count) cross_reference_probeset_data( dbconn, platformid, datasetid, phenos, dataids) except rqtl2.InvalidFormat as _exc: logger.error(traceback.format_exc()) logger.debug("There is no pheno file in the R/qtl2 bundle.") except Exception as _exc:# pylint: disable=[broad-except] logger.error("Failing with the exception: %s", traceback.format_exc()) return 3 return 0 if __name__ == "__main__": def cli_args(): """Process command-line arguments for `install_phenos`""" parser = add_global_data_arguments(init_cli_parser( "install_genotypes", "Parse genotypes from R/qtl2 bundle into the database.")) parser.add_argument( "platformid", help="The platform from which the data was generated.") parser = add_common_arguments(parser) return parser.parse_args() main = build_main(cli_args(), install_pheno_files, __MODULE__) sys.exit(main())