"""Run Quality Control checks on R/qtl2 bundle.""" import os import sys import json from time import sleep from pathlib import Path from zipfile import ZipFile from argparse import Namespace import multiprocessing as mproc from functools import reduce, partial from logging import Logger, getLogger, StreamHandler from typing import Union, Sequence, Callable, Iterator import MySQLdb as mdb from redis import Redis from quality_control.errors import InvalidValue from quality_control.checks import decimal_points_error from uploader import jobs from uploader.db_utils import database_connection from uploader.check_connections import check_db, check_redis from r_qtl import r_qtl2 as rqtl2 from r_qtl import r_qtl2_qc as rqc from r_qtl import exceptions as rqe from r_qtl import fileerrors as rqfe from scripts.process_rqtl2_bundle import parse_job from scripts.redis_logger import setup_redis_logger from scripts.cli_parser import init_cli_parser, add_global_data_arguments def build_line_splitter(cdata: dict) -> Callable[[str], tuple[Union[str, None], ...]]: """Build and return a function to use to split data in the files. Parameters ---------- cdata: A dict holding the control information included with the R/qtl2 bundle. Returns ------- A function that takes a string and return a tuple of strings. """ separator = cdata["sep"] na_strings = cdata["na.strings"] def __splitter__(line: str) -> tuple[Union[str, None], ...]: return tuple( item if item not in na_strings else None for item in (field.strip() for field in line.strip().split(separator))) return __splitter__ def build_line_joiner(cdata: dict) -> Callable[[tuple[Union[str, None], ...]], str]: """Build and return a function to use to split data in the files. Parameters ---------- cdata: A dict holding the control information included with the R/qtl2 bundle. Returns ------- A function that takes a string and return a tuple of strings. """ separator = cdata["sep"] na_strings = cdata["na.strings"] def __joiner__(row: tuple[Union[str, None], ...]) -> str: return separator.join( (na_strings[0] if item is None else item) for item in row) return __joiner__ def check_for_missing_files( rconn: Redis, fqjobid: str, extractpath: Path, logger: Logger) -> bool: """Check that all files listed in the control file do actually exist.""" logger.info("Checking for missing files.") missing = rqc.missing_files(extractpath) # add_to_errors(rconn, fqjobid, "errors-generic", tuple( # rqfe.MissingFile( # mfile[0], mfile[1], ( # f"File '{mfile[1]}' is listed in the control file under " # f"the '{mfile[0]}' key, but it does not actually exist in " # "the bundle.")) # for mfile in missing)) if len(missing) > 0: logger.error(f"Missing files in the bundle!") return True return False def open_file(file_: Path) -> Iterator: """Open file and return one line at a time.""" with open(file_, "r", encoding="utf8") as infile: for line in infile: yield line def check_markers(filename: str, row: tuple[str, ...]) -> tuple[rqfe.InvalidValue]: """Check that the markers are okay""" errors = tuple() counts = {} for marker in row: counts = {**counts, marker: counts.get(marker, 0) + 1} if marker is None or marker == "": errors = errors + (rqfe.InvalidValue( filename, "markers" "-", marker, "A marker MUST be a valid value."),) return errors + tuple( rqfe.InvalidValue(filename, "markers", key, f"Marker '{key}' was repeated {value} times") for key,value in counts.items() if value > 1) def check_geno_line( filename: str, headers: tuple[str, ...], row: tuple[Union[str, None]], cdata: dict ) -> tuple[rqfe.InvalidValue]: """Check that the geno line is correct.""" errors = tuple() # Verify that line has same number of columns as headers if len(headers) != len(row): errors = errors + (rqfe.InvalidValue( filename, headers[0], row[0], row[0], "Every line MUST have the same number of columns."),) # First column is the individuals/cases/samples if not bool(row[0]): errors = errors + (rqfe.InvalidValue( filename, headers[0], row[0], row[0], "The sample/case MUST be a valid value."),) def __process_value__(val): if val in cdata["na.strings"]: return None if val in cdata["alleles"]: return cdata["genotypes"][val] genocode = cdata.get("genotypes", {}) for coltitle, cellvalue in zip(headers[1:],row[1:]): if ( bool(genocode) and cellvalue is not None and cellvalue not in genocode.keys() ): errors = errors + (rqfe.InvalidValue( filename, row[0], coltitle, cellvalue, f"Value '{cellvalue}' is invalid. Expected one of " f"'{', '.join(genocode.keys())}'.")) return errors def file_errors_and_details( file_: Path, filetype: str, cdata: dict, linesplitterfn: Callable, linejoinerfn: Callable, headercheckers: tuple[Callable, ...], bodycheckers: tuple[Callable, ...] ) -> dict: """Compute errors, and other file metadata.""" errors = tuple() if cdata[f"{filetype}_transposed"]: rqtl2.transpose_csv_with_rename(file_, linesplitterfn, linejoinerfn) for lineno, line in enumerate(open_file(file_), start=1): row = linesplitterfn(line) if lineno == 1: headers = tuple(row) errors = errors + reduce( lambda errs, fnct: errs + fnct(file_.name, row[1:]), headercheckers, tuple()) continue errors = errors + reduce( lambda errs, fnct: errs + fnct(file_.name, headers, row, cdata), bodycheckers, tuple()) return { "filename": file_.name, "filesize": os.stat(file_).st_size, "linecount": lineno, # TOD0: Maybe put the errors in a redis list and return the name of the # redis list rather than the errors. Maybe also replace the errors # key with a flag e.g. "errors-found": True/False "errors": errors } def check_for_geno_errors( extractdir: Path, cdata: dict, linesplitterfn: Callable[[str], tuple[Union[str, None]]], linejoinerfn: Callable[[tuple[Union[str, None], ...]], str], logger: Logger) -> bool: """Check for errors in genotype files.""" if "geno" in cdata: genofiles = tuple(extractdir.joinpath(fname) for fname in cdata["geno"]) ## Run code below in multiprocessing once you verify it works. gerrs = tuple(file_errors_and_details( file_, filetype="geno", cdata=cdata, linesplitterfn=linesplitterfn, linejoinerfn=linejoinerfn, headercheckers=(check_markers,), bodycheckers=(check_geno_line,)) for file_ in genofiles) # TOD0: Add the errors to redis if len(gerrs) > 0: logger.error("At least one of the 'geno' files has (an) error(s).") return True logger.info("No error(s) found in any of the 'geno' files.") return False # def check_for_pheno_errors(...): # """Check for errors in phenotype files.""" # pass # def check_for_phenose_errors(...): # """Check for errors in phenotype, standard-error files.""" # pass # def check_for_phenocovar_errors(...): # """Check for errors in phenotype-covariates files.""" # pass def run_qc(rconn: Redis, args: Namespace, logger: Logger) -> int: """Run quality control checks on R/qtl2 bundles.""" fqjobid = jobs.job_key(args.redisprefix, args.jobid) thejob = parse_job(rconn, args.redisprefix, args.jobid) print(f"THE JOB =================> {thejob}") jobmeta = thejob["job-metadata"] inpath = Path(jobmeta["rqtl2-bundle-file"]) extractdir = inpath.parent.joinpath(f"{inpath.name}__extraction_dir") with ZipFile(inpath, "r") as zfile: rqtl2.extract(zfile, extractdir) ### BEGIN: The quality control checks ### cdata = rqtl2.control_data(extractdir) splitter = build_line_splitter(cdata) joiner = build_line_joiner(cdata) check_for_missing_files(rconn, fqjobid, extractdir, logger) check_for_geno_errors(extractdir, cdata, splitter, joiner, logger) # check_for_pheno_errors(...) # check_for_phenose_errors(...) # check_for_phenocovar_errors(...) ### END: The quality control checks ### def __fetch_errors__(rkey: str) -> tuple: return tuple(json.loads(rconn.hget(fqjobid, rkey) or "[]")) return (1 if any(( bool(__fetch_errors__(key)) for key in ("errors-geno", "errors-pheno", "errors-phenos", "errors-phenocovar"))) else 0) if __name__ == "__main__": def main(): """Enter R/qtl2 bundle QC runner.""" args = add_global_data_arguments(init_cli_parser( "qc-on-rqtl2-bundle", "Run QC on R/qtl2 bundle.")).parse_args() check_redis(args.redisuri) check_db(args.databaseuri) logger = getLogger("qc-on-rqtl2-bundle") logger.addHandler(StreamHandler(stream=sys.stderr)) logger.setLevel("DEBUG") fqjobid = jobs.job_key(args.redisprefix, args.jobid) with Redis.from_url(args.redisuri, decode_responses=True) as rconn: logger.addHandler(setup_redis_logger( rconn, fqjobid, f"{fqjobid}:log-messages", args.redisexpiry)) exitcode = run_qc(rconn, args, logger) rconn.hset( jobs.job_key(args.redisprefix, args.jobid), "exitcode", exitcode) return exitcode sys.exit(main())