about summary refs log tree commit diff
path: root/scripts/rqtl2/install_phenos.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/rqtl2/install_phenos.py')
-rw-r--r--scripts/rqtl2/install_phenos.py139
1 files changed, 124 insertions, 15 deletions
diff --git a/scripts/rqtl2/install_phenos.py b/scripts/rqtl2/install_phenos.py
index b532dad..b3d445b 100644
--- a/scripts/rqtl2/install_phenos.py
+++ b/scripts/rqtl2/install_phenos.py
@@ -1,34 +1,147 @@
 """Load pheno from R/qtl2 bundle into the database."""
 import sys
 import logging
-# import traceback
+import traceback
 from pathlib import Path
+from zipfile import ZipFile
+from functools import reduce
 
 import MySQLdb as mdb
+from MySQLdb.cursors import DictCursor
 
 from scripts.rqtl2.entry import build_main
 from scripts.cli_parser import init_cli_parser
 from scripts.rqtl2.cli_parser import add_common_arguments
 
+from r_qtl import r_qtl2 as rqtl2
+
+from quality_control.parsing import take
 stderr_handler = logging.StreamHandler(stream=sys.stderr)
 logger = logging.getLogger("install_phenos")
 logger.addHandler(stderr_handler)
 
-def install_pheno_files(_dbconn: mdb.Connection,
-                        _speciesid: int,
-                        _populationid: int,
-                        _platformid: int,
-                        _studyid: int,
-                        _datasetid: int,
-                        _rqtl2bundle: Path) -> int:
+def insert_probesets(dbconn: mdb.Connection,
+                     platformid: int,
+                     phenos: tuple[str, ...]) -> int:
+    """Insert any new phenos into the database."""
+    with dbconn.cursor() as cursor:
+        cursor.executemany(
+            "INSERT INTO ProbeSet(ChipId, Name) VALUES(%(chipid)s, %(pheno)s) "
+            "ON DUPLICATE KEY UPDATE Name=Name",
+            tuple({"pheno": pheno, "chipid": platformid} for pheno in phenos))
+        return cursor.rowcount
+
+def insert_probeset_data(dbconn: mdb.Connection,
+                         speciesid: int,
+                         phenos: tuple[dict, ...]) -> tuple[
+                             int, tuple[dict, ...]]:
+    """Insert numeric (averages) data into the database."""
+    with dbconn.cursor(cursorclass=DictCursor) as cursor:
+        _other, samplenames = reduce(#type: ignore[var-annotated]
+            lambda acc, row: (#type: ignore[arg-type, return-value]
+                (acc[0] + (row,)),
+                (acc[1] + (row["id"],))),
+            phenos,
+            (tuple(), tuple()))
+        paramstr = ", ".join(["%s"] * len(samplenames))
+        cursor.execute(
+            "SELECT Id, Name FROM Strain WHERE SpeciesId=%s "
+            f"AND Name IN ({paramstr})",
+            (speciesid,) + samplenames)
+        sampleids = {row["Name"]: row["Id"] for row in cursor.fetchall()}
+        cursor.execute("SELECT MAX(Id) AS lastid FROM ProbeSetData")
+        lastid = (cursor.fetchone() or {}).get("lastid", 0)
+        data = tuple(
+            {**row, "psdid": lastid + idx}
+            for idx, row in enumerate(
+                    ({
+                        "sampleid": sampleids[innerrow["id"]],
+                        "value": innerrow[pheno],
+                        "pheno": pheno
+                    } for innerrow in phenos for pheno in
+                     (key for key in innerrow.keys() if key != "id")),
+                    start=1))
+        cursor.executemany(
+            "INSERT INTO ProbeSetData(Id, StrainId, value) "
+            "VALUES(%(psdid)s, %(sampleid)s, %(value)s)",
+            data)
+        return cursor.rowcount, tuple({
+            "dataid": row["psdid"],
+            "pheno": row["pheno"]
+        } for row in data)
+
+def cross_reference_probeset_data(dbconn: mdb.Connection,
+                                  platformid: int,
+                                  datasetid: int,
+                                  phenos: tuple[str, ...],
+                                  dataids: tuple[dict, ...]) -> int:
+    """
+    Cross-reference the data with the dataset and related ProbeSet phenotype.
+    """
+    with dbconn.cursor(cursorclass=DictCursor) as cursor:
+        paramstr = ", ".join(["%s"] * len(phenos))
+        cursor.execute("SELECT Id, Name FROM ProbeSet WHERE ChipId=%s "
+                       f"AND Name IN ({paramstr})",
+                       (platformid,) + phenos)
+        phenoids = {row["Name"]: row["Id"] for row in cursor.fetchall()}
+        cursor.executemany(
+            "INSERT INTO ProbeSetXRef(ProbeSetFreezeId, ProbeSetId, DataId) "
+            "VALUES(%(datasetid)s, %(phenoid)s, %(dataid)s)",
+            tuple({
+                **row,
+                "datasetid": datasetid,
+                "phenoid": phenoids.get(row["pheno"])
+            } for row in dataids))
+        return cursor.rowcount
+
+def install_pheno_files(dbconn: mdb.Connection,
+                        speciesid: int,
+                        platformid: int,
+                        datasetid: int,
+                        rqtl2bundle: Path) -> int:
     """Load data in `pheno` files and other related files into the database."""
-    logger.debug("WE ARE HERE!!!")
-    return 5
+    with ZipFile(str(rqtl2bundle), "r") as zfile:
+        try:
+            rqtl2.validate_bundle(zfile)
+            logger.info("R/qtl2 bundle validated successfully.")
+
+            cdata = rqtl2.control_data(zfile)
+            phenodata = rqtl2.file_data(zfile, "pheno", cdata)
+            phenorows = 0
+            datarows = 0
+            while True:
+                batch = tuple(take(phenodata, 5000))
+                if len(batch) == 0:
+                    logger.info("pheno data loading complete!")
+                    logger.info("Inserted a total of %s new ProbeSets and %s "
+                                "new data rows.",
+                                phenorows,
+                                datarows)
+                    break
+
+                phenos = tuple(key for key in batch[0].keys() if key != "id")
+                count = insert_probesets(dbconn, platformid, phenos)
+                phenorows += count
+                logger.info("Inserted %s new ProbeSets", count)
+
+                count, dataids = insert_probeset_data(dbconn, speciesid, batch)
+                datarows += count
+                logger.info("Inserted %s new data rows", count)
+
+                cross_reference_probeset_data(
+                    dbconn, platformid, datasetid, phenos, dataids)
+        except rqtl2.InvalidFormat as _exc:
+            logger.error(traceback.format_exc())
+            logger.debug("There is no pheno file in the R/qtl2 bundle.")
+        except Exception as _exc:# pylint: disable=[broad-except]
+            logger.error("Failing with the exception: %s", traceback.format_exc())
+            return 3
+    return 0
 
 if __name__ == "__main__":
 
     def cli_args():
-        """Process command-line arguments for install_genotypes"""
+        """Process command-line arguments for `install_phenos`"""
         parser = init_cli_parser(
             "install_genotypes",
             "Parse genotypes from R/qtl2 bundle into the database.")
@@ -36,8 +149,6 @@ if __name__ == "__main__":
         parser.add_argument(
             "platformid",
             help="The platform from which the data was generated.")
-        parser.add_argument("studyid",
-                            help="The study to which the data belongs.")
 
         parser = add_common_arguments(parser)
 
@@ -47,9 +158,7 @@ if __name__ == "__main__":
         cli_args,
         lambda dbconn, args: install_pheno_files(dbconn,
                                                  args.speciesid,
-                                                 args.populationid,
                                                  args.platformid,
-                                                 args.studyid,
                                                  args.datasetid,
                                                  args.rqtl2bundle),
         logger,