aboutsummaryrefslogtreecommitdiff
path: root/scripts/qc_on_rqtl2_bundle2.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/qc_on_rqtl2_bundle2.py')
-rw-r--r--scripts/qc_on_rqtl2_bundle2.py346
1 files changed, 346 insertions, 0 deletions
diff --git a/scripts/qc_on_rqtl2_bundle2.py b/scripts/qc_on_rqtl2_bundle2.py
new file mode 100644
index 0000000..7e5d253
--- /dev/null
+++ b/scripts/qc_on_rqtl2_bundle2.py
@@ -0,0 +1,346 @@
+"""Run Quality Control checks on R/qtl2 bundle."""
+import os
+import sys
+import json
+from time import sleep
+from pathlib import Path
+from zipfile import ZipFile
+from argparse import Namespace
+from datetime import timedelta
+import multiprocessing as mproc
+from functools import reduce, partial
+from logging import Logger, getLogger, StreamHandler
+from typing import Union, Sequence, Callable, Iterator
+
+import MySQLdb as mdb
+from redis import Redis
+
+from quality_control.errors import InvalidValue
+from quality_control.checks import decimal_points_error
+
+from uploader import jobs
+from uploader.db_utils import database_connection
+from uploader.check_connections import check_db, check_redis
+
+from r_qtl import r_qtl2 as rqtl2
+from r_qtl import r_qtl2_qc as rqc
+from r_qtl import exceptions as rqe
+from r_qtl import fileerrors as rqfe
+
+from scripts.process_rqtl2_bundle import parse_job
+from scripts.redis_logger import setup_redis_logger
+from scripts.cli_parser import init_cli_parser, add_global_data_arguments
+from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter
+
+
+def check_for_missing_files(
+ rconn: Redis, fqjobid: str, extractpath: Path, logger: Logger) -> bool:
+ """Check that all files listed in the control file do actually exist."""
+ logger.info("Checking for missing files.")
+ missing = rqc.missing_files(extractpath)
+ # add_to_errors(rconn, fqjobid, "errors-generic", tuple(
+ # rqfe.MissingFile(
+ # mfile[0], mfile[1], (
+ # f"File '{mfile[1]}' is listed in the control file under "
+ # f"the '{mfile[0]}' key, but it does not actually exist in "
+ # "the bundle."))
+ # for mfile in missing))
+ if len(missing) > 0:
+ logger.error(f"Missing files in the bundle!")
+ return True
+ return False
+
+
+def open_file(file_: Path) -> Iterator:
+ """Open file and return one line at a time."""
+ with open(file_, "r", encoding="utf8") as infile:
+ for line in infile:
+ yield line
+
+
+def check_markers(
+ filename: str,
+ row: tuple[str, ...],
+ save_error: lambda val: val
+) -> tuple[rqfe.InvalidValue]:
+ """Check that the markers are okay"""
+ errors = tuple()
+ counts = {}
+ for marker in row:
+ counts = {**counts, marker: counts.get(marker, 0) + 1}
+ if marker is None or marker == "":
+ errors = errors + (save_error(rqfe.InvalidValue(
+ filename,
+ "markers"
+ "-",
+ marker,
+ "A marker MUST be a valid value.")),)
+
+ return errors + tuple(
+ save_error(rqfe.InvalidValue(
+ filename,
+ "markers",
+ key,
+ f"Marker '{key}' was repeated {value} times"))
+ for key,value in counts.items() if value > 1)
+
+
+def check_geno_line(
+ filename: str,
+ headers: tuple[str, ...],
+ row: tuple[Union[str, None]],
+ cdata: dict,
+ save_error: lambda val: val
+) -> tuple[rqfe.InvalidValue]:
+ """Check that the geno line is correct."""
+ errors = tuple()
+ # Verify that line has same number of columns as headers
+ if len(headers) != len(row):
+ errors = errors + (save_error(rqfe.InvalidValue(
+ filename,
+ headers[0],
+ row[0],
+ row[0],
+ "Every line MUST have the same number of columns.")),)
+
+ # First column is the individuals/cases/samples
+ if not bool(row[0]):
+ errors = errors + (save_error(rqfe.InvalidValue(
+ filename,
+ headers[0],
+ row[0],
+ row[0],
+ "The sample/case MUST be a valid value.")),)
+
+ def __process_value__(val):
+ if val in cdata["na.strings"]:
+ return None
+ if val in cdata["alleles"]:
+ return cdata["genotypes"][val]
+
+ genocode = cdata.get("genotypes", {})
+ for coltitle, cellvalue in zip(headers[1:],row[1:]):
+ if (
+ bool(genocode) and
+ cellvalue is not None and
+ cellvalue not in genocode.keys()
+ ):
+ errors = errors + (save_error(rqfe.InvalidValue(
+ filename, row[0], coltitle, cellvalue,
+ f"Value '{cellvalue}' is invalid. Expected one of "
+ f"'{', '.join(genocode.keys())}'.")),)
+
+ return errors
+
+
+def push_file_error_to_redis(rconn: Redis, key: str, error: InvalidValue) -> InvalidValue:
+ """Push the file error to redis a json string
+
+ Parameters
+ ----------
+ rconn: Connection to redis
+ key: The name of the list where we push the errors
+ error: The file error to save
+
+ Returns
+ -------
+ Returns the file error it saved
+ """
+ if bool(error):
+ rconn.rpush(key, json.dumps(error._asdict()))
+ return error
+
+
+def file_errors_and_details(
+ redisargs: dict[str, str],
+ file_: Path,
+ filetype: str,
+ cdata: dict,
+ linesplitterfn: Callable,
+ linejoinerfn: Callable,
+ headercheckers: tuple[Callable, ...],
+ bodycheckers: tuple[Callable, ...]
+) -> dict:
+ """Compute errors, and other file metadata."""
+ errors = tuple()
+ if cdata[f"{filetype}_transposed"]:
+ rqtl2.transpose_csv_with_rename(file_, linesplitterfn, linejoinerfn)
+
+ with Redis.from_url(redisargs["redisuri"], decode_responses=True) as rconn:
+ save_error_fn = partial(push_file_error_to_redis,
+ rconn,
+ error_list_name(filetype, file_.name))
+ for lineno, line in enumerate(open_file(file_), start=1):
+ row = linesplitterfn(line)
+ if lineno == 1:
+ headers = tuple(row)
+ errors = errors + reduce(
+ lambda errs, fnct: errs + fnct(
+ file_.name, row[1:], save_error_fn),
+ headercheckers,
+ tuple())
+ continue
+
+ errors = errors + reduce(
+ lambda errs, fnct: errs + fnct(
+ file_.name, headers, row, cdata, save_error_fn),
+ bodycheckers,
+ tuple())
+
+ filedetails = {
+ "filename": file_.name,
+ "filesize": os.stat(file_).st_size,
+ "linecount": lineno
+ }
+ rconn.hset(redisargs["fqjobid"],
+ f"file-details:{filetype}:{file_.name}",
+ json.dumps(filedetails))
+ return {**filedetails, "errors": errors}
+
+
+def error_list_name(filetype: str, filename: str):
+ """Compute the name of the list where the errors will be pushed.
+
+ Parameters
+ ----------
+ filetype: The type of file. One of `r_qtl.r_qtl2.FILE_TYPES`
+ filename: The name of the file.
+ """
+ return f"errors:{filetype}:{filename}"
+
+
+def check_for_geno_errors(
+ redisargs: dict[str, str],
+ extractdir: Path,
+ cdata: dict,
+ linesplitterfn: Callable[[str], tuple[Union[str, None]]],
+ linejoinerfn: Callable[[tuple[Union[str, None], ...]], str],
+ logger: Logger
+) -> bool:
+ """Check for errors in genotype files."""
+ if "geno" in cdata or "founder_geno" in cdata:
+ genofiles = tuple(
+ extractdir.joinpath(fname) for fname in cdata.get("geno", []))
+ fgenofiles = tuple(
+ extractdir.joinpath(fname) for fname in cdata.get("founder_geno", []))
+ allgenofiles = genofiles + fgenofiles
+ with Redis.from_url(redisargs["redisuri"], decode_responses=True) as rconn:
+ error_list_names = [
+ error_list_name("geno", file_.name) for file_ in allgenofiles]
+ for list_name in error_list_names:
+ rconn.delete(list_name)
+ rconn.hset(
+ redisargs["fqjobid"],
+ "geno-errors-lists",
+ json.dumps(error_list_names))
+ processes = [
+ mproc.Process(target=file_errors_and_details,
+ args=(
+ redisargs,
+ file_,
+ ftype,
+ cdata,
+ linesplitterfn,
+ linejoinerfn,
+ (check_markers,),
+ (check_geno_line,))
+ )
+ for ftype, file_ in (
+ tuple(("geno", file_) for file_ in genofiles) +
+ tuple(("founder_geno", file_) for file_ in fgenofiles))
+ ]
+ for process in processes:
+ process.start()
+ # Set expiry for any created error lists
+ for key in error_list_names:
+ rconn.expire(name=key,
+ time=timedelta(seconds=redisargs["redisexpiry"]))
+
+ # TOD0: Add the errors to redis
+ if any(rconn.llen(errlst) > 0 for errlst in error_list_names):
+ logger.error("At least one of the 'geno' files has (an) error(s).")
+ return True
+ logger.info("No error(s) found in any of the 'geno' files.")
+
+ else:
+ logger.info("No 'geno' files to check.")
+
+ return False
+
+
+# def check_for_pheno_errors(...):
+# """Check for errors in phenotype files."""
+# pass
+
+
+# def check_for_phenose_errors(...):
+# """Check for errors in phenotype, standard-error files."""
+# pass
+
+
+# def check_for_phenocovar_errors(...):
+# """Check for errors in phenotype-covariates files."""
+# pass
+
+
+def run_qc(rconn: Redis, args: Namespace, fqjobid: str, logger: Logger) -> int:
+ """Run quality control checks on R/qtl2 bundles."""
+ thejob = parse_job(rconn, args.redisprefix, args.jobid)
+ print(f"THE JOB =================> {thejob}")
+ jobmeta = thejob["job-metadata"]
+ inpath = Path(jobmeta["rqtl2-bundle-file"])
+ extractdir = inpath.parent.joinpath(f"{inpath.name}__extraction_dir")
+ with ZipFile(inpath, "r") as zfile:
+ rqtl2.extract(zfile, extractdir)
+
+ ### BEGIN: The quality control checks ###
+ cdata = rqtl2.control_data(extractdir)
+ splitter = build_line_splitter(cdata)
+ joiner = build_line_joiner(cdata)
+
+ redisargs = {
+ "fqjobid": fqjobid,
+ "redisuri": args.redisuri,
+ "redisexpiry": args.redisexpiry
+ }
+ check_for_missing_files(rconn, fqjobid, extractdir, logger)
+ # check_for_pheno_errors(...)
+ check_for_geno_errors(redisargs, extractdir, cdata, splitter, joiner, logger)
+ # check_for_phenose_errors(...)
+ # check_for_phenocovar_errors(...)
+ ### END: The quality control checks ###
+
+ def __fetch_errors__(rkey: str) -> tuple:
+ return tuple(json.loads(rconn.hget(fqjobid, rkey) or "[]"))
+
+ return (1 if any((
+ bool(__fetch_errors__(key))
+ for key in
+ ("errors-geno", "errors-pheno", "errors-phenos", "errors-phenocovar")))
+ else 0)
+
+
+if __name__ == "__main__":
+ def main():
+ """Enter R/qtl2 bundle QC runner."""
+ args = add_global_data_arguments(init_cli_parser(
+ "qc-on-rqtl2-bundle", "Run QC on R/qtl2 bundle.")).parse_args()
+ check_redis(args.redisuri)
+ check_db(args.databaseuri)
+
+ logger = getLogger("qc-on-rqtl2-bundle")
+ logger.addHandler(StreamHandler(stream=sys.stderr))
+ logger.setLevel("DEBUG")
+
+ fqjobid = jobs.job_key(args.redisprefix, args.jobid)
+ with Redis.from_url(args.redisuri, decode_responses=True) as rconn:
+ logger.addHandler(setup_redis_logger(
+ rconn, fqjobid, f"{fqjobid}:log-messages",
+ args.redisexpiry))
+
+ exitcode = run_qc(rconn, args, fqjobid, logger)
+ rconn.hset(
+ jobs.job_key(args.redisprefix, args.jobid), "exitcode", exitcode)
+ return exitcode
+
+ sys.exit(main())