"""Run Quality Control checks on R/qtl2 bundle.""" import os import sys import json from time import sleep from pathlib import Path from zipfile import ZipFile from argparse import Namespace from datetime import timedelta import multiprocessing as mproc from functools import reduce, partial from logging import Logger, getLogger, StreamHandler from typing import Union, Sequence, Callable, Iterator import MySQLdb as mdb from redis import Redis from quality_control.errors import InvalidValue from quality_control.checks import decimal_points_error from uploader import jobs from uploader.db_utils import database_connection from uploader.check_connections import check_db, check_redis from r_qtl import r_qtl2 as rqtl2 from r_qtl import r_qtl2_qc as rqc from r_qtl import exceptions as rqe from r_qtl import fileerrors as rqfe from scripts.process_rqtl2_bundle import parse_job from scripts.redis_logger import setup_redis_logger from scripts.cli_parser import init_cli_parser, add_global_data_arguments from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter def check_for_missing_files( rconn: Redis, fqjobid: str, extractpath: Path, logger: Logger) -> bool: """Check that all files listed in the control file do actually exist.""" logger.info("Checking for missing files.") missing = rqc.missing_files(extractpath) # add_to_errors(rconn, fqjobid, "errors-generic", tuple( # rqfe.MissingFile( # mfile[0], mfile[1], ( # f"File '{mfile[1]}' is listed in the control file under " # f"the '{mfile[0]}' key, but it does not actually exist in " # "the bundle.")) # for mfile in missing)) if len(missing) > 0: logger.error(f"Missing files in the bundle!") return True return False def open_file(file_: Path) -> Iterator: """Open file and return one line at a time.""" with open(file_, "r", encoding="utf8") as infile: for line in infile: yield line def check_markers( filename: str, row: tuple[str, ...], save_error: lambda val: val ) -> tuple[rqfe.InvalidValue]: """Check that the markers are okay""" errors = tuple() counts = {} for marker in row: counts = {**counts, marker: counts.get(marker, 0) + 1} if marker is None or marker == "": errors = errors + (save_error(rqfe.InvalidValue( filename, "markers" "-", marker, "A marker MUST be a valid value.")),) return errors + tuple( save_error(rqfe.InvalidValue( filename, "markers", key, f"Marker '{key}' was repeated {value} times")) for key,value in counts.items() if value > 1) def check_geno_line( filename: str, headers: tuple[str, ...], row: tuple[Union[str, None]], cdata: dict, save_error: lambda val: val ) -> tuple[rqfe.InvalidValue]: """Check that the geno line is correct.""" errors = tuple() # Verify that line has same number of columns as headers if len(headers) != len(row): errors = errors + (save_error(rqfe.InvalidValue( filename, headers[0], row[0], row[0], "Every line MUST have the same number of columns.")),) # First column is the individuals/cases/samples if not bool(row[0]): errors = errors + (save_error(rqfe.InvalidValue( filename, headers[0], row[0], row[0], "The sample/case MUST be a valid value.")),) def __process_value__(val): if val in cdata["na.strings"]: return None if val in cdata["alleles"]: return cdata["genotypes"][val] genocode = cdata.get("genotypes", {}) for coltitle, cellvalue in zip(headers[1:],row[1:]): if ( bool(genocode) and cellvalue is not None and cellvalue not in genocode.keys() ): errors = errors + (save_error(rqfe.InvalidValue( filename, row[0], coltitle, cellvalue, f"Value '{cellvalue}' is invalid. Expected one of " f"'{', '.join(genocode.keys())}'.")),) return errors def push_file_error_to_redis(rconn: Redis, key: str, error: InvalidValue) -> InvalidValue: """Push the file error to redis a json string Parameters ---------- rconn: Connection to redis key: The name of the list where we push the errors error: The file error to save Returns ------- Returns the file error it saved """ if bool(error): rconn.rpush(key, json.dumps(error._asdict())) return error def file_errors_and_details( redisargs: dict[str, str], file_: Path, filetype: str, cdata: dict, linesplitterfn: Callable, linejoinerfn: Callable, headercheckers: tuple[Callable, ...], bodycheckers: tuple[Callable, ...] ) -> dict: """Compute errors, and other file metadata.""" errors = tuple() if cdata[f"{filetype}_transposed"]: rqtl2.transpose_csv_with_rename(file_, linesplitterfn, linejoinerfn) with Redis.from_url(redisargs["redisuri"], decode_responses=True) as rconn: save_error_fn = partial(push_file_error_to_redis, rconn, error_list_name(filetype, file_.name)) for lineno, line in enumerate(open_file(file_), start=1): row = linesplitterfn(line) if lineno == 1: headers = tuple(row) errors = errors + reduce( lambda errs, fnct: errs + fnct( file_.name, row[1:], save_error_fn), headercheckers, tuple()) continue errors = errors + reduce( lambda errs, fnct: errs + fnct( file_.name, headers, row, cdata, save_error_fn), bodycheckers, tuple()) filedetails = { "filename": file_.name, "filesize": os.stat(file_).st_size, "linecount": lineno } rconn.hset(redisargs["fqjobid"], f"file-details:{filetype}:{file_.name}", json.dumps(filedetails)) return {**filedetails, "errors": errors} def error_list_name(filetype: str, filename: str): """Compute the name of the list where the errors will be pushed. Parameters ---------- filetype: The type of file. One of `r_qtl.r_qtl2.FILE_TYPES` filename: The name of the file. """ return f"errors:{filetype}:{filename}" def check_for_geno_errors( redisargs: dict[str, str], extractdir: Path, cdata: dict, linesplitterfn: Callable[[str], tuple[Union[str, None]]], linejoinerfn: Callable[[tuple[Union[str, None], ...]], str], logger: Logger ) -> bool: """Check for errors in genotype files.""" if "geno" in cdata or "founder_geno" in cdata: genofiles = tuple( extractdir.joinpath(fname) for fname in cdata.get("geno", [])) fgenofiles = tuple( extractdir.joinpath(fname) for fname in cdata.get("founder_geno", [])) allgenofiles = genofiles + fgenofiles with Redis.from_url(redisargs["redisuri"], decode_responses=True) as rconn: error_list_names = [ error_list_name("geno", file_.name) for file_ in allgenofiles] for list_name in error_list_names: rconn.delete(list_name) rconn.hset( redisargs["fqjobid"], "geno-errors-lists", json.dumps(error_list_names)) processes = [ mproc.Process(target=file_errors_and_details, args=( redisargs, file_, ftype, cdata, linesplitterfn, linejoinerfn, (check_markers,), (check_geno_line,)) ) for ftype, file_ in ( tuple(("geno", file_) for file_ in genofiles) + tuple(("founder_geno", file_) for file_ in fgenofiles)) ] for process in processes: process.start() # Set expiry for any created error lists for key in error_list_names: rconn.expire(name=key, time=timedelta(seconds=redisargs["redisexpiry"])) # TOD0: Add the errors to redis if any(rconn.llen(errlst) > 0 for errlst in error_list_names): logger.error("At least one of the 'geno' files has (an) error(s).") return True logger.info("No error(s) found in any of the 'geno' files.") else: logger.info("No 'geno' files to check.") return False # def check_for_pheno_errors(...): # """Check for errors in phenotype files.""" # pass # def check_for_phenose_errors(...): # """Check for errors in phenotype, standard-error files.""" # pass # def check_for_phenocovar_errors(...): # """Check for errors in phenotype-covariates files.""" # pass def run_qc(rconn: Redis, args: Namespace, fqjobid: str, logger: Logger) -> int: """Run quality control checks on R/qtl2 bundles.""" thejob = parse_job(rconn, args.redisprefix, args.jobid) print(f"THE JOB =================> {thejob}") jobmeta = thejob["job-metadata"] inpath = Path(jobmeta["rqtl2-bundle-file"]) extractdir = inpath.parent.joinpath(f"{inpath.name}__extraction_dir") with ZipFile(inpath, "r") as zfile: rqtl2.extract(zfile, extractdir) ### BEGIN: The quality control checks ### cdata = rqtl2.control_data(extractdir) splitter = build_line_splitter(cdata) joiner = build_line_joiner(cdata) redisargs = { "fqjobid": fqjobid, "redisuri": args.redisuri, "redisexpiry": args.redisexpiry } check_for_missing_files(rconn, fqjobid, extractdir, logger) # check_for_pheno_errors(...) check_for_geno_errors(redisargs, extractdir, cdata, splitter, joiner, logger) # check_for_phenose_errors(...) # check_for_phenocovar_errors(...) ### END: The quality control checks ### def __fetch_errors__(rkey: str) -> tuple: return tuple(json.loads(rconn.hget(fqjobid, rkey) or "[]")) return (1 if any(( bool(__fetch_errors__(key)) for key in ("errors-geno", "errors-pheno", "errors-phenos", "errors-phenocovar"))) else 0) if __name__ == "__main__": def main(): """Enter R/qtl2 bundle QC runner.""" args = add_global_data_arguments(init_cli_parser( "qc-on-rqtl2-bundle", "Run QC on R/qtl2 bundle.")).parse_args() check_redis(args.redisuri) check_db(args.databaseuri) logger = getLogger("qc-on-rqtl2-bundle") logger.addHandler(StreamHandler(stream=sys.stderr)) logger.setLevel("DEBUG") fqjobid = jobs.job_key(args.redisprefix, args.jobid) with Redis.from_url(args.redisuri, decode_responses=True) as rconn: logger.addHandler(setup_redis_logger( rconn, fqjobid, f"{fqjobid}:log-messages", args.redisexpiry)) exitcode = run_qc(rconn, args, fqjobid, logger) rconn.hset( jobs.job_key(args.redisprefix, args.jobid), "exitcode", exitcode) return exitcode sys.exit(main())