From b120db84b7a25da15cd70e73f8af4c84b6517427 Mon Sep 17 00:00:00 2001
From: Frederick Muriuki Muriithi
Date: Mon, 12 Aug 2024 16:26:28 -0500
Subject: Rewrite the QC code for R/qtl2

---
 scripts/qc_on_rqtl2_bundle2.py | 307 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 307 insertions(+)
 create mode 100644 scripts/qc_on_rqtl2_bundle2.py

diff --git a/scripts/qc_on_rqtl2_bundle2.py b/scripts/qc_on_rqtl2_bundle2.py
new file mode 100644
index 0000000..f39689e
--- /dev/null
+++ b/scripts/qc_on_rqtl2_bundle2.py
@@ -0,0 +1,307 @@
+"""Run Quality Control checks on R/qtl2 bundle."""
+import os
+import sys
+import json
+from time import sleep
+from pathlib import Path
+from zipfile import ZipFile
+from argparse import Namespace
+import multiprocessing as mproc
+from functools import reduce, partial
+from logging import Logger, getLogger, StreamHandler
+from typing import Union, Sequence, Callable, Iterator
+
+import MySQLdb as mdb
+from redis import Redis
+
+from quality_control.errors import InvalidValue
+from quality_control.checks import decimal_points_error
+
+from uploader import jobs
+from uploader.db_utils import database_connection
+from uploader.check_connections import check_db, check_redis
+
+from r_qtl import r_qtl2 as rqtl2
+from r_qtl import r_qtl2_qc as rqc
+from r_qtl import exceptions as rqe
+from r_qtl import fileerrors as rqfe
+
+from scripts.process_rqtl2_bundle import parse_job
+from scripts.redis_logger import setup_redis_logger
+from scripts.cli_parser import init_cli_parser, add_global_data_arguments
+
+
+def build_line_splitter(cdata: dict) -> Callable[[str], tuple[Union[str, None], ...]]:
+    """Build and return a function to use to split data in the files.
+
+    Parameters
+    ----------
+    cdata: A dict holding the control information included with the R/qtl2
+        bundle.
+
+    Returns
+    -------
+    A function that takes a string and returns a tuple of strings.
+    """
+    separator = cdata["sep"]
+    na_strings = cdata["na.strings"]
+    def __splitter__(line: str) -> tuple[Union[str, None], ...]:
+        return tuple(
+            item if item not in na_strings else None
+            for item in
+            (field.strip() for field in line.strip().split(separator)))
+    return __splitter__
+
+
+def build_line_joiner(cdata: dict) -> Callable[[tuple[Union[str, None], ...]], str]:
+    """Build and return a function to use to join processed data back into lines.
+
+    Parameters
+    ----------
+    cdata: A dict holding the control information included with the R/qtl2
+        bundle.
+
+    Returns
+    -------
+    A function that takes a tuple of values and returns a string.
+    """
+    separator = cdata["sep"]
+    na_strings = cdata["na.strings"]
+    def __joiner__(row: tuple[Union[str, None], ...]) -> str:
+        return separator.join(
+            (na_strings[0] if item is None else item)
+            for item in row)
+    return __joiner__
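
For orientation, a minimal usage sketch of the two builders above (not part of the patch itself; the control values are hypothetical stand-ins for what the R/qtl2 control file provides):

    cdata = {"sep": ",", "na.strings": ["NA", "-"]}   # hypothetical control values
    split_line = build_line_splitter(cdata)
    join_row = build_line_joiner(cdata)

    row = split_line("ind1,A,NA,B\n")   # ("ind1", "A", None, "B")
    line = join_row(row)                # "ind1,A,NA,B"; None is written back as na_strings[0]
    assert line == "ind1,A,NA,B"
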
+ """ + separator = cdata["sep"] + na_strings = cdata["na.strings"] + def __joiner__(row: tuple[Union[str, None], ...]) -> str: + return separator.join( + (na_strings[0] if item is None else item) + for item in row) + return __joiner__ + + +def check_for_missing_files( + rconn: Redis, fqjobid: str, extractpath: Path, logger: Logger) -> bool: + """Check that all files listed in the control file do actually exist.""" + logger.info("Checking for missing files.") + missing = rqc.missing_files(extractpath) + # add_to_errors(rconn, fqjobid, "errors-generic", tuple( + # rqfe.MissingFile( + # mfile[0], mfile[1], ( + # f"File '{mfile[1]}' is listed in the control file under " + # f"the '{mfile[0]}' key, but it does not actually exist in " + # "the bundle.")) + # for mfile in missing)) + if len(missing) > 0: + logger.error(f"Missing files in the bundle!") + return True + return False + + +def open_file(file_: Path) -> Iterator: + """Open file and return one line at a time.""" + with open(file_, "r", encoding="utf8") as infile: + for line in infile: + yield line + + +def check_markers(filename: str, row: tuple[str, ...]) -> tuple[rqfe.InvalidValue]: + """Check that the markers are okay""" + errors = tuple() + counts = {} + for marker in row: + counts = {**counts, marker: counts.get(marker, 0) + 1} + if marker is None or marker == "": + errors = errors + (rqfe.InvalidValue( + filename, + "markers" + "-", + marker, + "A marker MUST be a valid value."),) + + return errors + tuple( + rqfe.InvalidValue(filename, + "markers", + key, + f"Marker '{key}' was repeated {value} times") + for key,value in counts.items() if value > 1) + + +def check_geno_line( + filename: str, + headers: tuple[str, ...], + row: tuple[Union[str, None]], + cdata: dict +) -> tuple[rqfe.InvalidValue]: + """Check that the geno line is correct.""" + errors = tuple() + # Verify that line has same number of columns as headers + if len(headers) != len(row): + errors = errors + (rqfe.InvalidValue( + filename, + headers[0], + row[0], + row[0], + "Every line MUST have the same number of columns."),) + + # First column is the individuals/cases/samples + if not bool(row[0]): + errors = errors + (rqfe.InvalidValue( + filename, + headers[0], + row[0], + row[0], + "The sample/case MUST be a valid value."),) + + def __process_value__(val): + if val in cdata["na.strings"]: + return None + if val in cdata["alleles"]: + return cdata["genotypes"][val] + + genocode = cdata.get("genotypes", {}) + for coltitle, cellvalue in zip(headers[1:],row[1:]): + if ( + bool(genocode) and + cellvalue is not None and + cellvalue not in genocode.keys() + ): + errors = errors + (rqfe.InvalidValue( + filename, row[0], coltitle, cellvalue, + f"Value '{cellvalue}' is invalid. Expected one of " + f"'{', '.join(genocode.keys())}'.")) + + return errors + + +def file_errors_and_details( + file_: Path, + filetype: str, + cdata: dict, + linesplitterfn: Callable, + linejoinerfn: Callable, + headercheckers: tuple[Callable, ...], + bodycheckers: tuple[Callable, ...] 
+
+
+def file_errors_and_details(
+        file_: Path,
+        filetype: str,
+        cdata: dict,
+        linesplitterfn: Callable,
+        linejoinerfn: Callable,
+        headercheckers: tuple[Callable, ...],
+        bodycheckers: tuple[Callable, ...]
+) -> dict:
+    """Compute errors, and other file metadata."""
+    errors = tuple()
+    if cdata[f"{filetype}_transposed"]:
+        rqtl2.transpose_csv_with_rename(file_, linesplitterfn, linejoinerfn)
+
+    for lineno, line in enumerate(open_file(file_), start=1):
+        row = linesplitterfn(line)
+        if lineno == 1:
+            headers = tuple(row)
+            errors = errors + reduce(
+                lambda errs, fnct: errs + fnct(file_.name, row[1:]),
+                headercheckers,
+                tuple())
+            continue
+
+        errors = errors + reduce(
+            lambda errs, fnct: errs + fnct(file_.name, headers, row, cdata),
+            bodycheckers,
+            tuple())
+
+    return {
+        "filename": file_.name,
+        "filesize": os.stat(file_).st_size,
+        "linecount": lineno,
+        # TODO: Maybe put the errors in a redis list and return the name of the
+        #       redis list rather than the errors. Maybe also replace the errors
+        #       key with a flag e.g. "errors-found": True/False
+        "errors": errors
+    }
+
+
+def check_for_geno_errors(
+        extractdir: Path,
+        cdata: dict,
+        linesplitterfn: Callable[[str], tuple[Union[str, None], ...]],
+        linejoinerfn: Callable[[tuple[Union[str, None], ...]], str],
+        logger: Logger) -> bool:
+    """Check for errors in genotype files."""
+    if "geno" in cdata:
+        genofiles = tuple(extractdir.joinpath(fname) for fname in cdata["geno"])
+        ## Run code below in multiprocessing once you verify it works.
+        gerrs = tuple(file_errors_and_details(
+            file_,
+            filetype="geno",
+            cdata=cdata,
+            linesplitterfn=linesplitterfn,
+            linejoinerfn=linejoinerfn,
+            headercheckers=(check_markers,),
+            bodycheckers=(check_geno_line,)) for file_ in genofiles)
+        # TODO: Add the errors to redis
+        if any(len(result["errors"]) > 0 for result in gerrs):
+            logger.error("At least one of the 'geno' files has (an) error(s).")
+            return True
+        logger.info("No error(s) found in any of the 'geno' files.")
+    return False
+
+
+# def check_for_pheno_errors(...):
+#     """Check for errors in phenotype files."""
+#     pass
+
+
+# def check_for_phenose_errors(...):
+#     """Check for errors in phenotype, standard-error files."""
+#     pass
+
+
+# def check_for_phenocovar_errors(...):
+#     """Check for errors in phenotype-covariates files."""
+#     pass
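
For context, a sketch of driving file_errors_and_details directly on a single genotype file, wired up the same way check_for_geno_errors does it (the path and control values below are assumptions, not taken from the patch):

    cdata = {"sep": ",", "na.strings": ["NA"], "geno_transposed": False,
             "genotypes": {"B": 1, "D": 2}}      # assumed control data
    genofile = Path("extract_dir/geno.csv")      # assumed to exist on disk
    details = file_errors_and_details(
        genofile,
        filetype="geno",
        cdata=cdata,
        linesplitterfn=build_line_splitter(cdata),
        linejoinerfn=build_line_joiner(cdata),
        headercheckers=(check_markers,),
        bodycheckers=(check_geno_line,))
    print(details["filename"], details["linecount"], len(details["errors"]))
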
+
+
+def run_qc(rconn: Redis, args: Namespace, logger: Logger) -> int:
+    """Run quality control checks on R/qtl2 bundles."""
+    fqjobid = jobs.job_key(args.redisprefix, args.jobid)
+    thejob = parse_job(rconn, args.redisprefix, args.jobid)
+    print(f"THE JOB =================> {thejob}")
+    jobmeta = thejob["job-metadata"]
+    inpath = Path(jobmeta["rqtl2-bundle-file"])
+    extractdir = inpath.parent.joinpath(f"{inpath.name}__extraction_dir")
+    with ZipFile(inpath, "r") as zfile:
+        rqtl2.extract(zfile, extractdir)
+
+    ### BEGIN: The quality control checks ###
+    cdata = rqtl2.control_data(extractdir)
+    splitter = build_line_splitter(cdata)
+    joiner = build_line_joiner(cdata)
+    check_for_missing_files(rconn, fqjobid, extractdir, logger)
+    check_for_geno_errors(extractdir, cdata, splitter, joiner, logger)
+    # check_for_pheno_errors(...)
+    # check_for_phenose_errors(...)
+    # check_for_phenocovar_errors(...)
+    ### END: The quality control checks ###
+
+    def __fetch_errors__(rkey: str) -> tuple:
+        return tuple(json.loads(rconn.hget(fqjobid, rkey) or "[]"))
+
+    return (1 if any((
+        bool(__fetch_errors__(key))
+        for key in
+        ("errors-geno", "errors-pheno", "errors-phenos", "errors-phenocovar")))
+            else 0)
+
+
+if __name__ == "__main__":
+    def main():
+        """Enter R/qtl2 bundle QC runner."""
+        args = add_global_data_arguments(init_cli_parser(
+            "qc-on-rqtl2-bundle", "Run QC on R/qtl2 bundle.")).parse_args()
+        check_redis(args.redisuri)
+        check_db(args.databaseuri)
+
+        logger = getLogger("qc-on-rqtl2-bundle")
+        logger.addHandler(StreamHandler(stream=sys.stderr))
+        logger.setLevel("DEBUG")
+
+        fqjobid = jobs.job_key(args.redisprefix, args.jobid)
+        with Redis.from_url(args.redisuri, decode_responses=True) as rconn:
+            logger.addHandler(setup_redis_logger(
+                rconn, fqjobid, f"{fqjobid}:log-messages",
+                args.redisexpiry))
+
+            exitcode = run_qc(rconn, args, logger)
+            rconn.hset(
+                jobs.job_key(args.redisprefix, args.jobid), "exitcode", exitcode)
+            return exitcode
+
+    sys.exit(main())
--
cgit v1.2.3
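
As a closing illustration of the exit-code convention implemented in run_qc above, a small sketch that mimics reading the per-category error keys back from the job hash (the hash contents are invented; in the script they live in Redis):

    import json

    jobhash = {"errors-geno": json.dumps([{"filename": "geno.csv"}]),   # invented contents
               "errors-pheno": "[]"}

    def exitcode(hsh: dict) -> int:
        """Mirror run_qc(): any non-empty error list yields a non-zero exit code."""
        keys = ("errors-geno", "errors-pheno", "errors-phenos", "errors-phenocovar")
        return 1 if any(json.loads(hsh.get(key) or "[]") for key in keys) else 0

    print(exitcode(jobhash))   # 1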