aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2024-08-21 19:53:18 -0500
committerFrederick Muriuki Muriithi2024-08-21 19:53:18 -0500
commit6a8b3755b6d25bb3e8a789877d98c83dca71532f (patch)
tree9056a2eab16cf20dc1aab7f243f00f2a75524651
parent0b6d924ad8615c2a6c9739c77f5001c96ea3553d (diff)
downloadgn-uploader-rewrite-rqtl2-qc.tar.gz
Save errors for each file in lists. Parallelise error checking.rewrite-rqtl2-qc
* Save the errors for each file in a redis list for that file. * Make error checking parallel, i.e. ensure every file of a particular type is checked completely independent of other files of the same type.
-rw-r--r--scripts/qc_on_rqtl2_bundle2.py195
1 files changed, 138 insertions, 57 deletions
diff --git a/scripts/qc_on_rqtl2_bundle2.py b/scripts/qc_on_rqtl2_bundle2.py
index f39689e..9136243 100644
--- a/scripts/qc_on_rqtl2_bundle2.py
+++ b/scripts/qc_on_rqtl2_bundle2.py
@@ -6,6 +6,7 @@ from time import sleep
from pathlib import Path
from zipfile import ZipFile
from argparse import Namespace
+from datetime import timedelta
import multiprocessing as mproc
from functools import reduce, partial
from logging import Logger, getLogger, StreamHandler
@@ -99,25 +100,30 @@ def open_file(file_: Path) -> Iterator:
yield line
-def check_markers(filename: str, row: tuple[str, ...]) -> tuple[rqfe.InvalidValue]:
+def check_markers(
+ filename: str,
+ row: tuple[str, ...],
+ save_error: lambda val: val
+) -> tuple[rqfe.InvalidValue]:
"""Check that the markers are okay"""
errors = tuple()
counts = {}
for marker in row:
counts = {**counts, marker: counts.get(marker, 0) + 1}
if marker is None or marker == "":
- errors = errors + (rqfe.InvalidValue(
+ errors = errors + (save_error(rqfe.InvalidValue(
filename,
"markers"
"-",
marker,
- "A marker MUST be a valid value."),)
+ "A marker MUST be a valid value.")),)
return errors + tuple(
- rqfe.InvalidValue(filename,
- "markers",
- key,
- f"Marker '{key}' was repeated {value} times")
+ save_error(rqfe.InvalidValue(
+ filename,
+ "markers",
+ key,
+ f"Marker '{key}' was repeated {value} times"))
for key,value in counts.items() if value > 1)
@@ -125,27 +131,28 @@ def check_geno_line(
filename: str,
headers: tuple[str, ...],
row: tuple[Union[str, None]],
- cdata: dict
+ cdata: dict,
+ save_error: lambda val: val
) -> tuple[rqfe.InvalidValue]:
"""Check that the geno line is correct."""
errors = tuple()
# Verify that line has same number of columns as headers
if len(headers) != len(row):
- errors = errors + (rqfe.InvalidValue(
+ errors = errors + (save_error(rqfe.InvalidValue(
filename,
headers[0],
row[0],
row[0],
- "Every line MUST have the same number of columns."),)
+ "Every line MUST have the same number of columns.")),)
# First column is the individuals/cases/samples
if not bool(row[0]):
- errors = errors + (rqfe.InvalidValue(
+ errors = errors + (save_error(rqfe.InvalidValue(
filename,
headers[0],
row[0],
row[0],
- "The sample/case MUST be a valid value."),)
+ "The sample/case MUST be a valid value.")),)
def __process_value__(val):
if val in cdata["na.strings"]:
@@ -160,15 +167,34 @@ def check_geno_line(
cellvalue is not None and
cellvalue not in genocode.keys()
):
- errors = errors + (rqfe.InvalidValue(
+ errors = errors + (save_error(rqfe.InvalidValue(
filename, row[0], coltitle, cellvalue,
f"Value '{cellvalue}' is invalid. Expected one of "
- f"'{', '.join(genocode.keys())}'."))
+ f"'{', '.join(genocode.keys())}'.")),)
return errors
+def push_file_error_to_redis(rconn: Redis, key: str, error: InvalidValue) -> InvalidValue:
+ """Push the file error to redis a json string
+
+ Parameters
+ ----------
+ rconn: Connection to redis
+ key: The name of the list where we push the errors
+ error: The file error to save
+
+ Returns
+ -------
+ Returns the file error it saved
+ """
+ if bool(error):
+ rconn.rpush(key, json.dumps(error._asdict()))
+ return error
+
+
def file_errors_and_details(
+ redisargs: dict[str, str],
file_: Path,
filetype: str,
cdata: dict,
@@ -182,55 +208,105 @@ def file_errors_and_details(
if cdata[f"{filetype}_transposed"]:
rqtl2.transpose_csv_with_rename(file_, linesplitterfn, linejoinerfn)
- for lineno, line in enumerate(open_file(file_), start=1):
- row = linesplitterfn(line)
- if lineno == 1:
- headers = tuple(row)
+ with Redis.from_url(redisargs["redisuri"], decode_responses=True) as rconn:
+ save_error_fn = partial(push_file_error_to_redis,
+ rconn,
+ error_list_name(filetype, file_.name))
+ for lineno, line in enumerate(open_file(file_), start=1):
+ row = linesplitterfn(line)
+ if lineno == 1:
+ headers = tuple(row)
+ errors = errors + reduce(
+ lambda errs, fnct: errs + fnct(
+ file_.name, row[1:], save_error_fn),
+ headercheckers,
+ tuple())
+ continue
+
errors = errors + reduce(
- lambda errs, fnct: errs + fnct(file_.name, row[1:]),
- headercheckers,
+ lambda errs, fnct: errs + fnct(
+ file_.name, headers, row, cdata, save_error_fn),
+ bodycheckers,
tuple())
- continue
-
- errors = errors + reduce(
- lambda errs, fnct: errs + fnct(file_.name, headers, row, cdata),
- bodycheckers,
- tuple())
-
- return {
- "filename": file_.name,
- "filesize": os.stat(file_).st_size,
- "linecount": lineno,
- # TOD0: Maybe put the errors in a redis list and return the name of the
- # redis list rather than the errors. Maybe also replace the errors
- # key with a flag e.g. "errors-found": True/False
- "errors": errors
- }
+
+ filedetails = {
+ "filename": file_.name,
+ "filesize": os.stat(file_).st_size,
+ "linecount": lineno
+ }
+ rconn.hset(redisargs["fqjobid"],
+ f"file-details:{filetype}:{file_.name}",
+ json.dumps(filedetails))
+ return {**filedetails, "errors": errors}
+
+
+def error_list_name(filetype: str, filename: str):
+ """Compute the name of the list where the errors will be pushed.
+
+ Parameters
+ ----------
+ filetype: The type of file. One of `r_qtl.r_qtl2.FILE_TYPES`
+ filename: The name of the file.
+ """
+ return f"errors:{filetype}:{filename}"
def check_for_geno_errors(
+ redisargs: dict[str, str],
extractdir: Path,
cdata: dict,
linesplitterfn: Callable[[str], tuple[Union[str, None]]],
linejoinerfn: Callable[[tuple[Union[str, None], ...]], str],
- logger: Logger) -> bool:
+ logger: Logger
+) -> bool:
"""Check for errors in genotype files."""
- if "geno" in cdata:
- genofiles = tuple(extractdir.joinpath(fname) for fname in cdata["geno"])
- ## Run code below in multiprocessing once you verify it works.
- gerrs = tuple(file_errors_and_details(
- file_,
- filetype="geno",
- cdata=cdata,
- linesplitterfn=linesplitterfn,
- linejoinerfn=linejoinerfn,
- headercheckers=(check_markers,),
- bodycheckers=(check_geno_line,)) for file_ in genofiles)
- # TOD0: Add the errors to redis
- if len(gerrs) > 0:
- logger.error("At least one of the 'geno' files has (an) error(s).")
- return True
- logger.info("No error(s) found in any of the 'geno' files.")
+ if "geno" in cdata or "founder_geno" in cdata:
+ genofiles = tuple(
+ extractdir.joinpath(fname) for fname in cdata.get("geno", []))
+ fgenofiles = tuple(
+ extractdir.joinpath(fname) for fname in cdata.get("founder_geno", []))
+ allgenofiles = genofiles + fgenofiles
+ with Redis.from_url(redisargs["redisuri"], decode_responses=True) as rconn:
+ error_list_names = [
+ error_list_name("geno", file_.name) for file_ in allgenofiles]
+ for list_name in error_list_names:
+ rconn.delete(list_name)
+ rconn.hset(
+ redisargs["fqjobid"],
+ "geno-errors-lists",
+ json.dumps(error_list_names))
+ processes = [
+ mproc.Process(target=file_errors_and_details,
+ args=(
+ redisargs,
+ file_,
+ ftype,
+ cdata,
+ linesplitterfn,
+ linejoinerfn,
+ (check_markers,),
+ (check_geno_line,))
+ )
+ for ftype, file_ in (
+ tuple(("geno", file_) for file_ in genofiles) +
+ tuple(("founder_geno", file_) for file_ in fgenofiles))
+ ]
+ for process in processes:
+ process.start()
+ # Set expiry for any created error lists
+ for key in error_list_names:
+ rconn.expire(name=key,
+ time=timedelta(seconds=redisargs["redisexpiry"]))
+
+ # TOD0: Add the errors to redis
+ if any(rconn.llen(errlst) > 0 for errlst in error_list_names):
+ logger.error("At least one of the 'geno' files has (an) error(s).")
+ return True
+ logger.info("No error(s) found in any of the 'geno' files.")
+
+ else:
+ logger.info("No 'geno' files to check.")
+
return False
@@ -249,9 +325,8 @@ def check_for_geno_errors(
# pass
-def run_qc(rconn: Redis, args: Namespace, logger: Logger) -> int:
+def run_qc(rconn: Redis, args: Namespace, fqjobid: str, logger: Logger) -> int:
"""Run quality control checks on R/qtl2 bundles."""
- fqjobid = jobs.job_key(args.redisprefix, args.jobid)
thejob = parse_job(rconn, args.redisprefix, args.jobid)
print(f"THE JOB =================> {thejob}")
jobmeta = thejob["job-metadata"]
@@ -264,9 +339,15 @@ def run_qc(rconn: Redis, args: Namespace, logger: Logger) -> int:
cdata = rqtl2.control_data(extractdir)
splitter = build_line_splitter(cdata)
joiner = build_line_joiner(cdata)
+
+ redisargs = {
+ "fqjobid": fqjobid,
+ "redisuri": args.redisuri,
+ "redisexpiry": args.redisexpiry
+ }
check_for_missing_files(rconn, fqjobid, extractdir, logger)
- check_for_geno_errors(extractdir, cdata, splitter, joiner, logger)
# check_for_pheno_errors(...)
+ check_for_geno_errors(redisargs, extractdir, cdata, splitter, joiner, logger)
# check_for_phenose_errors(...)
# check_for_phenocovar_errors(...)
### END: The quality control checks ###
@@ -299,7 +380,7 @@ if __name__ == "__main__":
rconn, fqjobid, f"{fqjobid}:log-messages",
args.redisexpiry))
- exitcode = run_qc(rconn, args, logger)
+ exitcode = run_qc(rconn, args, fqjobid, logger)
rconn.hset(
jobs.job_key(args.redisprefix, args.jobid), "exitcode", exitcode)
return exitcode