From 6a8b3755b6d25bb3e8a789877d98c83dca71532f Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Wed, 21 Aug 2024 19:53:18 -0500 Subject: Save errors for each file in lists. Parallelise error checking. * Save the errors for each file in a redis list for that file. * Make error checking parallel, i.e. ensure every file of a particular type is checked completely independent of other files of the same type. --- scripts/qc_on_rqtl2_bundle2.py | 195 +++++++++++++++++++++++++++++------------ 1 file changed, 138 insertions(+), 57 deletions(-) diff --git a/scripts/qc_on_rqtl2_bundle2.py b/scripts/qc_on_rqtl2_bundle2.py index f39689e..9136243 100644 --- a/scripts/qc_on_rqtl2_bundle2.py +++ b/scripts/qc_on_rqtl2_bundle2.py @@ -6,6 +6,7 @@ from time import sleep from pathlib import Path from zipfile import ZipFile from argparse import Namespace +from datetime import timedelta import multiprocessing as mproc from functools import reduce, partial from logging import Logger, getLogger, StreamHandler @@ -99,25 +100,30 @@ def open_file(file_: Path) -> Iterator: yield line -def check_markers(filename: str, row: tuple[str, ...]) -> tuple[rqfe.InvalidValue]: +def check_markers( + filename: str, + row: tuple[str, ...], + save_error: lambda val: val +) -> tuple[rqfe.InvalidValue]: """Check that the markers are okay""" errors = tuple() counts = {} for marker in row: counts = {**counts, marker: counts.get(marker, 0) + 1} if marker is None or marker == "": - errors = errors + (rqfe.InvalidValue( + errors = errors + (save_error(rqfe.InvalidValue( filename, "markers" "-", marker, - "A marker MUST be a valid value."),) + "A marker MUST be a valid value.")),) return errors + tuple( - rqfe.InvalidValue(filename, - "markers", - key, - f"Marker '{key}' was repeated {value} times") + save_error(rqfe.InvalidValue( + filename, + "markers", + key, + f"Marker '{key}' was repeated {value} times")) for key,value in counts.items() if value > 1) @@ -125,27 +131,28 @@ def check_geno_line( filename: str, headers: tuple[str, ...], row: tuple[Union[str, None]], - cdata: dict + cdata: dict, + save_error: lambda val: val ) -> tuple[rqfe.InvalidValue]: """Check that the geno line is correct.""" errors = tuple() # Verify that line has same number of columns as headers if len(headers) != len(row): - errors = errors + (rqfe.InvalidValue( + errors = errors + (save_error(rqfe.InvalidValue( filename, headers[0], row[0], row[0], - "Every line MUST have the same number of columns."),) + "Every line MUST have the same number of columns.")),) # First column is the individuals/cases/samples if not bool(row[0]): - errors = errors + (rqfe.InvalidValue( + errors = errors + (save_error(rqfe.InvalidValue( filename, headers[0], row[0], row[0], - "The sample/case MUST be a valid value."),) + "The sample/case MUST be a valid value.")),) def __process_value__(val): if val in cdata["na.strings"]: @@ -160,15 +167,34 @@ def check_geno_line( cellvalue is not None and cellvalue not in genocode.keys() ): - errors = errors + (rqfe.InvalidValue( + errors = errors + (save_error(rqfe.InvalidValue( filename, row[0], coltitle, cellvalue, f"Value '{cellvalue}' is invalid. Expected one of " - f"'{', '.join(genocode.keys())}'.")) + f"'{', '.join(genocode.keys())}'.")),) return errors +def push_file_error_to_redis(rconn: Redis, key: str, error: InvalidValue) -> InvalidValue: + """Push the file error to redis a json string + + Parameters + ---------- + rconn: Connection to redis + key: The name of the list where we push the errors + error: The file error to save + + Returns + ------- + Returns the file error it saved + """ + if bool(error): + rconn.rpush(key, json.dumps(error._asdict())) + return error + + def file_errors_and_details( + redisargs: dict[str, str], file_: Path, filetype: str, cdata: dict, @@ -182,55 +208,105 @@ def file_errors_and_details( if cdata[f"{filetype}_transposed"]: rqtl2.transpose_csv_with_rename(file_, linesplitterfn, linejoinerfn) - for lineno, line in enumerate(open_file(file_), start=1): - row = linesplitterfn(line) - if lineno == 1: - headers = tuple(row) + with Redis.from_url(redisargs["redisuri"], decode_responses=True) as rconn: + save_error_fn = partial(push_file_error_to_redis, + rconn, + error_list_name(filetype, file_.name)) + for lineno, line in enumerate(open_file(file_), start=1): + row = linesplitterfn(line) + if lineno == 1: + headers = tuple(row) + errors = errors + reduce( + lambda errs, fnct: errs + fnct( + file_.name, row[1:], save_error_fn), + headercheckers, + tuple()) + continue + errors = errors + reduce( - lambda errs, fnct: errs + fnct(file_.name, row[1:]), - headercheckers, + lambda errs, fnct: errs + fnct( + file_.name, headers, row, cdata, save_error_fn), + bodycheckers, tuple()) - continue - - errors = errors + reduce( - lambda errs, fnct: errs + fnct(file_.name, headers, row, cdata), - bodycheckers, - tuple()) - - return { - "filename": file_.name, - "filesize": os.stat(file_).st_size, - "linecount": lineno, - # TOD0: Maybe put the errors in a redis list and return the name of the - # redis list rather than the errors. Maybe also replace the errors - # key with a flag e.g. "errors-found": True/False - "errors": errors - } + + filedetails = { + "filename": file_.name, + "filesize": os.stat(file_).st_size, + "linecount": lineno + } + rconn.hset(redisargs["fqjobid"], + f"file-details:{filetype}:{file_.name}", + json.dumps(filedetails)) + return {**filedetails, "errors": errors} + + +def error_list_name(filetype: str, filename: str): + """Compute the name of the list where the errors will be pushed. + + Parameters + ---------- + filetype: The type of file. One of `r_qtl.r_qtl2.FILE_TYPES` + filename: The name of the file. + """ + return f"errors:{filetype}:{filename}" def check_for_geno_errors( + redisargs: dict[str, str], extractdir: Path, cdata: dict, linesplitterfn: Callable[[str], tuple[Union[str, None]]], linejoinerfn: Callable[[tuple[Union[str, None], ...]], str], - logger: Logger) -> bool: + logger: Logger +) -> bool: """Check for errors in genotype files.""" - if "geno" in cdata: - genofiles = tuple(extractdir.joinpath(fname) for fname in cdata["geno"]) - ## Run code below in multiprocessing once you verify it works. - gerrs = tuple(file_errors_and_details( - file_, - filetype="geno", - cdata=cdata, - linesplitterfn=linesplitterfn, - linejoinerfn=linejoinerfn, - headercheckers=(check_markers,), - bodycheckers=(check_geno_line,)) for file_ in genofiles) - # TOD0: Add the errors to redis - if len(gerrs) > 0: - logger.error("At least one of the 'geno' files has (an) error(s).") - return True - logger.info("No error(s) found in any of the 'geno' files.") + if "geno" in cdata or "founder_geno" in cdata: + genofiles = tuple( + extractdir.joinpath(fname) for fname in cdata.get("geno", [])) + fgenofiles = tuple( + extractdir.joinpath(fname) for fname in cdata.get("founder_geno", [])) + allgenofiles = genofiles + fgenofiles + with Redis.from_url(redisargs["redisuri"], decode_responses=True) as rconn: + error_list_names = [ + error_list_name("geno", file_.name) for file_ in allgenofiles] + for list_name in error_list_names: + rconn.delete(list_name) + rconn.hset( + redisargs["fqjobid"], + "geno-errors-lists", + json.dumps(error_list_names)) + processes = [ + mproc.Process(target=file_errors_and_details, + args=( + redisargs, + file_, + ftype, + cdata, + linesplitterfn, + linejoinerfn, + (check_markers,), + (check_geno_line,)) + ) + for ftype, file_ in ( + tuple(("geno", file_) for file_ in genofiles) + + tuple(("founder_geno", file_) for file_ in fgenofiles)) + ] + for process in processes: + process.start() + # Set expiry for any created error lists + for key in error_list_names: + rconn.expire(name=key, + time=timedelta(seconds=redisargs["redisexpiry"])) + + # TOD0: Add the errors to redis + if any(rconn.llen(errlst) > 0 for errlst in error_list_names): + logger.error("At least one of the 'geno' files has (an) error(s).") + return True + logger.info("No error(s) found in any of the 'geno' files.") + + else: + logger.info("No 'geno' files to check.") + return False @@ -249,9 +325,8 @@ def check_for_geno_errors( # pass -def run_qc(rconn: Redis, args: Namespace, logger: Logger) -> int: +def run_qc(rconn: Redis, args: Namespace, fqjobid: str, logger: Logger) -> int: """Run quality control checks on R/qtl2 bundles.""" - fqjobid = jobs.job_key(args.redisprefix, args.jobid) thejob = parse_job(rconn, args.redisprefix, args.jobid) print(f"THE JOB =================> {thejob}") jobmeta = thejob["job-metadata"] @@ -264,9 +339,15 @@ def run_qc(rconn: Redis, args: Namespace, logger: Logger) -> int: cdata = rqtl2.control_data(extractdir) splitter = build_line_splitter(cdata) joiner = build_line_joiner(cdata) + + redisargs = { + "fqjobid": fqjobid, + "redisuri": args.redisuri, + "redisexpiry": args.redisexpiry + } check_for_missing_files(rconn, fqjobid, extractdir, logger) - check_for_geno_errors(extractdir, cdata, splitter, joiner, logger) # check_for_pheno_errors(...) + check_for_geno_errors(redisargs, extractdir, cdata, splitter, joiner, logger) # check_for_phenose_errors(...) # check_for_phenocovar_errors(...) ### END: The quality control checks ### @@ -299,7 +380,7 @@ if __name__ == "__main__": rconn, fqjobid, f"{fqjobid}:log-messages", args.redisexpiry)) - exitcode = run_qc(rconn, args, logger) + exitcode = run_qc(rconn, args, fqjobid, logger) rconn.hset( jobs.job_key(args.redisprefix, args.jobid), "exitcode", exitcode) return exitcode -- cgit v1.2.3