Diffstat (limited to 'scripts/qc_on_rqtl2_bundle2.py')
-rw-r--r-- | scripts/qc_on_rqtl2_bundle2.py | 346 |
1 file changed, 0 insertions, 346 deletions
diff --git a/scripts/qc_on_rqtl2_bundle2.py b/scripts/qc_on_rqtl2_bundle2.py
deleted file mode 100644
index 7e5d253..0000000
--- a/scripts/qc_on_rqtl2_bundle2.py
+++ /dev/null
@@ -1,346 +0,0 @@
-"""Run Quality Control checks on R/qtl2 bundle."""
-import os
-import sys
-import json
-from time import sleep
-from pathlib import Path
-from zipfile import ZipFile
-from argparse import Namespace
-from datetime import timedelta
-import multiprocessing as mproc
-from functools import reduce, partial
-from logging import Logger, getLogger, StreamHandler
-from typing import Union, Sequence, Callable, Iterator
-
-import MySQLdb as mdb
-from redis import Redis
-
-from quality_control.errors import InvalidValue
-from quality_control.checks import decimal_points_error
-
-from uploader import jobs
-from uploader.db_utils import database_connection
-from uploader.check_connections import check_db, check_redis
-
-from r_qtl import r_qtl2 as rqtl2
-from r_qtl import r_qtl2_qc as rqc
-from r_qtl import exceptions as rqe
-from r_qtl import fileerrors as rqfe
-
-from scripts.process_rqtl2_bundle import parse_job
-from scripts.redis_logger import setup_redis_logger
-from scripts.cli_parser import init_cli_parser, add_global_data_arguments
-from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter
-
-
-def check_for_missing_files(
-        rconn: Redis, fqjobid: str, extractpath: Path, logger: Logger) -> bool:
-    """Check that all files listed in the control file do actually exist."""
-    logger.info("Checking for missing files.")
-    missing = rqc.missing_files(extractpath)
-    # add_to_errors(rconn, fqjobid, "errors-generic", tuple(
-    #     rqfe.MissingFile(
-    #         mfile[0], mfile[1], (
-    #             f"File '{mfile[1]}' is listed in the control file under "
-    #             f"the '{mfile[0]}' key, but it does not actually exist in "
-    #             "the bundle."))
-    #     for mfile in missing))
-    if len(missing) > 0:
-        logger.error(f"Missing files in the bundle!")
-        return True
-    return False
-
-
-def open_file(file_: Path) -> Iterator:
-    """Open file and return one line at a time."""
-    with open(file_, "r", encoding="utf8") as infile:
-        for line in infile:
-            yield line
-
-
-def check_markers(
-        filename: str,
-        row: tuple[str, ...],
-        save_error: lambda val: val
-) -> tuple[rqfe.InvalidValue]:
-    """Check that the markers are okay"""
-    errors = tuple()
-    counts = {}
-    for marker in row:
-        counts = {**counts, marker: counts.get(marker, 0) + 1}
-        if marker is None or marker == "":
-            errors = errors + (save_error(rqfe.InvalidValue(
-                filename,
-                "markers"
-                "-",
-                marker,
-                "A marker MUST be a valid value.")),)
-
-    return errors + tuple(
-        save_error(rqfe.InvalidValue(
-            filename,
-            "markers",
-            key,
-            f"Marker '{key}' was repeated {value} times"))
-        for key,value in counts.items() if value > 1)
-
-
-def check_geno_line(
-        filename: str,
-        headers: tuple[str, ...],
-        row: tuple[Union[str, None]],
-        cdata: dict,
-        save_error: lambda val: val
-) -> tuple[rqfe.InvalidValue]:
-    """Check that the geno line is correct."""
-    errors = tuple()
-    # Verify that line has same number of columns as headers
-    if len(headers) != len(row):
-        errors = errors + (save_error(rqfe.InvalidValue(
-            filename,
-            headers[0],
-            row[0],
-            row[0],
-            "Every line MUST have the same number of columns.")),)
-
-    # First column is the individuals/cases/samples
-    if not bool(row[0]):
-        errors = errors + (save_error(rqfe.InvalidValue(
-            filename,
-            headers[0],
-            row[0],
-            row[0],
-            "The sample/case MUST be a valid value.")),)
-
-    def __process_value__(val):
-        if val in cdata["na.strings"]:
-            return None
-        if val in cdata["alleles"]:
-            return cdata["genotypes"][val]
-
-    genocode = cdata.get("genotypes", {})
-    for coltitle, cellvalue in zip(headers[1:],row[1:]):
-        if (
-                bool(genocode) and
-                cellvalue is not None and
-                cellvalue not in genocode.keys()
-        ):
-            errors = errors + (save_error(rqfe.InvalidValue(
-                filename, row[0], coltitle, cellvalue,
-                f"Value '{cellvalue}' is invalid. Expected one of "
-                f"'{', '.join(genocode.keys())}'.")),)
-
-    return errors
-
-
-def push_file_error_to_redis(rconn: Redis, key: str, error: InvalidValue) -> InvalidValue:
-    """Push the file error to redis a json string
-
-    Parameters
-    ----------
-    rconn: Connection to redis
-    key: The name of the list where we push the errors
-    error: The file error to save
-
-    Returns
-    -------
-    Returns the file error it saved
-    """
-    if bool(error):
-        rconn.rpush(key, json.dumps(error._asdict()))
-    return error
-
-
-def file_errors_and_details(
-        redisargs: dict[str, str],
-        file_: Path,
-        filetype: str,
-        cdata: dict,
-        linesplitterfn: Callable,
-        linejoinerfn: Callable,
-        headercheckers: tuple[Callable, ...],
-        bodycheckers: tuple[Callable, ...]
-) -> dict:
-    """Compute errors, and other file metadata."""
-    errors = tuple()
-    if cdata[f"{filetype}_transposed"]:
-        rqtl2.transpose_csv_with_rename(file_, linesplitterfn, linejoinerfn)
-
-    with Redis.from_url(redisargs["redisuri"], decode_responses=True) as rconn:
-        save_error_fn = partial(push_file_error_to_redis,
-                                rconn,
-                                error_list_name(filetype, file_.name))
-        for lineno, line in enumerate(open_file(file_), start=1):
-            row = linesplitterfn(line)
-            if lineno == 1:
-                headers = tuple(row)
-                errors = errors + reduce(
-                    lambda errs, fnct: errs + fnct(
-                        file_.name, row[1:], save_error_fn),
-                    headercheckers,
-                    tuple())
-                continue
-
-            errors = errors + reduce(
-                lambda errs, fnct: errs + fnct(
-                    file_.name, headers, row, cdata, save_error_fn),
-                bodycheckers,
-                tuple())
-
-        filedetails = {
-            "filename": file_.name,
-            "filesize": os.stat(file_).st_size,
-            "linecount": lineno
-        }
-        rconn.hset(redisargs["fqjobid"],
-                   f"file-details:{filetype}:{file_.name}",
-                   json.dumps(filedetails))
-    return {**filedetails, "errors": errors}
-
-
-def error_list_name(filetype: str, filename: str):
-    """Compute the name of the list where the errors will be pushed.
-
-    Parameters
-    ----------
-    filetype: The type of file. One of `r_qtl.r_qtl2.FILE_TYPES`
-    filename: The name of the file.
-    """
-    return f"errors:{filetype}:{filename}"
-
-
-def check_for_geno_errors(
-        redisargs: dict[str, str],
-        extractdir: Path,
-        cdata: dict,
-        linesplitterfn: Callable[[str], tuple[Union[str, None]]],
-        linejoinerfn: Callable[[tuple[Union[str, None], ...]], str],
-        logger: Logger
-) -> bool:
-    """Check for errors in genotype files."""
-    if "geno" in cdata or "founder_geno" in cdata:
-        genofiles = tuple(
-            extractdir.joinpath(fname) for fname in cdata.get("geno", []))
-        fgenofiles = tuple(
-            extractdir.joinpath(fname) for fname in cdata.get("founder_geno", []))
-        allgenofiles = genofiles + fgenofiles
-        with Redis.from_url(redisargs["redisuri"], decode_responses=True) as rconn:
-            error_list_names = [
-                error_list_name("geno", file_.name) for file_ in allgenofiles]
-            for list_name in error_list_names:
-                rconn.delete(list_name)
-            rconn.hset(
-                redisargs["fqjobid"],
-                "geno-errors-lists",
-                json.dumps(error_list_names))
-            processes = [
-                mproc.Process(target=file_errors_and_details,
-                              args=(
-                                  redisargs,
-                                  file_,
-                                  ftype,
-                                  cdata,
-                                  linesplitterfn,
-                                  linejoinerfn,
-                                  (check_markers,),
-                                  (check_geno_line,))
-                              )
-                for ftype, file_ in (
-                        tuple(("geno", file_) for file_ in genofiles) +
-                        tuple(("founder_geno", file_) for file_ in fgenofiles))
-            ]
-            for process in processes:
-                process.start()
-            # Set expiry for any created error lists
-            for key in error_list_names:
-                rconn.expire(name=key,
-                             time=timedelta(seconds=redisargs["redisexpiry"]))
-
-            # TOD0: Add the errors to redis
-            if any(rconn.llen(errlst) > 0 for errlst in error_list_names):
-                logger.error("At least one of the 'geno' files has (an) error(s).")
-                return True
-            logger.info("No error(s) found in any of the 'geno' files.")
-
-    else:
-        logger.info("No 'geno' files to check.")
-
-    return False
-
-
-# def check_for_pheno_errors(...):
-#     """Check for errors in phenotype files."""
-#     pass
-
-
-# def check_for_phenose_errors(...):
-#     """Check for errors in phenotype, standard-error files."""
-#     pass
-
-
-# def check_for_phenocovar_errors(...):
-#     """Check for errors in phenotype-covariates files."""
-#     pass
-
-
-def run_qc(rconn: Redis, args: Namespace, fqjobid: str, logger: Logger) -> int:
-    """Run quality control checks on R/qtl2 bundles."""
-    thejob = parse_job(rconn, args.redisprefix, args.jobid)
-    print(f"THE JOB =================> {thejob}")
-    jobmeta = thejob["job-metadata"]
-    inpath = Path(jobmeta["rqtl2-bundle-file"])
-    extractdir = inpath.parent.joinpath(f"{inpath.name}__extraction_dir")
-    with ZipFile(inpath, "r") as zfile:
-        rqtl2.extract(zfile, extractdir)
-
-    ### BEGIN: The quality control checks ###
-    cdata = rqtl2.control_data(extractdir)
-    splitter = build_line_splitter(cdata)
-    joiner = build_line_joiner(cdata)
-
-    redisargs = {
-        "fqjobid": fqjobid,
-        "redisuri": args.redisuri,
-        "redisexpiry": args.redisexpiry
-    }
-    check_for_missing_files(rconn, fqjobid, extractdir, logger)
-    # check_for_pheno_errors(...)
-    check_for_geno_errors(redisargs, extractdir, cdata, splitter, joiner, logger)
-    # check_for_phenose_errors(...)
-    # check_for_phenocovar_errors(...)
-    ### END: The quality control checks ###
-
-    def __fetch_errors__(rkey: str) -> tuple:
-        return tuple(json.loads(rconn.hget(fqjobid, rkey) or "[]"))
-
-    return (1 if any((
-        bool(__fetch_errors__(key))
-        for key in
-        ("errors-geno", "errors-pheno", "errors-phenos", "errors-phenocovar")))
-            else 0)
-
-
-if __name__ == "__main__":
-    def main():
-        """Enter R/qtl2 bundle QC runner."""
-        args = add_global_data_arguments(init_cli_parser(
-            "qc-on-rqtl2-bundle", "Run QC on R/qtl2 bundle.")).parse_args()
-        check_redis(args.redisuri)
-        check_db(args.databaseuri)
-
-        logger = getLogger("qc-on-rqtl2-bundle")
-        logger.addHandler(StreamHandler(stream=sys.stderr))
-        logger.setLevel("DEBUG")
-
-        fqjobid = jobs.job_key(args.redisprefix, args.jobid)
-        with Redis.from_url(args.redisuri, decode_responses=True) as rconn:
-            logger.addHandler(setup_redis_logger(
-                rconn, fqjobid, f"{fqjobid}:log-messages",
-                args.redisexpiry))
-
-            exitcode = run_qc(rconn, args, fqjobid, logger)
-            rconn.hset(
-                jobs.job_key(args.redisprefix, args.jobid), "exitcode", exitcode)
-            return exitcode
-
-    sys.exit(main())