aboutsummaryrefslogtreecommitdiff
path: root/scripts/qc_on_rqtl2_bundle2.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/qc_on_rqtl2_bundle2.py')
-rw-r--r--scripts/qc_on_rqtl2_bundle2.py346
1 files changed, 0 insertions, 346 deletions
diff --git a/scripts/qc_on_rqtl2_bundle2.py b/scripts/qc_on_rqtl2_bundle2.py
deleted file mode 100644
index 7e5d253..0000000
--- a/scripts/qc_on_rqtl2_bundle2.py
+++ /dev/null
@@ -1,346 +0,0 @@
-"""Run Quality Control checks on R/qtl2 bundle."""
-import os
-import sys
-import json
-from time import sleep
-from pathlib import Path
-from zipfile import ZipFile
-from argparse import Namespace
-from datetime import timedelta
-import multiprocessing as mproc
-from functools import reduce, partial
-from logging import Logger, getLogger, StreamHandler
-from typing import Union, Sequence, Callable, Iterator
-
-import MySQLdb as mdb
-from redis import Redis
-
-from quality_control.errors import InvalidValue
-from quality_control.checks import decimal_points_error
-
-from uploader import jobs
-from uploader.db_utils import database_connection
-from uploader.check_connections import check_db, check_redis
-
-from r_qtl import r_qtl2 as rqtl2
-from r_qtl import r_qtl2_qc as rqc
-from r_qtl import exceptions as rqe
-from r_qtl import fileerrors as rqfe
-
-from scripts.process_rqtl2_bundle import parse_job
-from scripts.redis_logger import setup_redis_logger
-from scripts.cli_parser import init_cli_parser, add_global_data_arguments
-from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter
-
-
def check_for_missing_files(
        rconn: Redis, fqjobid: str, extractpath: Path, logger: Logger) -> bool:
    """Check that all files listed in the control file do actually exist.

    Parameters
    ----------
    rconn: Connection to redis (currently unused; kept for interface parity
           with the other `check_for_*` functions)
    fqjobid: The fully-qualified job id (currently unused)
    extractpath: Directory the R/qtl2 bundle was extracted into
    logger: Logger to report progress/errors to

    Returns
    -------
    True if any file referenced by the control file is missing, else False.
    """
    logger.info("Checking for missing files.")
    missing = rqc.missing_files(extractpath)
    if len(missing) > 0:
        # BUG FIX: the original used an f-string with no placeholders and did
        # not say WHICH files were missing; include them to aid debugging.
        logger.error("Missing files in the bundle: %s", missing)
        return True
    return False
-
-
def open_file(file_: Path) -> Iterator:
    """Lazily yield the lines of `file_`, one line per iteration.

    The file is opened as UTF-8 text and stays open only while the
    generator is being consumed.
    """
    with open(file_, "r", encoding="utf8") as stream:
        yield from stream
-
-
def check_markers(
        filename: str,
        row: tuple[str, ...],
        save_error: Callable[[rqfe.InvalidValue], rqfe.InvalidValue]
) -> tuple[rqfe.InvalidValue, ...]:
    """Check that the markers in the header `row` are okay.

    Flags empty/None markers and markers that appear more than once.  Each
    error is passed through `save_error` (which persists it) and collected
    into the returned tuple.

    Parameters
    ----------
    filename: Name of the file the markers came from (for error reporting)
    row: The marker names from the file's header line
    save_error: Callable that persists an error and returns it
    """
    errors: tuple = tuple()
    counts: dict = {}
    for marker in row:
        # In-place update: rebuilding the whole dict per marker (as the
        # original did with `{**counts, ...}`) was accidentally O(n²).
        counts[marker] = counts.get(marker, 0) + 1
        if marker is None or marker == "":
            # BUG FIX: the original had `"markers" "-"` (missing comma) which
            # Python concatenates into the single argument "markers-",
            # leaving this call one argument short of the 5-field
            # (filename, rowname, colname, value, message) form used in
            # `check_geno_line`.
            errors = errors + (save_error(rqfe.InvalidValue(
                filename,
                "markers",
                "-",
                marker,
                "A marker MUST be a valid value.")),)

    return errors + tuple(
        save_error(rqfe.InvalidValue(
            filename,
            "markers",
            key,
            # NOTE(review): the repeat-count is passed as the offending value
            # so this call matches the 5-argument form used elsewhere —
            # confirm against `r_qtl.fileerrors.InvalidValue`'s fields.
            value,
            f"Marker '{key}' was repeated {value} times"))
        for key, value in counts.items() if value > 1)
-
-
def check_geno_line(
        filename: str,
        headers: tuple[str, ...],
        row: tuple[Union[str, None], ...],
        cdata: dict,
        save_error: Callable[[rqfe.InvalidValue], rqfe.InvalidValue]
) -> tuple[rqfe.InvalidValue, ...]:
    """Check that a geno/founder_geno data line is correct.

    Verifies that the line has the same number of columns as the header,
    that the first column (the sample/case identifier) is non-empty, and
    that every genotype value is one of the codes declared under the
    control file's "genotypes" key.

    Parameters
    ----------
    filename: Name of the file the line came from (for error reporting)
    headers: The file's header row
    row: The data line, already split into columns
    cdata: The parsed R/qtl2 control data
    save_error: Callable that persists an error and returns it
    """
    errors: tuple = tuple()
    # Every line must have the same number of columns as the header line.
    if len(headers) != len(row):
        errors = errors + (save_error(rqfe.InvalidValue(
            filename,
            headers[0],
            row[0],
            row[0],
            "Every line MUST have the same number of columns.")),)

    # The first column holds the individual/case/sample identifier.
    if not bool(row[0]):
        errors = errors + (save_error(rqfe.InvalidValue(
            filename,
            headers[0],
            row[0],
            row[0],
            "The sample/case MUST be a valid value.")),)

    # Removed: a dead `__process_value__` inner function that was defined
    # here but never called.
    genocode = cdata.get("genotypes", {})
    for coltitle, cellvalue in zip(headers[1:], row[1:]):
        # `None` cells are NA values (already mapped by the line splitter)
        # and are not errors; anything else must be a declared geno code.
        if (
                bool(genocode) and
                cellvalue is not None and
                cellvalue not in genocode.keys()
        ):
            errors = errors + (save_error(rqfe.InvalidValue(
                filename, row[0], coltitle, cellvalue,
                f"Value '{cellvalue}' is invalid. Expected one of "
                f"'{', '.join(genocode.keys())}'.")),)

    return errors
-
-
def push_file_error_to_redis(rconn: Redis, key: str, error: InvalidValue) -> InvalidValue:
    """Push the file error to redis a json string

    Parameters
    ----------
    rconn: Connection to redis
    key: The name of the list where we push the errors
    error: The file error to save

    Returns
    -------
    Returns the file error it saved
    """
    # Falsy errors (e.g. None) are silently skipped; the value is always
    # handed back so this can be used as a pass-through in pipelines.
    if not error:
        return error
    rconn.rpush(key, json.dumps(error._asdict()))
    return error
-
-
def file_errors_and_details(
        redisargs: dict[str, str],
        file_: Path,
        filetype: str,
        cdata: dict,
        linesplitterfn: Callable,
        linejoinerfn: Callable,
        headercheckers: tuple[Callable, ...],
        bodycheckers: tuple[Callable, ...]
) -> dict:
    """Compute errors, and other file metadata.

    Runs `headercheckers` against the first line and `bodycheckers` against
    every subsequent line of `file_`, pushing each error to redis as a
    side-effect (via `push_file_error_to_redis`) and recording the file's
    name/size/line-count under the job's hash.

    Returns
    -------
    A dict of the file details plus an "errors" tuple.
    """
    errors: tuple = tuple()
    if cdata[f"{filetype}_transposed"]:
        # Rewrite the file on disk so it can be processed line-by-line.
        rqtl2.transpose_csv_with_rename(file_, linesplitterfn, linejoinerfn)

    # BUG FIX: initialise these before the loop — with an empty file the
    # original raised NameError when building `filedetails` below.
    lineno = 0
    headers: tuple = tuple()
    with Redis.from_url(redisargs["redisuri"], decode_responses=True) as rconn:
        save_error_fn = partial(push_file_error_to_redis,
                                rconn,
                                error_list_name(filetype, file_.name))
        for lineno, line in enumerate(open_file(file_), start=1):
            row = linesplitterfn(line)
            if lineno == 1:
                # Header line: run the header checkers on columns 2..n
                # (column 1 is the sample-id column).
                headers = tuple(row)
                errors = errors + reduce(
                    lambda errs, fnct: errs + fnct(
                        file_.name, row[1:], save_error_fn),
                    headercheckers,
                    tuple())
                continue

            errors = errors + reduce(
                lambda errs, fnct: errs + fnct(
                    file_.name, headers, row, cdata, save_error_fn),
                bodycheckers,
                tuple())

        filedetails = {
            "filename": file_.name,
            "filesize": os.stat(file_).st_size,
            "linecount": lineno
        }
        rconn.hset(redisargs["fqjobid"],
                   f"file-details:{filetype}:{file_.name}",
                   json.dumps(filedetails))
    return {**filedetails, "errors": errors}
-
-
def error_list_name(filetype: str, filename: str) -> str:
    """Compute the name of the list where the errors will be pushed.

    Parameters
    ----------
    filetype: The type of file. One of `r_qtl.r_qtl2.FILE_TYPES`
    filename: The name of the file.
    """
    # BUG FIX: `filename` was accepted (and documented) but never
    # interpolated, so every file of a given type shared one list name.
    return f"errors:{filetype}:{filename}"
-
-
def check_for_geno_errors(
        redisargs: dict[str, str],
        extractdir: Path,
        cdata: dict,
        linesplitterfn: Callable[[str], tuple[Union[str, None]]],
        linejoinerfn: Callable[[tuple[Union[str, None], ...]], str],
        logger: Logger
) -> bool:
    """Check for errors in genotype files.

    Spawns one worker process per geno/founder_geno file; each worker
    pushes any errors it finds onto a per-file redis list.

    Returns
    -------
    True if at least one geno file had errors, else False.
    """
    if "geno" in cdata or "founder_geno" in cdata:
        genofiles = tuple(
            extractdir.joinpath(fname) for fname in cdata.get("geno", []))
        fgenofiles = tuple(
            extractdir.joinpath(fname) for fname in cdata.get("founder_geno", []))
        allgenofiles = genofiles + fgenofiles
        with Redis.from_url(redisargs["redisuri"], decode_responses=True) as rconn:
            error_list_names = [
                error_list_name("geno", file_.name) for file_ in allgenofiles]
            # Clear any stale lists from a previous run, then advertise the
            # list names so consumers know where to look for errors.
            for list_name in error_list_names:
                rconn.delete(list_name)
            rconn.hset(
                redisargs["fqjobid"],
                "geno-errors-lists",
                json.dumps(error_list_names))
            processes = [
                mproc.Process(target=file_errors_and_details,
                              args=(
                                  redisargs,
                                  file_,
                                  ftype,
                                  cdata,
                                  linesplitterfn,
                                  linejoinerfn,
                                  (check_markers,),
                                  (check_geno_line,))
                              )
                for ftype, file_ in (
                    tuple(("geno", file_) for file_ in genofiles) +
                    tuple(("founder_geno", file_) for file_ in fgenofiles))
            ]
            for process in processes:
                process.start()
            # BUG FIX: wait for the workers to finish.  The original set the
            # lists' expiry and inspected their lengths while the workers
            # could still be running, racing against them (lists created
            # after the expire/llen calls were missed entirely).
            for process in processes:
                process.join()
            # Set expiry for any created error lists
            for key in error_list_names:
                rconn.expire(name=key,
                             time=timedelta(seconds=redisargs["redisexpiry"]))

            if any(rconn.llen(errlst) > 0 for errlst in error_list_names):
                logger.error("At least one of the 'geno' files has (an) error(s).")
                return True
            logger.info("No error(s) found in any of the 'geno' files.")

    else:
        logger.info("No 'geno' files to check.")

    return False
-
-
-# def check_for_pheno_errors(...):
-# """Check for errors in phenotype files."""
-# pass
-
-
-# def check_for_phenose_errors(...):
-# """Check for errors in phenotype, standard-error files."""
-# pass
-
-
-# def check_for_phenocovar_errors(...):
-# """Check for errors in phenotype-covariates files."""
-# pass
-
-
def run_qc(rconn: Redis, args: Namespace, fqjobid: str, logger: Logger) -> int:
    """Run quality control checks on R/qtl2 bundles.

    Extracts the bundle named in the job's metadata and runs the various
    `check_for_*` functions over its contents.

    Returns
    -------
    0 if no errors were recorded against the job, 1 otherwise.
    """
    thejob = parse_job(rconn, args.redisprefix, args.jobid)
    # BUG FIX: leftover debugging `print(...)` replaced with a logger call so
    # the output goes through the configured handlers (stderr + redis).
    logger.debug("The job: %s", thejob)
    jobmeta = thejob["job-metadata"]
    inpath = Path(jobmeta["rqtl2-bundle-file"])
    extractdir = inpath.parent.joinpath(f"{inpath.name}__extraction_dir")
    with ZipFile(inpath, "r") as zfile:
        rqtl2.extract(zfile, extractdir)

    ### BEGIN: The quality control checks ###
    cdata = rqtl2.control_data(extractdir)
    splitter = build_line_splitter(cdata)
    joiner = build_line_joiner(cdata)

    redisargs = {
        "fqjobid": fqjobid,
        "redisuri": args.redisuri,
        "redisexpiry": args.redisexpiry
    }
    # NOTE(review): the booleans returned by these checks are ignored; the
    # exit code is derived from the job-hash error keys fetched below —
    # confirm the checks actually populate those keys.
    check_for_missing_files(rconn, fqjobid, extractdir, logger)
    # check_for_pheno_errors(...)
    check_for_geno_errors(redisargs, extractdir, cdata, splitter, joiner, logger)
    # check_for_phenose_errors(...)
    # check_for_phenocovar_errors(...)
    ### END: The quality control checks ###

    def __fetch_errors__(rkey: str) -> tuple:
        """Load the JSON-encoded errors stored under `rkey` in the job hash."""
        return tuple(json.loads(rconn.hget(fqjobid, rkey) or "[]"))

    return (1 if any((
        bool(__fetch_errors__(key))
        for key in
        ("errors-geno", "errors-pheno", "errors-phenos", "errors-phenocovar")))
            else 0)
-
-
if __name__ == "__main__":
    def main():
        """Enter R/qtl2 bundle QC runner."""
        # Parse CLI arguments and fail fast if redis/the DB are unreachable.
        parser = init_cli_parser(
            "qc-on-rqtl2-bundle", "Run QC on R/qtl2 bundle.")
        args = add_global_data_arguments(parser).parse_args()
        check_redis(args.redisuri)
        check_db(args.databaseuri)

        # Log to stderr; a redis handler is attached once connected below.
        logger = getLogger("qc-on-rqtl2-bundle")
        logger.addHandler(StreamHandler(stream=sys.stderr))
        logger.setLevel("DEBUG")

        fqjobid = jobs.job_key(args.redisprefix, args.jobid)
        with Redis.from_url(args.redisuri, decode_responses=True) as rconn:
            logger.addHandler(setup_redis_logger(
                rconn, fqjobid, f"{fqjobid}:log-messages",
                args.redisexpiry))

            # Record the exit code on the job hash for consumers, then
            # propagate it as the process's exit status.
            exitcode = run_qc(rconn, args, fqjobid, logger)
            rconn.hset(
                jobs.job_key(args.redisprefix, args.jobid), "exitcode", exitcode)
            return exitcode

    sys.exit(main())