"""Script to run rust-qtlreaper and update database with results.""" import sys import csv import time import secrets import logging import traceback import subprocess from pathlib import Path from typing import Union from argparse import Namespace, ArgumentParser from gn_libs import mysqldb from uploader.phenotypes.models import phenotypes_vector_data from uploader.population.models import population_by_species_and_id from uploader.samples.models import samples_by_species_and_population from scripts.cli.logging import setup_logging from scripts.cli.validators import directory_exists from scripts.cli.options import add_logging, add_mariadb_uri, add_population_id logger = logging.getLogger(__name__) def retrieve_genotype_file(genotypes_dir: Path, population_code: str) -> Path: """Retrieves the genotype file""" _genofile = genotypes_dir.joinpath(f"{population_code}.geno") if _genofile.exists(): return _genofile raise FileNotFoundError(f"Could not find the genotype file '{population_code}.geno'") def samples_from_genofile(genofile: Path) -> tuple[str, ...]: """Read samples from the genotype file.""" with genofile.open(mode="r", encoding="utf-8") as inptr: while True: line = inptr.readline() if (line.startswith("#") # comment line or line.startswith("@") # allele? heterozygosity? or line.strip() == "" # empty line ): continue return tuple(line.strip().split("\t")[4:]) def reconcile_samples( genosamples: tuple[str, ...], dbsamples: tuple[str, ...] ) -> tuple[str, ...]: """merge samples in genosamples and dbsamples and retain order in genosamples.""" return genosamples + tuple(set(dbsamples).difference(genosamples)) def generate_qtlreaper_traits_file( outdir: Path, samples: tuple[str, ...], traits_data: dict[str, Union[int, float]], filename_prefix: str = "" ) -> Path: """Generate a file for use with qtlreaper that contains the traits' data.""" _dialect = csv.unix_dialect() _dialect.delimiter="\t" _dialect.quoting=0 _traitsfile = outdir.joinpath( f"{filename_prefix}_{secrets.token_urlsafe(15)}.tsv") with _traitsfile.open(mode="w", encoding="utf-8") as outptr: writer = csv.DictWriter( outptr, fieldnames=("Trait",) + samples, dialect=_dialect) writer.writeheader() for row in traits_data: writer.writerow({ "Trait": row["xref_id"], **{sample: row.get(sample, "") for sample in samples} }) return _traitsfile def dispatch(args: Namespace) -> int: """Dispatch the actual logic.""" with mysqldb.database_connection(args.db_uri) as conn: try: population = population_by_species_and_id(conn, args.species_id, args.population_id) assert population, (f"No population with ID '{args.population_id} for " f"species with ID '{args.species_id}'.") _genofile = retrieve_genotype_file(args.genotypes_dir, population["Name"]) logger.debug("Genotype file: %s", _genofile) samples = reconcile_samples( samples_from_genofile(_genofile), tuple( sample["Name"] for sample in samples_by_species_and_population( conn, args.species_id, args.population_id))) logger.debug("Total samples: %s", len(samples)) # Fetch traits data: provided list, or all traits in db _traitsdata = phenotypes_vector_data( conn, args.species_id, args.population_id, xref_ids=tuple(args.xref_ids)).values() # traits_file = generate_traits_file(conn, population, args.xref_ids or traits_list(...)) logger.debug("Successfully got traits data. Generating the QTLReaper's traits file…") _traitsfile = generate_qtlreaper_traits_file( args.working_dir, samples, _traitsdata, # call function above here. filename_prefix="qtlreaper_input_traits_file") logger.debug("QTLReaper's Traits file: %s", _traitsfile) # qtlreaper --json --n_permutations 1000 --geno genofile --traits traitsfile --main_output ... _qtlreaper_main_output = args.working_dir.joinpath( f"main-output-{secrets.token_urlsafe(15)}.json") logger.debug("Main output filename: %s", _qtlreaper_main_output) with subprocess.Popen( ("qtlreaper", "--json", "--n_permutations", "1000", "--geno", _genofile, "--traits", _traitsfile, "--main_output", _qtlreaper_main_output)) as _qtlreaper: while _qtlreaper.poll() is None: logger.debug("QTLReaper process running…") time.sleep(1) # results = parse_qtlreaper_results(...) # save_p_values_to_db(conn, results) # logger.info("Successfully computed p values for %s traits.", len(traits_list)) return 0 except FileNotFoundError as fnf: logger.error(", ".join(fnf.args), exc_info=False) except AssertionError as aserr: logger.error(", ".join(aserr.args), exc_info=False) except Exception as _exc: logger.debug("Type of exception: %s", type(_exc)) logger.error("General exception!", exc_info=True) finally: return 1 # python3 -m scripts.run_qtlreaper "mysql://webqtlout:webqtlout@127.0.0.1:3307/db_webqtl" 1 1 /home/frederick/genotype_files/genotype /tmp/qc_app_files/qtlreaper --loglevel debug if __name__ == "__main__": def main(): """run_qtlreaper.py: entry point.""" parser = add_logging(add_population_id(add_mariadb_uri( ArgumentParser("run_qtlreaper")))) parser.add_argument( "genotypes_dir", metavar="GENOTYPES-DIRECTORY", type=directory_exists, help="Path to directory with the genotypes.") parser.add_argument( "working_dir", metavar="WORKING-DIRECTORY", type=directory_exists, help="Directory where the script will write temporary files.") parser.add_argument( "xref_ids", metavar="CROSS-REFERENCE-IDS", type=int, nargs="*", help=("Optional list of specific cross-reference IDs to narrow down" " to. If provided, QTLReaper will only run against them. " "If NOT provided, QTLReaper will run against all the traits " "in the population.")) args = parser.parse_args() setup_logging(logger, args.log_level) logger.debug("CLI arguments: %s", args) return dispatch(args) sys.exit(main())