about summary refs log tree commit diff
path: root/scripts
diff options
context:
space:
mode:
Diffstat (limited to 'scripts')
-rw-r--r--scripts/qc_on_rqtl2_bundle2.py346
1 file changed, 0 insertions, 346 deletions
diff --git a/scripts/qc_on_rqtl2_bundle2.py b/scripts/qc_on_rqtl2_bundle2.py
deleted file mode 100644
index 7e5d253..0000000
--- a/scripts/qc_on_rqtl2_bundle2.py
+++ /dev/null
@@ -1,346 +0,0 @@
-"""Run Quality Control checks on R/qtl2 bundle."""
-import os
-import sys
-import json
-from time import sleep
-from pathlib import Path
-from zipfile import ZipFile
-from argparse import Namespace
-from datetime import timedelta
-import multiprocessing as mproc
-from functools import reduce, partial
-from logging import Logger, getLogger, StreamHandler
-from typing import Union, Sequence, Callable, Iterator
-
-import MySQLdb as mdb
-from redis import Redis
-
-from quality_control.errors import InvalidValue
-from quality_control.checks import decimal_points_error
-
-from uploader import jobs
-from uploader.db_utils import database_connection
-from uploader.check_connections import check_db, check_redis
-
-from r_qtl import r_qtl2 as rqtl2
-from r_qtl import r_qtl2_qc as rqc
-from r_qtl import exceptions as rqe
-from r_qtl import fileerrors as rqfe
-
-from scripts.process_rqtl2_bundle import parse_job
-from scripts.redis_logger import setup_redis_logger
-from scripts.cli_parser import init_cli_parser, add_global_data_arguments
-from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter
-
-
def check_for_missing_files(
        rconn: Redis, fqjobid: str, extractpath: Path, logger: Logger) -> bool:
    """Check that all files listed in the control file do actually exist.

    Parameters
    ----------
    rconn: Open connection to redis (currently unused; kept for a uniform
        checker signature and the eventual push of errors to redis).
    fqjobid: Fully-qualified job id (currently unused; see above).
    extractpath: Directory the R/qtl2 bundle was extracted into.
    logger: Logger for progress and error messages.

    Returns
    -------
    True when at least one file named in the control file is missing from
    the bundle, False otherwise.
    """
    logger.info("Checking for missing files.")
    missing = rqc.missing_files(extractpath)
    # TODO: push a rqfe.MissingFile error to redis for each missing
    #       (key, filename) pair so the UI can display the details.
    if len(missing) > 0:
        # Plain string: the original used an f-string with no placeholders.
        logger.error("Missing files in the bundle!")
        return True
    return False
-
-
def open_file(file_: Path) -> Iterator:
    """Lazily yield the lines of `file_`, one line at a time."""
    with open(file_, "r", encoding="utf8") as infile:
        yield from infile
-
-
def check_markers(
        filename: str,
        row: tuple[str, ...],
        save_error: Callable
) -> tuple["rqfe.InvalidValue", ...]:
    """Check that the markers are okay: present, non-empty and unique.

    Parameters
    ----------
    filename: Name of the file the markers came from (used in error reports).
    row: The markers row (header line of the file, minus the first column).
    save_error: Callback invoked with each error as it is found (e.g. to
        persist it to redis); it must return the error it was given.

    Returns
    -------
    A tuple of the errors found; empty when all markers are valid.
    """
    errors: tuple = tuple()
    counts: dict = {}
    for marker in row:
        # Count in place: the original rebuilt the whole dict on every
        # iteration ({**counts, ...}), which is quadratic in len(row).
        counts[marker] = counts.get(marker, 0) + 1
        if marker is None or marker == "":
            errors = errors + (save_error(rqfe.InvalidValue(
                filename,
                "markers",
                # BUGFIX: a missing comma previously concatenated this with
                # "markers" into a single "markers-" argument, leaving the
                # error one field short of the 5-field form used in
                # check_geno_line.
                "-",
                marker,
                "A marker MUST be a valid value.")),)

    return errors + tuple(
        save_error(rqfe.InvalidValue(
            filename,
            "markers",
            key,
            # NOTE(review): added the repeated marker as the cell value so
            # this call matches the 5-field form above — confirm against
            # rqfe.InvalidValue's definition.
            key,
            f"Marker '{key}' was repeated {value} times"))
        for key, value in counts.items() if value > 1)
-
-
def check_geno_line(
        filename: str,
        headers: tuple[str, ...],
        row: tuple[Union[str, None], ...],
        cdata: dict,
        save_error: Callable
) -> tuple["rqfe.InvalidValue", ...]:
    """Check that a single data line of a geno file is correct.

    Verifies that the line has the same number of columns as the headers,
    that the first column (the individual/case/sample id) is non-empty, and
    that every genotype cell holds one of the genotype codes declared in
    the control data (when any are declared).

    Parameters
    ----------
    filename: Name of the geno file (used in error reports).
    headers: The file's header row.
    row: The data line being checked. Assumes the line splitter has already
        mapped `na.strings` values to None — TODO confirm against
        build_line_splitter.
    cdata: The R/qtl2 bundle's control data.
    save_error: Callback invoked with each error as it is found; it must
        return the error it was given.

    Returns
    -------
    A tuple of the errors found; empty when the line is valid.
    """
    errors: tuple = tuple()
    # Verify that line has same number of columns as headers
    if len(headers) != len(row):
        errors = errors + (save_error(rqfe.InvalidValue(
            filename,
            headers[0],
            row[0],
            row[0],
            "Every line MUST have the same number of columns.")),)

    # First column is the individuals/cases/samples
    if not bool(row[0]):
        errors = errors + (save_error(rqfe.InvalidValue(
            filename,
            headers[0],
            row[0],
            row[0],
            "The sample/case MUST be a valid value.")),)

    # NOTE: the original defined an unused `__process_value__` helper here;
    # it was never called (dead code) and has been removed.
    genocode = cdata.get("genotypes", {})
    for coltitle, cellvalue in zip(headers[1:], row[1:]):
        if (
                bool(genocode) and
                cellvalue is not None and
                cellvalue not in genocode.keys()
        ):
            errors = errors + (save_error(rqfe.InvalidValue(
                filename, row[0], coltitle, cellvalue,
                f"Value '{cellvalue}' is invalid. Expected one of "
                f"'{', '.join(genocode.keys())}'.")),)

    return errors
-
-
def push_file_error_to_redis(rconn: Redis, key: str, error: InvalidValue) -> InvalidValue:
    """Push the file error to redis a json string

    Parameters
    ----------
    rconn: Connection to redis
    key: The name of the list where we push the errors
    error: The file error to save

    Returns
    -------
    Returns the file error it saved
    """
    # Guard clause: nothing to persist for a falsy (absent) error.
    if not bool(error):
        return error
    rconn.rpush(key, json.dumps(error._asdict()))
    return error
-
-
def file_errors_and_details(
        redisargs: dict[str, str],
        file_: Path,
        filetype: str,
        cdata: dict,
        linesplitterfn: Callable,
        linejoinerfn: Callable,
        headercheckers: tuple[Callable, ...],
        bodycheckers: tuple[Callable, ...]
) -> dict:
    """Compute errors, and other file metadata.

    Parameters
    ----------
    redisargs: Redis connection details ("redisuri", "fqjobid", ...).
    file_: Path to the file to check.
    filetype: The R/qtl2 file type, e.g. "geno" or "founder_geno".
    cdata: The R/qtl2 bundle's control data.
    linesplitterfn: Splits a raw line into a tuple of values.
    linejoinerfn: Joins a tuple of values back into a raw line.
    headercheckers: Checks to run against the header (first) line.
    bodycheckers: Checks to run against every subsequent line.

    Returns
    -------
    A dict with the file's name, size, line count and the errors found.
    Side effects: each error is pushed to the file's redis error list as it
    is found, and the file details are saved under the job's redis hash.
    """
    errors: tuple = tuple()
    if cdata[f"{filetype}_transposed"]:
        # Transpose the file on disk so it can be checked row-wise.
        rqtl2.transpose_csv_with_rename(file_, linesplitterfn, linejoinerfn)

    headers: tuple = tuple()
    # BUGFIX: `lineno` was previously unbound (NameError) for an empty file.
    lineno = 0
    with Redis.from_url(redisargs["redisuri"], decode_responses=True) as rconn:
        save_error_fn = partial(push_file_error_to_redis,
                                rconn,
                                error_list_name(filetype, file_.name))
        for lineno, line in enumerate(open_file(file_), start=1):
            row = linesplitterfn(line)
            if lineno == 1:
                headers = tuple(row)
                errors = errors + reduce(
                    lambda errs, fnct: errs + fnct(
                        file_.name, row[1:], save_error_fn),
                    headercheckers,
                    tuple())
                continue

            errors = errors + reduce(
                lambda errs, fnct: errs + fnct(
                    file_.name, headers, row, cdata, save_error_fn),
                bodycheckers,
                tuple())

        filedetails = {
            "filename": file_.name,
            "filesize": os.stat(file_).st_size,
            "linecount": lineno
        }
        rconn.hset(redisargs["fqjobid"],
                   f"file-details:{filetype}:{file_.name}",
                   json.dumps(filedetails))
        return {**filedetails, "errors": errors}
-
-
def error_list_name(filetype: str, filename: str) -> str:
    """Compute the name of the list where the errors will be pushed.

    Parameters
    ----------
    filetype: The type of file. One of `r_qtl.r_qtl2.FILE_TYPES`
    filename: The name of the file.

    Returns
    -------
    The redis key of the error list for this specific file.
    """
    # BUGFIX: `filename` was never interpolated, so every file of a given
    # type shared one list name and their errors were mixed together.
    return f"errors:{filetype}:{filename}"
-
-
def check_for_geno_errors(
        redisargs: dict[str, str],
        extractdir: Path,
        cdata: dict,
        linesplitterfn: Callable[[str], tuple[Union[str, None]]],
        linejoinerfn: Callable[[tuple[Union[str, None], ...]], str],
        logger: Logger
) -> bool:
    """Check for errors in genotype files.

    Runs `file_errors_and_details` over every "geno" and "founder_geno"
    file in its own process, then inspects the per-file redis error lists.

    Parameters
    ----------
    redisargs: Redis connection details ("redisuri", "fqjobid",
        "redisexpiry").
    extractdir: Directory the R/qtl2 bundle was extracted into.
    cdata: The R/qtl2 bundle's control data.
    linesplitterfn: Splits a raw line into a tuple of values.
    linejoinerfn: Joins a tuple of values back into a raw line.
    logger: Logger for progress and error messages.

    Returns
    -------
    True when at least one geno file has errors, False otherwise.
    """
    if "geno" in cdata or "founder_geno" in cdata:
        genofiles = tuple(
            extractdir.joinpath(fname) for fname in cdata.get("geno", []))
        fgenofiles = tuple(
            extractdir.joinpath(fname) for fname in cdata.get("founder_geno", []))
        # Pair each file with its actual filetype so that the list names
        # computed here match the names the workers push their errors to.
        # BUGFIX: the names were previously computed with a hard-coded
        # "geno" filetype, so "founder_geno" errors were never detected.
        ftypes_and_files = (
            tuple(("geno", file_) for file_ in genofiles) +
            tuple(("founder_geno", file_) for file_ in fgenofiles))
        with Redis.from_url(redisargs["redisuri"], decode_responses=True) as rconn:
            error_list_names = [
                error_list_name(ftype, file_.name)
                for ftype, file_ in ftypes_and_files]
            # Clear any stale lists left over from a previous run.
            for list_name in error_list_names:
                rconn.delete(list_name)
            rconn.hset(
                redisargs["fqjobid"],
                "geno-errors-lists",
                json.dumps(error_list_names))
            processes = [
                mproc.Process(target=file_errors_and_details,
                              args=(
                                  redisargs,
                                  file_,
                                  ftype,
                                  cdata,
                                  linesplitterfn,
                                  linejoinerfn,
                                  (check_markers,),
                                  (check_geno_line,))
                              )
                for ftype, file_ in ftypes_and_files
            ]
            for process in processes:
                process.start()
            # BUGFIX: wait for the workers to finish. Previously the error
            # lists were inspected (and expired) while the workers could
            # still be writing to them.
            for process in processes:
                process.join()
            # Set expiry for any created error lists
            for key in error_list_names:
                rconn.expire(name=key,
                             time=timedelta(seconds=redisargs["redisexpiry"]))

            # TODO: Add the errors to redis
            if any(rconn.llen(errlst) > 0 for errlst in error_list_names):
                logger.error("At least one of the 'geno' files has (an) error(s).")
                return True
            logger.info("No error(s) found in any of the 'geno' files.")

    else:
        logger.info("No 'geno' files to check.")

    return False
-
-
-# def check_for_pheno_errors(...):
-#     """Check for errors in phenotype files."""
-#     pass
-
-
-# def check_for_phenose_errors(...):
-#     """Check for errors in phenotype, standard-error files."""
-#     pass
-
-
-# def check_for_phenocovar_errors(...):
-#     """Check for errors in phenotype-covariates files."""
-#     pass
-
-
def run_qc(rconn: Redis, args: Namespace, fqjobid: str, logger: Logger) -> int:
    """Run quality control checks on R/qtl2 bundles.

    Parameters
    ----------
    rconn: Open connection to redis.
    args: Parsed command-line arguments (redis details, job id, ...).
    fqjobid: The fully-qualified job id.
    logger: Logger for progress and error messages.

    Returns
    -------
    1 when any errors were recorded for the job, 0 otherwise.
    """
    thejob = parse_job(rconn, args.redisprefix, args.jobid)
    # BUGFIX: replaced a leftover debug print() with lazy logger formatting.
    logger.debug("The job: %s", thejob)
    jobmeta = thejob["job-metadata"]
    inpath = Path(jobmeta["rqtl2-bundle-file"])
    extractdir = inpath.parent.joinpath(f"{inpath.name}__extraction_dir")
    with ZipFile(inpath, "r") as zfile:
        rqtl2.extract(zfile, extractdir)

    ### BEGIN: The quality control checks ###
    cdata = rqtl2.control_data(extractdir)
    splitter = build_line_splitter(cdata)
    joiner = build_line_joiner(cdata)

    redisargs = {
        "fqjobid": fqjobid,
        "redisuri": args.redisuri,
        "redisexpiry": args.redisexpiry
    }
    # NOTE(review): the checks' boolean results are deliberately ignored —
    # the verdict below is computed from the error keys re-read from redis.
    check_for_missing_files(rconn, fqjobid, extractdir, logger)
    # check_for_pheno_errors(...)
    check_for_geno_errors(redisargs, extractdir, cdata, splitter, joiner, logger)
    # check_for_phenose_errors(...)
    # check_for_phenocovar_errors(...)
    ### END: The quality control checks ###

    def __fetch_errors__(rkey: str) -> tuple:
        # A missing hash field comes back as None; treat it as "no errors".
        return tuple(json.loads(rconn.hget(fqjobid, rkey) or "[]"))

    return (1 if any((
        bool(__fetch_errors__(key))
        for key in
        ("errors-geno", "errors-pheno", "errors-phenos", "errors-phenocovar")))
            else 0)
-
-
-if __name__ == "__main__":
-    def main():
-        """Enter R/qtl2 bundle QC runner."""
-        args = add_global_data_arguments(init_cli_parser(
-            "qc-on-rqtl2-bundle", "Run QC on R/qtl2 bundle.")).parse_args()
-        check_redis(args.redisuri)
-        check_db(args.databaseuri)
-
-        logger = getLogger("qc-on-rqtl2-bundle")
-        logger.addHandler(StreamHandler(stream=sys.stderr))
-        logger.setLevel("DEBUG")
-
-        fqjobid = jobs.job_key(args.redisprefix, args.jobid)
-        with Redis.from_url(args.redisuri, decode_responses=True) as rconn:
-            logger.addHandler(setup_redis_logger(
-                rconn, fqjobid, f"{fqjobid}:log-messages",
-                args.redisexpiry))
-
-            exitcode = run_qc(rconn, args, fqjobid, logger)
-            rconn.hset(
-                jobs.job_key(args.redisprefix, args.jobid), "exitcode", exitcode)
-            return exitcode
-
-    sys.exit(main())