aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2024-08-12 16:26:28 -0500
committerFrederick Muriuki Muriithi2024-08-16 14:27:26 -0500
commit0b6d924ad8615c2a6c9739c77f5001c96ea3553d (patch)
tree9878a183af98c68452de7178762a5e8d15f00b79
parent5ee7e90844b16ff132278c6deb65a2b79ae95581 (diff)
downloadgn-uploader-0b6d924ad8615c2a6c9739c77f5001c96ea3553d.tar.gz
Rewrite the QC code for R/qtl2
-rw-r--r--scripts/qc_on_rqtl2_bundle2.py307
1 files changed, 307 insertions, 0 deletions
diff --git a/scripts/qc_on_rqtl2_bundle2.py b/scripts/qc_on_rqtl2_bundle2.py
new file mode 100644
index 0000000..f39689e
--- /dev/null
+++ b/scripts/qc_on_rqtl2_bundle2.py
@@ -0,0 +1,307 @@
+"""Run Quality Control checks on R/qtl2 bundle."""
+import os
+import sys
+import json
+from time import sleep
+from pathlib import Path
+from zipfile import ZipFile
+from argparse import Namespace
+import multiprocessing as mproc
+from functools import reduce, partial
+from logging import Logger, getLogger, StreamHandler
+from typing import Union, Sequence, Callable, Iterator
+
+import MySQLdb as mdb
+from redis import Redis
+
+from quality_control.errors import InvalidValue
+from quality_control.checks import decimal_points_error
+
+from uploader import jobs
+from uploader.db_utils import database_connection
+from uploader.check_connections import check_db, check_redis
+
+from r_qtl import r_qtl2 as rqtl2
+from r_qtl import r_qtl2_qc as rqc
+from r_qtl import exceptions as rqe
+from r_qtl import fileerrors as rqfe
+
+from scripts.process_rqtl2_bundle import parse_job
+from scripts.redis_logger import setup_redis_logger
+from scripts.cli_parser import init_cli_parser, add_global_data_arguments
+
+
+def build_line_splitter(cdata: dict) -> Callable[[str], tuple[Union[str, None], ...]]:
+ """Build and return a function to use to split data in the files.
+
+ Parameters
+ ----------
+ cdata: A dict holding the control information included with the R/qtl2
+ bundle.
+
+ Returns
+ -------
+ A function that takes a string and return a tuple of strings.
+ """
+ separator = cdata["sep"]
+ na_strings = cdata["na.strings"]
+ def __splitter__(line: str) -> tuple[Union[str, None], ...]:
+ return tuple(
+ item if item not in na_strings else None
+ for item in
+ (field.strip() for field in line.strip().split(separator)))
+ return __splitter__
+
+
+def build_line_joiner(cdata: dict) -> Callable[[tuple[Union[str, None], ...]], str]:
+ """Build and return a function to use to split data in the files.
+
+ Parameters
+ ----------
+ cdata: A dict holding the control information included with the R/qtl2
+ bundle.
+
+ Returns
+ -------
+ A function that takes a string and return a tuple of strings.
+ """
+ separator = cdata["sep"]
+ na_strings = cdata["na.strings"]
+ def __joiner__(row: tuple[Union[str, None], ...]) -> str:
+ return separator.join(
+ (na_strings[0] if item is None else item)
+ for item in row)
+ return __joiner__
+
+
+def check_for_missing_files(
+ rconn: Redis, fqjobid: str, extractpath: Path, logger: Logger) -> bool:
+ """Check that all files listed in the control file do actually exist."""
+ logger.info("Checking for missing files.")
+ missing = rqc.missing_files(extractpath)
+ # add_to_errors(rconn, fqjobid, "errors-generic", tuple(
+ # rqfe.MissingFile(
+ # mfile[0], mfile[1], (
+ # f"File '{mfile[1]}' is listed in the control file under "
+ # f"the '{mfile[0]}' key, but it does not actually exist in "
+ # "the bundle."))
+ # for mfile in missing))
+ if len(missing) > 0:
+ logger.error(f"Missing files in the bundle!")
+ return True
+ return False
+
+
+def open_file(file_: Path) -> Iterator:
+ """Open file and return one line at a time."""
+ with open(file_, "r", encoding="utf8") as infile:
+ for line in infile:
+ yield line
+
+
+def check_markers(filename: str, row: tuple[str, ...]) -> tuple[rqfe.InvalidValue]:
+ """Check that the markers are okay"""
+ errors = tuple()
+ counts = {}
+ for marker in row:
+ counts = {**counts, marker: counts.get(marker, 0) + 1}
+ if marker is None or marker == "":
+ errors = errors + (rqfe.InvalidValue(
+ filename,
+ "markers"
+ "-",
+ marker,
+ "A marker MUST be a valid value."),)
+
+ return errors + tuple(
+ rqfe.InvalidValue(filename,
+ "markers",
+ key,
+ f"Marker '{key}' was repeated {value} times")
+ for key,value in counts.items() if value > 1)
+
+
+def check_geno_line(
+ filename: str,
+ headers: tuple[str, ...],
+ row: tuple[Union[str, None]],
+ cdata: dict
+) -> tuple[rqfe.InvalidValue]:
+ """Check that the geno line is correct."""
+ errors = tuple()
+ # Verify that line has same number of columns as headers
+ if len(headers) != len(row):
+ errors = errors + (rqfe.InvalidValue(
+ filename,
+ headers[0],
+ row[0],
+ row[0],
+ "Every line MUST have the same number of columns."),)
+
+ # First column is the individuals/cases/samples
+ if not bool(row[0]):
+ errors = errors + (rqfe.InvalidValue(
+ filename,
+ headers[0],
+ row[0],
+ row[0],
+ "The sample/case MUST be a valid value."),)
+
+ def __process_value__(val):
+ if val in cdata["na.strings"]:
+ return None
+ if val in cdata["alleles"]:
+ return cdata["genotypes"][val]
+
+ genocode = cdata.get("genotypes", {})
+ for coltitle, cellvalue in zip(headers[1:],row[1:]):
+ if (
+ bool(genocode) and
+ cellvalue is not None and
+ cellvalue not in genocode.keys()
+ ):
+ errors = errors + (rqfe.InvalidValue(
+ filename, row[0], coltitle, cellvalue,
+ f"Value '{cellvalue}' is invalid. Expected one of "
+ f"'{', '.join(genocode.keys())}'."))
+
+ return errors
+
+
+def file_errors_and_details(
+ file_: Path,
+ filetype: str,
+ cdata: dict,
+ linesplitterfn: Callable,
+ linejoinerfn: Callable,
+ headercheckers: tuple[Callable, ...],
+ bodycheckers: tuple[Callable, ...]
+) -> dict:
+ """Compute errors, and other file metadata."""
+ errors = tuple()
+ if cdata[f"{filetype}_transposed"]:
+ rqtl2.transpose_csv_with_rename(file_, linesplitterfn, linejoinerfn)
+
+ for lineno, line in enumerate(open_file(file_), start=1):
+ row = linesplitterfn(line)
+ if lineno == 1:
+ headers = tuple(row)
+ errors = errors + reduce(
+ lambda errs, fnct: errs + fnct(file_.name, row[1:]),
+ headercheckers,
+ tuple())
+ continue
+
+ errors = errors + reduce(
+ lambda errs, fnct: errs + fnct(file_.name, headers, row, cdata),
+ bodycheckers,
+ tuple())
+
+ return {
+ "filename": file_.name,
+ "filesize": os.stat(file_).st_size,
+ "linecount": lineno,
+ # TOD0: Maybe put the errors in a redis list and return the name of the
+ # redis list rather than the errors. Maybe also replace the errors
+ # key with a flag e.g. "errors-found": True/False
+ "errors": errors
+ }
+
+
+def check_for_geno_errors(
+ extractdir: Path,
+ cdata: dict,
+ linesplitterfn: Callable[[str], tuple[Union[str, None]]],
+ linejoinerfn: Callable[[tuple[Union[str, None], ...]], str],
+ logger: Logger) -> bool:
+ """Check for errors in genotype files."""
+ if "geno" in cdata:
+ genofiles = tuple(extractdir.joinpath(fname) for fname in cdata["geno"])
+ ## Run code below in multiprocessing once you verify it works.
+ gerrs = tuple(file_errors_and_details(
+ file_,
+ filetype="geno",
+ cdata=cdata,
+ linesplitterfn=linesplitterfn,
+ linejoinerfn=linejoinerfn,
+ headercheckers=(check_markers,),
+ bodycheckers=(check_geno_line,)) for file_ in genofiles)
+ # TOD0: Add the errors to redis
+ if len(gerrs) > 0:
+ logger.error("At least one of the 'geno' files has (an) error(s).")
+ return True
+ logger.info("No error(s) found in any of the 'geno' files.")
+ return False
+
+
+# def check_for_pheno_errors(...):
+# """Check for errors in phenotype files."""
+# pass
+
+
+# def check_for_phenose_errors(...):
+# """Check for errors in phenotype, standard-error files."""
+# pass
+
+
+# def check_for_phenocovar_errors(...):
+# """Check for errors in phenotype-covariates files."""
+# pass
+
+
+def run_qc(rconn: Redis, args: Namespace, logger: Logger) -> int:
+ """Run quality control checks on R/qtl2 bundles."""
+ fqjobid = jobs.job_key(args.redisprefix, args.jobid)
+ thejob = parse_job(rconn, args.redisprefix, args.jobid)
+ print(f"THE JOB =================> {thejob}")
+ jobmeta = thejob["job-metadata"]
+ inpath = Path(jobmeta["rqtl2-bundle-file"])
+ extractdir = inpath.parent.joinpath(f"{inpath.name}__extraction_dir")
+ with ZipFile(inpath, "r") as zfile:
+ rqtl2.extract(zfile, extractdir)
+
+ ### BEGIN: The quality control checks ###
+ cdata = rqtl2.control_data(extractdir)
+ splitter = build_line_splitter(cdata)
+ joiner = build_line_joiner(cdata)
+ check_for_missing_files(rconn, fqjobid, extractdir, logger)
+ check_for_geno_errors(extractdir, cdata, splitter, joiner, logger)
+ # check_for_pheno_errors(...)
+ # check_for_phenose_errors(...)
+ # check_for_phenocovar_errors(...)
+ ### END: The quality control checks ###
+
+ def __fetch_errors__(rkey: str) -> tuple:
+ return tuple(json.loads(rconn.hget(fqjobid, rkey) or "[]"))
+
+ return (1 if any((
+ bool(__fetch_errors__(key))
+ for key in
+ ("errors-geno", "errors-pheno", "errors-phenos", "errors-phenocovar")))
+ else 0)
+
+
+if __name__ == "__main__":
+ def main():
+ """Enter R/qtl2 bundle QC runner."""
+ args = add_global_data_arguments(init_cli_parser(
+ "qc-on-rqtl2-bundle", "Run QC on R/qtl2 bundle.")).parse_args()
+ check_redis(args.redisuri)
+ check_db(args.databaseuri)
+
+ logger = getLogger("qc-on-rqtl2-bundle")
+ logger.addHandler(StreamHandler(stream=sys.stderr))
+ logger.setLevel("DEBUG")
+
+ fqjobid = jobs.job_key(args.redisprefix, args.jobid)
+ with Redis.from_url(args.redisuri, decode_responses=True) as rconn:
+ logger.addHandler(setup_redis_logger(
+ rconn, fqjobid, f"{fqjobid}:log-messages",
+ args.redisexpiry))
+
+ exitcode = run_qc(rconn, args, logger)
+ rconn.hset(
+ jobs.job_key(args.redisprefix, args.jobid), "exitcode", exitcode)
+ return exitcode
+
+ sys.exit(main())