diff options
| -rw-r--r-- | scripts/run_qtlreaper.py | 212 |
1 files changed, 212 insertions, 0 deletions
diff --git a/scripts/run_qtlreaper.py b/scripts/run_qtlreaper.py new file mode 100644 index 0000000..764ee70 --- /dev/null +++ b/scripts/run_qtlreaper.py @@ -0,0 +1,212 @@ +"""Script to run rust-qtlreaper and update database with results.""" +import sys +import csv +import time +import secrets +import logging +import traceback +import subprocess +from pathlib import Path +from typing import Union +from functools import reduce +from argparse import Namespace, ArgumentParser + +from gn_libs import mysqldb + +from uploader.phenotypes.models import phenotypes_vector_data +from uploader.population.models import population_by_species_and_id +from uploader.samples.models import samples_by_species_and_population + +from scripts.cli.logging import setup_logging +from scripts.cli.validators import directory_exists +from scripts.cli.options import add_logging, add_mariadb_uri, add_population_id + +logger = logging.getLogger(__name__) + + +def retrieve_genotype_file(genotypes_dir: Path, population_code: str) -> Path: + """Retrieves the genotype file""" + _genofile = genotypes_dir.joinpath(f"{population_code}.geno") + if _genofile.exists(): + return _genofile + raise FileNotFoundError(f"Could not find the genotype file '{population_code}.geno'") + + +def samples_from_genofile(genofile: Path) -> tuple[str, ...]: + """Read samples from the genotype file.""" + with genofile.open(mode="r", encoding="utf-8") as inptr: + while True: + line = inptr.readline() + if (line.startswith("#") # comment line + or line.startswith("@") # allele? heterozygosity? + or line.strip() == "" # empty line + ): + continue + return tuple(line.strip().split("\t")[4:]) + + +def reconcile_samples( + genosamples: tuple[str, ...], + dbsamples: tuple[str, ...] +) -> tuple[tuple[str, ...], tuple[str, ...]]: + """merge samples in genosamples and dbsamples and retain order in genosamples.""" + in_db_not_geno = set(dbsamples).difference(genosamples) + return genosamples, tuple(in_db_not_geno) + + +def generate_qtlreaper_traits_file( + outdir: Path, + samples: tuple[str, ...], + traits_data: dict[str, Union[int, float]], + filename_prefix: str = "" +) -> Path: + """Generate a file for use with qtlreaper that contains the traits' data.""" + _dialect = csv.unix_dialect() + _dialect.delimiter="\t" + _dialect.quoting=0 + + _traitsfile = outdir.joinpath( + f"{filename_prefix}_{secrets.token_urlsafe(15)}.tsv") + with _traitsfile.open(mode="w", encoding="utf-8") as outptr: + writer = csv.DictWriter( + outptr, fieldnames=("Trait",) + samples, dialect=_dialect) + writer.writeheader() + for row in traits_data: + writer.writerow({ + "Trait": row["xref_id"], + **{sample: row.get(sample, "") for sample in samples} + }) + + return _traitsfile + + +def parse_tsv_file(results_file: Path) -> list[dict]: + """Parse the rust-qtlreaper output into usable python objects.""" + with results_file.open("r", encoding="utf-8") as readptr: + _dialect = csv.unix_dialect() + _dialect.delimiter = "\t" + reader = csv.DictReader(readptr, dialect=_dialect) + for row in reader: + yield row + + +def __qtls_by_trait__(qtls, current): + """Organise QTL results by trait""" + return { + **qtls, + current["ID"]: qtls.get(current["ID"], tuple()) + (current,) + } + + +def save_qtl_values_to_db(conn, qtls: dict): + with conn.cursor() as cursor: + cursor.executemany( + "UPDATE PublishXRef SET " + "Locus=%(Locus)s, LRS=%(LRS)s, additive=%(Additive)s " + "WHERE Id=%(ID)s", + qtls) + + +def dispatch(args: Namespace) -> int: + """Dispatch the actual logic.""" + with mysqldb.database_connection(args.db_uri) as conn: + try: + population = population_by_species_and_id(conn, args.species_id, args.population_id) + assert population, (f"No population with ID '{args.population_id} for " + f"species with ID '{args.species_id}'.") + _genofile = retrieve_genotype_file(args.genotypes_dir, population["Name"]) + logger.debug("Genotype file: %s", _genofile) + samples, _samples_not_in_genofile = reconcile_samples( + samples_from_genofile(_genofile), + tuple( + sample["Name"] for sample in + samples_by_species_and_population( + conn, args.species_id, args.population_id))) + if len(_samples_not_in_genofile) > 0: + logger.warning( + "Ignoring %d samples that are in the database but not in " + "the provided genotype file.", + len(_samples_not_in_genofile)) + logger.debug("Ignored the following samples: %s", + ", ".join(_samples_not_in_genofile)) + + # Fetch traits data: provided list, or all traits in db + _traitsdata = phenotypes_vector_data( + conn, + args.species_id, + args.population_id, + xref_ids=tuple(args.xref_ids)).values() + logger.debug("Successfully got traits data. Generating the QTLReaper's traits file…") + _traitsfile = generate_qtlreaper_traits_file( + args.working_dir, + samples, + _traitsdata, + filename_prefix="qtlreaper_input_traits_file") + logger.debug("QTLReaper's Traits file: %s", _traitsfile) + + _qtlreaper_main_output = args.working_dir.joinpath( + f"main-output-{secrets.token_urlsafe(15)}.tsv") + logger.debug("Main output filename: %s", _qtlreaper_main_output) + with subprocess.Popen( + ("qtlreaper", + "--n_permutations", "1000", + "--geno", _genofile, + "--traits", _traitsfile, + "--main_output", _qtlreaper_main_output)) as _qtlreaper: + while _qtlreaper.poll() is None: + logger.debug("QTLReaper process running…") + time.sleep(1) + results = tuple(max(qtls, key=lambda qtl: qtl["LRS"]) + for qtls in + reduce(__qtls_by_trait__, + parse_tsv_file(_qtlreaper_main_output), + {}).values()) + save_qtl_values_to_db(conn, results) + logger.debug("Cleaning up temporary files.") + _traitsfile.unlink() + _qtlreaper_main_output.unlink() + logger.info("Successfully computed p values for %s traits.", len(_traitsdata)) + return 0 + except FileNotFoundError as fnf: + logger.error(", ".join(fnf.args), exc_info=False) + except AssertionError as aserr: + logger.error(", ".join(aserr.args), exc_info=False) + except Exception as _exc: + logger.debug("Type of exception: %s", type(_exc)) + logger.error("General exception!", exc_info=True) + finally: + return 1 + +# python3 -m scripts.run_qtlreaper "mysql://webqtlout:webqtlout@127.0.0.1:3307/db_webqtl" 1 1 /home/frederick/genotype_files/genotype /tmp/qc_app_files/qtlreaper --loglevel debug + +if __name__ == "__main__": + def main(): + """run_qtlreaper.py: entry point.""" + parser = add_logging(add_population_id(add_mariadb_uri( + ArgumentParser("run_qtlreaper")))) + parser.add_argument( + "genotypes_dir", + metavar="GENOTYPES-DIRECTORY", + type=directory_exists, + help="Path to directory with the genotypes.") + parser.add_argument( + "working_dir", + metavar="WORKING-DIRECTORY", + type=directory_exists, + help="Directory where the script will write temporary files.") + parser.add_argument( + "xref_ids", + metavar="CROSS-REFERENCE-IDS", + type=int, + nargs="*", + help=("Optional list of specific cross-reference IDs to narrow down" + " to. If provided, QTLReaper will only run against them. " + "If NOT provided, QTLReaper will run against all the traits " + "in the population.")) + args = parser.parse_args() + setup_logging(logger, args.log_level) + logger.debug("CLI arguments: %s", args) + + return dispatch(args) + + sys.exit(main()) |
