about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--scripts/qc_on_rqtl2_bundle2.py195
1 files changed, 138 insertions, 57 deletions
diff --git a/scripts/qc_on_rqtl2_bundle2.py b/scripts/qc_on_rqtl2_bundle2.py
index f39689e..9136243 100644
--- a/scripts/qc_on_rqtl2_bundle2.py
+++ b/scripts/qc_on_rqtl2_bundle2.py
@@ -6,6 +6,7 @@ from time import sleep
 from pathlib import Path
 from zipfile import ZipFile
 from argparse import Namespace
+from datetime import timedelta
 import multiprocessing as mproc
 from functools import reduce, partial
 from logging import Logger, getLogger, StreamHandler
@@ -99,25 +100,30 @@ def open_file(file_: Path) -> Iterator:
             yield line
 
 
-def check_markers(filename: str, row: tuple[str, ...]) -> tuple[rqfe.InvalidValue]:
+def check_markers(
+        filename: str,
+        row: tuple[str, ...],
+        save_error: lambda val: val
+) -> tuple[rqfe.InvalidValue]:
     """Check that the markers are okay"""
     errors = tuple()
     counts = {}
     for marker in row:
         counts = {**counts, marker: counts.get(marker, 0) + 1}
         if marker is None or marker == "":
-            errors = errors + (rqfe.InvalidValue(
+            errors = errors + (save_error(rqfe.InvalidValue(
                 filename,
                 "markers"
                 "-",
                 marker,
-                "A marker MUST be a valid value."),)
+                "A marker MUST be a valid value.")),)
 
     return errors + tuple(
-        rqfe.InvalidValue(filename,
-                          "markers",
-                          key,
-                          f"Marker '{key}' was repeated {value} times")
+        save_error(rqfe.InvalidValue(
+            filename,
+            "markers",
+            key,
+            f"Marker '{key}' was repeated {value} times"))
         for key,value in counts.items() if value > 1)
 
 
@@ -125,27 +131,28 @@ def check_geno_line(
         filename: str,
         headers: tuple[str, ...],
         row: tuple[Union[str, None]],
-        cdata: dict
+        cdata: dict,
+        save_error: lambda val: val
 ) -> tuple[rqfe.InvalidValue]:
     """Check that the geno line is correct."""
     errors = tuple()
     # Verify that line has same number of columns as headers
     if len(headers) != len(row):
-        errors = errors + (rqfe.InvalidValue(
+        errors = errors + (save_error(rqfe.InvalidValue(
             filename,
             headers[0],
             row[0],
             row[0],
-            "Every line MUST have the same number of columns."),)
+            "Every line MUST have the same number of columns.")),)
 
     # First column is the individuals/cases/samples
     if not bool(row[0]):
-        errors = errors + (rqfe.InvalidValue(
+        errors = errors + (save_error(rqfe.InvalidValue(
             filename,
             headers[0],
             row[0],
             row[0],
-            "The sample/case MUST be a valid value."),)
+            "The sample/case MUST be a valid value.")),)
 
     def __process_value__(val):
         if val in cdata["na.strings"]:
@@ -160,15 +167,34 @@ def check_geno_line(
                 cellvalue is not None and
                 cellvalue not in genocode.keys()
         ):
-            errors = errors + (rqfe.InvalidValue(
+            errors = errors + (save_error(rqfe.InvalidValue(
                 filename, row[0], coltitle, cellvalue,
                 f"Value '{cellvalue}' is invalid. Expected one of "
-                f"'{', '.join(genocode.keys())}'."))
+                f"'{', '.join(genocode.keys())}'.")),)
 
     return errors
 
 
+def push_file_error_to_redis(rconn: Redis, key: str, error: InvalidValue) -> InvalidValue:
+    """Push the file error to redis a json string
+
+    Parameters
+    ----------
+    rconn: Connection to redis
+    key: The name of the list where we push the errors
+    error: The file error to save
+
+    Returns
+    -------
+    Returns the file error it saved
+    """
+    if bool(error):
+        rconn.rpush(key, json.dumps(error._asdict()))
+    return error
+
+
 def file_errors_and_details(
+        redisargs: dict[str, str],
         file_: Path,
         filetype: str,
         cdata: dict,
@@ -182,55 +208,105 @@ def file_errors_and_details(
     if cdata[f"{filetype}_transposed"]:
         rqtl2.transpose_csv_with_rename(file_, linesplitterfn, linejoinerfn)
 
-    for lineno, line in enumerate(open_file(file_), start=1):
-        row = linesplitterfn(line)
-        if lineno == 1:
-            headers = tuple(row)
+    with Redis.from_url(redisargs["redisuri"], decode_responses=True) as rconn:
+        save_error_fn = partial(push_file_error_to_redis,
+                                rconn,
+                                error_list_name(filetype, file_.name))
+        for lineno, line in enumerate(open_file(file_), start=1):
+            row = linesplitterfn(line)
+            if lineno == 1:
+                headers = tuple(row)
+                errors = errors + reduce(
+                    lambda errs, fnct: errs + fnct(
+                        file_.name, row[1:], save_error_fn),
+                    headercheckers,
+                    tuple())
+                continue
+
             errors = errors + reduce(
-                lambda errs, fnct: errs + fnct(file_.name, row[1:]),
-                headercheckers,
+                lambda errs, fnct: errs + fnct(
+                    file_.name, headers, row, cdata, save_error_fn),
+                bodycheckers,
                 tuple())
-            continue
-
-        errors = errors + reduce(
-            lambda errs, fnct: errs + fnct(file_.name, headers, row, cdata),
-            bodycheckers,
-            tuple())
-            
-    return {
-        "filename": file_.name,
-        "filesize": os.stat(file_).st_size,
-        "linecount": lineno,
-        # TOD0: Maybe put the errors in a redis list and return the name of the
-        #     redis list rather than the errors. Maybe also replace the errors
-        #     key with a flag e.g. "errors-found": True/False
-        "errors": errors
-    }
+
+        filedetails = {
+            "filename": file_.name,
+            "filesize": os.stat(file_).st_size,
+            "linecount": lineno
+        }
+        rconn.hset(redisargs["fqjobid"],
+                   f"file-details:{filetype}:{file_.name}",
+                   json.dumps(filedetails))
+        return {**filedetails, "errors": errors}
+
+
+def error_list_name(filetype: str, filename: str):
+    """Compute the name of the list where the errors will be pushed.
+
+    Parameters
+    ----------
+    filetype: The type of file. One of `r_qtl.r_qtl2.FILE_TYPES`
+    filename: The name of the file.
+    """
+    return f"errors:{filetype}:{filename}"
 
 
 def check_for_geno_errors(
+        redisargs: dict[str, str],
         extractdir: Path,
         cdata: dict,
         linesplitterfn: Callable[[str], tuple[Union[str, None]]],
         linejoinerfn: Callable[[tuple[Union[str, None], ...]], str],
-        logger: Logger) -> bool:
+        logger: Logger
+) -> bool:
     """Check for errors in genotype files."""
-    if "geno" in cdata:
-        genofiles = tuple(extractdir.joinpath(fname) for fname in cdata["geno"])
-        ## Run code below in multiprocessing once you verify it works.
-        gerrs = tuple(file_errors_and_details(
-            file_,
-            filetype="geno",
-            cdata=cdata,
-            linesplitterfn=linesplitterfn,
-            linejoinerfn=linejoinerfn,
-            headercheckers=(check_markers,),
-            bodycheckers=(check_geno_line,)) for file_ in genofiles)
-        # TOD0: Add the errors to redis
-        if len(gerrs) > 0:
-            logger.error("At least one of the 'geno' files has (an) error(s).")
-            return True
-        logger.info("No error(s) found in any of the 'geno' files.")
+    if "geno" in cdata or "founder_geno" in cdata:
+        genofiles = tuple(
+            extractdir.joinpath(fname) for fname in cdata.get("geno", []))
+        fgenofiles = tuple(
+            extractdir.joinpath(fname) for fname in cdata.get("founder_geno", []))
+        allgenofiles = genofiles + fgenofiles
+        with Redis.from_url(redisargs["redisuri"], decode_responses=True) as rconn:
+            error_list_names = [
+                error_list_name("geno", file_.name) for file_ in allgenofiles]
+            for list_name in error_list_names:
+                rconn.delete(list_name)
+            rconn.hset(
+                redisargs["fqjobid"],
+                "geno-errors-lists",
+                json.dumps(error_list_names))
+            processes = [
+                mproc.Process(target=file_errors_and_details,
+                              args=(
+                                  redisargs,
+                                  file_,
+                                  ftype,
+                                  cdata,
+                                  linesplitterfn,
+                                  linejoinerfn,
+                                  (check_markers,),
+                                  (check_geno_line,))
+                              )
+                for ftype, file_ in (
+                        tuple(("geno", file_) for file_ in genofiles) +
+                        tuple(("founder_geno", file_) for file_ in fgenofiles))
+            ]
+            for process in processes:
+                process.start()
+            # Set expiry for any created error lists
+            for key in error_list_names:
+                rconn.expire(name=key,
+                             time=timedelta(seconds=redisargs["redisexpiry"]))
+
+            # TOD0: Add the errors to redis
+            if any(rconn.llen(errlst) > 0 for errlst in error_list_names):
+                logger.error("At least one of the 'geno' files has (an) error(s).")
+                return True
+            logger.info("No error(s) found in any of the 'geno' files.")
+
+    else:
+        logger.info("No 'geno' files to check.")
+
     return False
 
 
@@ -249,9 +325,8 @@ def check_for_geno_errors(
 #     pass
 
 
-def run_qc(rconn: Redis, args: Namespace, logger: Logger) -> int:
+def run_qc(rconn: Redis, args: Namespace, fqjobid: str, logger: Logger) -> int:
     """Run quality control checks on R/qtl2 bundles."""
-    fqjobid = jobs.job_key(args.redisprefix, args.jobid)
     thejob = parse_job(rconn, args.redisprefix, args.jobid)
     print(f"THE JOB =================> {thejob}")
     jobmeta = thejob["job-metadata"]
@@ -264,9 +339,15 @@ def run_qc(rconn: Redis, args: Namespace, logger: Logger) -> int:
     cdata = rqtl2.control_data(extractdir)
     splitter = build_line_splitter(cdata)
     joiner = build_line_joiner(cdata)
+
+    redisargs = {
+        "fqjobid": fqjobid,
+        "redisuri": args.redisuri,
+        "redisexpiry": args.redisexpiry
+    }
     check_for_missing_files(rconn, fqjobid, extractdir, logger)
-    check_for_geno_errors(extractdir, cdata, splitter, joiner, logger)
     # check_for_pheno_errors(...)
+    check_for_geno_errors(redisargs, extractdir, cdata, splitter, joiner, logger)
     # check_for_phenose_errors(...)
     # check_for_phenocovar_errors(...)
     ### END: The quality control checks ###
@@ -299,7 +380,7 @@ if __name__ == "__main__":
                 rconn, fqjobid, f"{fqjobid}:log-messages",
                 args.redisexpiry))
 
-            exitcode = run_qc(rconn, args, logger)
+            exitcode = run_qc(rconn, args, fqjobid, logger)
             rconn.hset(
                 jobs.job_key(args.redisprefix, args.jobid), "exitcode", exitcode)
             return exitcode