1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
|
"""Script to run rust-qtlreaper and update database with results."""
import sys
import csv
import time
import secrets
import logging
import traceback
import subprocess
from pathlib import Path
from typing import Union
from argparse import Namespace, ArgumentParser
from gn_libs import mysqldb
from uploader.phenotypes.models import phenotypes_vector_data
from uploader.population.models import population_by_species_and_id
from uploader.samples.models import samples_by_species_and_population
from scripts.cli.logging import setup_logging
from scripts.cli.validators import directory_exists
from scripts.cli.options import add_logging, add_mariadb_uri, add_population_id
logger = logging.getLogger(__name__)
def retrieve_genotype_file(genotypes_dir: Path, population_code: str) -> Path:
"""Retrieves the genotype file"""
_genofile = genotypes_dir.joinpath(f"{population_code}.geno")
if _genofile.exists():
return _genofile
raise FileNotFoundError(f"Could not find the genotype file '{population_code}.geno'")
def samples_from_genofile(genofile: Path) -> tuple[str, ...]:
"""Read samples from the genotype file."""
with genofile.open(mode="r", encoding="utf-8") as inptr:
while True:
line = inptr.readline()
if (line.startswith("#") # comment line
or line.startswith("@") # allele? heterozygosity?
or line.strip() == "" # empty line
):
continue
return tuple(line.strip().split("\t")[4:])
def reconcile_samples(
genosamples: tuple[str, ...],
dbsamples: tuple[str, ...]
) -> tuple[str, ...]:
"""merge samples in genosamples and dbsamples and retain order in genosamples."""
return genosamples + tuple(set(dbsamples).difference(genosamples))
def generate_qtlreaper_traits_file(
outdir: Path,
samples: tuple[str, ...],
traits_data: dict[str, Union[int, float]],
filename_prefix: str = ""
) -> Path:
"""Generate a file for use with qtlreaper that contains the traits' data."""
_dialect = csv.unix_dialect()
_dialect.delimiter="\t"
_dialect.quoting=0
_traitsfile = outdir.joinpath(
f"{filename_prefix}_{secrets.token_urlsafe(15)}.tsv")
with _traitsfile.open(mode="w", encoding="utf-8") as outptr:
writer = csv.DictWriter(
outptr, fieldnames=("Trait",) + samples, dialect=_dialect)
writer.writeheader()
for row in traits_data:
writer.writerow({
"Trait": row["xref_id"],
**{sample: row.get(sample, "") for sample in samples}
})
return _traitsfile
def dispatch(args: Namespace) -> int:
"""Dispatch the actual logic."""
with mysqldb.database_connection(args.db_uri) as conn:
try:
population = population_by_species_and_id(conn, args.species_id, args.population_id)
assert population, (f"No population with ID '{args.population_id} for "
f"species with ID '{args.species_id}'.")
_genofile = retrieve_genotype_file(args.genotypes_dir, population["Name"])
logger.debug("Genotype file: %s", _genofile)
samples = reconcile_samples(
samples_from_genofile(_genofile),
tuple(
sample["Name"] for sample in
samples_by_species_and_population(
conn, args.species_id, args.population_id)))
logger.debug("Total samples: %s", len(samples))
# Fetch traits data: provided list, or all traits in db
_traitsdata = phenotypes_vector_data(
conn,
args.species_id,
args.population_id,
xref_ids=tuple(args.xref_ids)).values()
# traits_file = generate_traits_file(conn, population, args.xref_ids or traits_list(...))
logger.debug("Successfully got traits data. Generating the QTLReaper's traits file…")
_traitsfile = generate_qtlreaper_traits_file(
args.working_dir,
samples,
_traitsdata, # call function above here.
filename_prefix="qtlreaper_input_traits_file")
logger.debug("QTLReaper's Traits file: %s", _traitsfile)
# qtlreaper --json --n_permutations 1000 --geno genofile --traits traitsfile --main_output ...
_qtlreaper_main_output = args.working_dir.joinpath(
f"main-output-{secrets.token_urlsafe(15)}.json")
logger.debug("Main output filename: %s", _qtlreaper_main_output)
with subprocess.Popen(
("qtlreaper",
"--json",
"--n_permutations", "1000",
"--geno", _genofile,
"--traits", _traitsfile,
"--main_output", _qtlreaper_main_output)) as _qtlreaper:
while _qtlreaper.poll() is None:
logger.debug("QTLReaper process running…")
time.sleep(1)
# results = parse_qtlreaper_results(...)
# save_p_values_to_db(conn, results)
# logger.info("Successfully computed p values for %s traits.", len(traits_list))
return 0
except FileNotFoundError as fnf:
logger.error(", ".join(fnf.args), exc_info=False)
except AssertionError as aserr:
logger.error(", ".join(aserr.args), exc_info=False)
except Exception as _exc:
logger.debug("Type of exception: %s", type(_exc))
logger.error("General exception!", exc_info=True)
finally:
return 1
# python3 -m scripts.run_qtlreaper "mysql://webqtlout:webqtlout@127.0.0.1:3307/db_webqtl" 1 1 /home/frederick/genotype_files/genotype /tmp/qc_app_files/qtlreaper --loglevel debug
if __name__ == "__main__":
def main():
"""run_qtlreaper.py: entry point."""
parser = add_logging(add_population_id(add_mariadb_uri(
ArgumentParser("run_qtlreaper"))))
parser.add_argument(
"genotypes_dir",
metavar="GENOTYPES-DIRECTORY",
type=directory_exists,
help="Path to directory with the genotypes.")
parser.add_argument(
"working_dir",
metavar="WORKING-DIRECTORY",
type=directory_exists,
help="Directory where the script will write temporary files.")
parser.add_argument(
"xref_ids",
metavar="CROSS-REFERENCE-IDS",
type=int,
nargs="*",
help=("Optional list of specific cross-reference IDs to narrow down"
" to. If provided, QTLReaper will only run against them. "
"If NOT provided, QTLReaper will run against all the traits "
"in the population."))
args = parser.parse_args()
setup_logging(logger, args.log_level)
logger.debug("CLI arguments: %s", args)
return dispatch(args)
sys.exit(main())
|