1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
|
"""Load pheno from R/qtl2 bundle into the database."""
import sys
import argparse
import traceback
from zipfile import ZipFile
from functools import reduce
from logging import Logger, getLogger
from redis import Redis
import MySQLdb as mdb
from MySQLdb.cursors import DictCursor
from scripts.rqtl2.entry import build_main
from scripts.rqtl2.cli_parser import add_common_arguments
from scripts.cli_parser import init_cli_parser, add_global_data_arguments
from r_qtl import r_qtl2 as rqtl2
from r_qtl import r_qtl2_qc as rqc
from functional_tools import take
__MODULE__ = "scripts.rqtl2.install_phenos"
def insert_probesets(dbconn: mdb.Connection,
platformid: int,
phenos: tuple[str, ...]) -> int:
"""Insert any new phenos into the database."""
with dbconn.cursor() as cursor:
cursor.executemany(
"INSERT INTO ProbeSet(ChipId, Name) VALUES(%(chipid)s, %(pheno)s) "
"ON DUPLICATE KEY UPDATE Name=Name",
tuple({"pheno": pheno, "chipid": platformid} for pheno in phenos))
return cursor.rowcount
def insert_probeset_data(dbconn: mdb.Connection,
speciesid: int,
phenos: tuple[dict, ...]) -> tuple[
int, tuple[dict, ...]]:
"""Insert numeric (averages) data into the database."""
with dbconn.cursor(cursorclass=DictCursor) as cursor:
_other, samplenames = reduce(#type: ignore[var-annotated]
lambda acc, row: (#type: ignore[arg-type, return-value]
(acc[0] + (row,)),
(acc[1] + (row["id"],))),
phenos,
(tuple(), tuple()))
paramstr = ", ".join(["%s"] * len(samplenames))
cursor.execute(
"SELECT Id, Name FROM Strain WHERE SpeciesId=%s "
f"AND Name IN ({paramstr})",
(speciesid,) + samplenames)
sampleids = {row["Name"]: row["Id"] for row in cursor.fetchall()}
cursor.execute("SELECT MAX(Id) AS lastid FROM ProbeSetData")
lastid = (cursor.fetchone() or {}).get("lastid", 0)
data = tuple(
{**row, "psdid": lastid + idx}
for idx, row in enumerate(
(item for item in ({
"sampleid": sampleids[innerrow["id"]],
"value": innerrow[pheno],
"pheno": pheno
} for innerrow in phenos for pheno in
(key for key in innerrow.keys() if key != "id"))
if item["value"] is not None),
start=1))
cursor.executemany(
"INSERT INTO ProbeSetData(Id, StrainId, value) "
"VALUES(%(psdid)s, %(sampleid)s, %(value)s)",
data)
return cursor.rowcount, tuple({
"dataid": row["psdid"],
"pheno": row["pheno"]
} for row in data)
def cross_reference_probeset_data(dbconn: mdb.Connection,
platformid: int,
datasetid: int,
phenos: tuple[str, ...],
dataids: tuple[dict, ...]) -> int:
"""
Cross-reference the data with the dataset and related ProbeSet phenotype.
"""
with dbconn.cursor(cursorclass=DictCursor) as cursor:
paramstr = ", ".join(["%s"] * len(phenos))
cursor.execute("SELECT Id, Name FROM ProbeSet WHERE ChipId=%s "
f"AND Name IN ({paramstr})",
(platformid,) + phenos)
phenoids = {row["Name"]: row["Id"] for row in cursor.fetchall()}
cursor.executemany(
"INSERT INTO ProbeSetXRef(ProbeSetFreezeId, ProbeSetId, DataId) "
"VALUES(%(datasetid)s, %(phenoid)s, %(dataid)s)",
tuple({
**row,
"datasetid": datasetid,
"phenoid": phenoids.get(row["pheno"])
} for row in dataids))
return cursor.rowcount
def install_pheno_files(#pylint: disable=[too-many-locals]
rconn: Redis,#pylint: disable=[unused-argument]
dbconn: mdb.Connection,
fullyqualifiedjobid: str,#pylint: disable=[unused-argument]
args: argparse.Namespace,
logger: Logger = getLogger()) -> int:
"""Load data in `pheno` files and other related files into the database."""
(speciesid, platformid, datasetid, rqtl2bundle) = (
args.speciesid, args.platformid, args.datasetid, args.rqtl2bundle)
with ZipFile(str(rqtl2bundle), "r") as zfile:
try:
rqc.validate_bundle(zfile)
logger.info("R/qtl2 bundle validated successfully.")
cdata = rqtl2.control_data(zfile)
phenodata = rqtl2.file_data(zfile, "pheno", cdata)
phenorows = 0
datarows = 0
while True:
batch = tuple(take(phenodata, 1000))
if len(batch) == 0:
logger.info("pheno data loading complete!")
logger.info("Inserted a total of %s new ProbeSets and %s "
"new data rows.",
phenorows,
datarows)
break
phenos = tuple(key for key in batch[0].keys() if key != "id")
count = insert_probesets(dbconn, platformid, phenos)
phenorows += count
logger.info("Inserted %s new ProbeSets", count)
count, dataids = insert_probeset_data(dbconn, speciesid, batch)
datarows += count
logger.info("Inserted %s new data rows", count)
cross_reference_probeset_data(
dbconn, platformid, datasetid, phenos, dataids)
except rqtl2.InvalidFormat as _exc:
logger.error(traceback.format_exc())
logger.debug("There is no pheno file in the R/qtl2 bundle.")
except Exception as _exc:# pylint: disable=[broad-except]
logger.error("Failing with the exception: %s", traceback.format_exc())
return 3
return 0
if __name__ == "__main__":
def cli_args():
"""Process command-line arguments for `install_phenos`"""
parser = add_global_data_arguments(init_cli_parser(
"install_genotypes",
"Parse genotypes from R/qtl2 bundle into the database."))
parser.add_argument(
"platformid",
help="The platform from which the data was generated.")
parser = add_common_arguments(parser)
return parser.parse_args()
main = build_main(cli_args(), install_pheno_files, __MODULE__)
sys.exit(main())
|