diff options
Diffstat (limited to 'scripts')
-rw-r--r-- | scripts/cli_parser.py | 6 | ||||
-rw-r--r-- | scripts/process_rqtl2_bundle.py | 19 | ||||
-rw-r--r-- | scripts/qc_on_rqtl2_bundle2.py | 346 | ||||
-rw-r--r-- | scripts/redis_logger.py | 21 | ||||
-rw-r--r-- | scripts/rqtl2/bundleutils.py | 44 | ||||
-rw-r--r-- | scripts/rqtl2/cli_parser.py | 20 | ||||
-rw-r--r-- | scripts/rqtl2/entry.py | 22 | ||||
-rw-r--r-- | scripts/rqtl2/install_genotypes.py | 25 | ||||
-rw-r--r-- | scripts/rqtl2/install_phenos.py | 26 | ||||
-rw-r--r-- | scripts/rqtl2/phenotypes_qc.py | 468 |
10 files changed, 941 insertions, 56 deletions
diff --git a/scripts/cli_parser.py b/scripts/cli_parser.py index 308ee4b..d42ae66 100644 --- a/scripts/cli_parser.py +++ b/scripts/cli_parser.py @@ -19,6 +19,12 @@ def init_cli_parser(program: str, description: Optional[str] = None) -> Argument type=int, default=86400, help="How long to keep any redis keys around.") + parser.add_argument( + "--loglevel", + type=str, + default="INFO", + choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], + help="The severity of events to track with the logger.") return parser def add_global_data_arguments(parser: ArgumentParser) -> ArgumentParser: diff --git a/scripts/process_rqtl2_bundle.py b/scripts/process_rqtl2_bundle.py index 20cfd3b..ade9862 100644 --- a/scripts/process_rqtl2_bundle.py +++ b/scripts/process_rqtl2_bundle.py @@ -2,6 +2,7 @@ import sys import uuid import json +import argparse import traceback from typing import Any from pathlib import Path @@ -94,10 +95,11 @@ def process_bundle(dbconn: mdb.Connection, logger.info("Processing geno files.") genoexit = install_genotypes( dbconn, - meta["speciesid"], - meta["populationid"], - meta["geno-dataset-id"], - Path(meta["rqtl2-bundle-file"]), + argparse.Namespace( + speciesid=meta["speciesid"], + populationid=meta["populationid"], + datasetid=meta["geno-dataset-id"], + rqtl2bundle=Path(meta["rqtl2-bundle-file"])), logger) if genoexit != 0: raise Exception("Processing 'geno' file failed.") @@ -109,10 +111,11 @@ def process_bundle(dbconn: mdb.Connection, if has_pheno_file(thejob): phenoexit = install_pheno_files( dbconn, - meta["speciesid"], - meta["platformid"], - meta["probe-dataset-id"], - Path(meta["rqtl2-bundle-file"]), + argparse.Namespace( + speciesid=meta["speciesid"], + platformid=meta["platformid"], + dataset_id=meta["probe-dataset-id"], + rqtl2bundle=Path(meta["rqtl2-bundle-file"])), logger) if phenoexit != 0: raise Exception("Processing 'pheno' file failed.") diff --git a/scripts/qc_on_rqtl2_bundle2.py b/scripts/qc_on_rqtl2_bundle2.py new file mode 100644 index 0000000..7e5d253 --- /dev/null +++ b/scripts/qc_on_rqtl2_bundle2.py @@ -0,0 +1,346 @@ +"""Run Quality Control checks on R/qtl2 bundle.""" +import os +import sys +import json +from time import sleep +from pathlib import Path +from zipfile import ZipFile +from argparse import Namespace +from datetime import timedelta +import multiprocessing as mproc +from functools import reduce, partial +from logging import Logger, getLogger, StreamHandler +from typing import Union, Sequence, Callable, Iterator + +import MySQLdb as mdb +from redis import Redis + +from quality_control.errors import InvalidValue +from quality_control.checks import decimal_points_error + +from uploader import jobs +from uploader.db_utils import database_connection +from uploader.check_connections import check_db, check_redis + +from r_qtl import r_qtl2 as rqtl2 +from r_qtl import r_qtl2_qc as rqc +from r_qtl import exceptions as rqe +from r_qtl import fileerrors as rqfe + +from scripts.process_rqtl2_bundle import parse_job +from scripts.redis_logger import setup_redis_logger +from scripts.cli_parser import init_cli_parser, add_global_data_arguments +from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter + + +def check_for_missing_files( + rconn: Redis, fqjobid: str, extractpath: Path, logger: Logger) -> bool: + """Check that all files listed in the control file do actually exist.""" + logger.info("Checking for missing files.") + missing = rqc.missing_files(extractpath) + # add_to_errors(rconn, fqjobid, "errors-generic", tuple( + # rqfe.MissingFile( + # mfile[0], mfile[1], ( + # f"File '{mfile[1]}' is listed in the control file under " + # f"the '{mfile[0]}' key, but it does not actually exist in " + # "the bundle.")) + # for mfile in missing)) + if len(missing) > 0: + logger.error(f"Missing files in the bundle!") + return True + return False + + +def open_file(file_: Path) -> Iterator: + """Open file and return one line at a time.""" + with open(file_, "r", encoding="utf8") as infile: + for line in infile: + yield line + + +def check_markers( + filename: str, + row: tuple[str, ...], + save_error: lambda val: val +) -> tuple[rqfe.InvalidValue]: + """Check that the markers are okay""" + errors = tuple() + counts = {} + for marker in row: + counts = {**counts, marker: counts.get(marker, 0) + 1} + if marker is None or marker == "": + errors = errors + (save_error(rqfe.InvalidValue( + filename, + "markers" + "-", + marker, + "A marker MUST be a valid value.")),) + + return errors + tuple( + save_error(rqfe.InvalidValue( + filename, + "markers", + key, + f"Marker '{key}' was repeated {value} times")) + for key,value in counts.items() if value > 1) + + +def check_geno_line( + filename: str, + headers: tuple[str, ...], + row: tuple[Union[str, None]], + cdata: dict, + save_error: lambda val: val +) -> tuple[rqfe.InvalidValue]: + """Check that the geno line is correct.""" + errors = tuple() + # Verify that line has same number of columns as headers + if len(headers) != len(row): + errors = errors + (save_error(rqfe.InvalidValue( + filename, + headers[0], + row[0], + row[0], + "Every line MUST have the same number of columns.")),) + + # First column is the individuals/cases/samples + if not bool(row[0]): + errors = errors + (save_error(rqfe.InvalidValue( + filename, + headers[0], + row[0], + row[0], + "The sample/case MUST be a valid value.")),) + + def __process_value__(val): + if val in cdata["na.strings"]: + return None + if val in cdata["alleles"]: + return cdata["genotypes"][val] + + genocode = cdata.get("genotypes", {}) + for coltitle, cellvalue in zip(headers[1:],row[1:]): + if ( + bool(genocode) and + cellvalue is not None and + cellvalue not in genocode.keys() + ): + errors = errors + (save_error(rqfe.InvalidValue( + filename, row[0], coltitle, cellvalue, + f"Value '{cellvalue}' is invalid. Expected one of " + f"'{', '.join(genocode.keys())}'.")),) + + return errors + + +def push_file_error_to_redis(rconn: Redis, key: str, error: InvalidValue) -> InvalidValue: + """Push the file error to redis a json string + + Parameters + ---------- + rconn: Connection to redis + key: The name of the list where we push the errors + error: The file error to save + + Returns + ------- + Returns the file error it saved + """ + if bool(error): + rconn.rpush(key, json.dumps(error._asdict())) + return error + + +def file_errors_and_details( + redisargs: dict[str, str], + file_: Path, + filetype: str, + cdata: dict, + linesplitterfn: Callable, + linejoinerfn: Callable, + headercheckers: tuple[Callable, ...], + bodycheckers: tuple[Callable, ...] +) -> dict: + """Compute errors, and other file metadata.""" + errors = tuple() + if cdata[f"{filetype}_transposed"]: + rqtl2.transpose_csv_with_rename(file_, linesplitterfn, linejoinerfn) + + with Redis.from_url(redisargs["redisuri"], decode_responses=True) as rconn: + save_error_fn = partial(push_file_error_to_redis, + rconn, + error_list_name(filetype, file_.name)) + for lineno, line in enumerate(open_file(file_), start=1): + row = linesplitterfn(line) + if lineno == 1: + headers = tuple(row) + errors = errors + reduce( + lambda errs, fnct: errs + fnct( + file_.name, row[1:], save_error_fn), + headercheckers, + tuple()) + continue + + errors = errors + reduce( + lambda errs, fnct: errs + fnct( + file_.name, headers, row, cdata, save_error_fn), + bodycheckers, + tuple()) + + filedetails = { + "filename": file_.name, + "filesize": os.stat(file_).st_size, + "linecount": lineno + } + rconn.hset(redisargs["fqjobid"], + f"file-details:{filetype}:{file_.name}", + json.dumps(filedetails)) + return {**filedetails, "errors": errors} + + +def error_list_name(filetype: str, filename: str): + """Compute the name of the list where the errors will be pushed. + + Parameters + ---------- + filetype: The type of file. One of `r_qtl.r_qtl2.FILE_TYPES` + filename: The name of the file. + """ + return f"errors:{filetype}:{filename}" + + +def check_for_geno_errors( + redisargs: dict[str, str], + extractdir: Path, + cdata: dict, + linesplitterfn: Callable[[str], tuple[Union[str, None]]], + linejoinerfn: Callable[[tuple[Union[str, None], ...]], str], + logger: Logger +) -> bool: + """Check for errors in genotype files.""" + if "geno" in cdata or "founder_geno" in cdata: + genofiles = tuple( + extractdir.joinpath(fname) for fname in cdata.get("geno", [])) + fgenofiles = tuple( + extractdir.joinpath(fname) for fname in cdata.get("founder_geno", [])) + allgenofiles = genofiles + fgenofiles + with Redis.from_url(redisargs["redisuri"], decode_responses=True) as rconn: + error_list_names = [ + error_list_name("geno", file_.name) for file_ in allgenofiles] + for list_name in error_list_names: + rconn.delete(list_name) + rconn.hset( + redisargs["fqjobid"], + "geno-errors-lists", + json.dumps(error_list_names)) + processes = [ + mproc.Process(target=file_errors_and_details, + args=( + redisargs, + file_, + ftype, + cdata, + linesplitterfn, + linejoinerfn, + (check_markers,), + (check_geno_line,)) + ) + for ftype, file_ in ( + tuple(("geno", file_) for file_ in genofiles) + + tuple(("founder_geno", file_) for file_ in fgenofiles)) + ] + for process in processes: + process.start() + # Set expiry for any created error lists + for key in error_list_names: + rconn.expire(name=key, + time=timedelta(seconds=redisargs["redisexpiry"])) + + # TOD0: Add the errors to redis + if any(rconn.llen(errlst) > 0 for errlst in error_list_names): + logger.error("At least one of the 'geno' files has (an) error(s).") + return True + logger.info("No error(s) found in any of the 'geno' files.") + + else: + logger.info("No 'geno' files to check.") + + return False + + +# def check_for_pheno_errors(...): +# """Check for errors in phenotype files.""" +# pass + + +# def check_for_phenose_errors(...): +# """Check for errors in phenotype, standard-error files.""" +# pass + + +# def check_for_phenocovar_errors(...): +# """Check for errors in phenotype-covariates files.""" +# pass + + +def run_qc(rconn: Redis, args: Namespace, fqjobid: str, logger: Logger) -> int: + """Run quality control checks on R/qtl2 bundles.""" + thejob = parse_job(rconn, args.redisprefix, args.jobid) + print(f"THE JOB =================> {thejob}") + jobmeta = thejob["job-metadata"] + inpath = Path(jobmeta["rqtl2-bundle-file"]) + extractdir = inpath.parent.joinpath(f"{inpath.name}__extraction_dir") + with ZipFile(inpath, "r") as zfile: + rqtl2.extract(zfile, extractdir) + + ### BEGIN: The quality control checks ### + cdata = rqtl2.control_data(extractdir) + splitter = build_line_splitter(cdata) + joiner = build_line_joiner(cdata) + + redisargs = { + "fqjobid": fqjobid, + "redisuri": args.redisuri, + "redisexpiry": args.redisexpiry + } + check_for_missing_files(rconn, fqjobid, extractdir, logger) + # check_for_pheno_errors(...) + check_for_geno_errors(redisargs, extractdir, cdata, splitter, joiner, logger) + # check_for_phenose_errors(...) + # check_for_phenocovar_errors(...) + ### END: The quality control checks ### + + def __fetch_errors__(rkey: str) -> tuple: + return tuple(json.loads(rconn.hget(fqjobid, rkey) or "[]")) + + return (1 if any(( + bool(__fetch_errors__(key)) + for key in + ("errors-geno", "errors-pheno", "errors-phenos", "errors-phenocovar"))) + else 0) + + +if __name__ == "__main__": + def main(): + """Enter R/qtl2 bundle QC runner.""" + args = add_global_data_arguments(init_cli_parser( + "qc-on-rqtl2-bundle", "Run QC on R/qtl2 bundle.")).parse_args() + check_redis(args.redisuri) + check_db(args.databaseuri) + + logger = getLogger("qc-on-rqtl2-bundle") + logger.addHandler(StreamHandler(stream=sys.stderr)) + logger.setLevel("DEBUG") + + fqjobid = jobs.job_key(args.redisprefix, args.jobid) + with Redis.from_url(args.redisuri, decode_responses=True) as rconn: + logger.addHandler(setup_redis_logger( + rconn, fqjobid, f"{fqjobid}:log-messages", + args.redisexpiry)) + + exitcode = run_qc(rconn, args, fqjobid, logger) + rconn.hset( + jobs.job_key(args.redisprefix, args.jobid), "exitcode", exitcode) + return exitcode + + sys.exit(main()) diff --git a/scripts/redis_logger.py b/scripts/redis_logger.py index 2ae682b..d3fde5f 100644 --- a/scripts/redis_logger.py +++ b/scripts/redis_logger.py @@ -1,5 +1,6 @@ """Utilities to log to redis for our worker scripts.""" import logging +from typing import Optional from redis import Redis @@ -26,6 +27,26 @@ class RedisLogger(logging.Handler): self.redisconnection.rpush(self.messageslistname, self.format(record)) self.redisconnection.expire(self.messageslistname, self.expiry) +class RedisMessageListHandler(logging.Handler): + """Send messages to specified redis list.""" + def __init__(self, + rconn: Redis, + fullyqualifiedkey: str, + loglevel: int = logging.NOTSET, + expiry: Optional[int] = 86400): + super().__init__(loglevel) + self.redisconnection = rconn + self.fullyqualifiedkey = fullyqualifiedkey + self.expiry = expiry + + def emit(self, record): + """Log out to specified `fullyqualifiedkey`.""" + self.redisconnection.rpush(self.fullyqualifiedkey, self.format(record)) + if bool(self.expiry): + self.redisconnection.expire(self.fullyqualifiedkey, self.expiry) + else: + self.redisconnection.persist(self.fullyqualifiedkey) + def setup_redis_logger(rconn: Redis, fullyqualifiedjobid: str, job_messagelist: str, diff --git a/scripts/rqtl2/bundleutils.py b/scripts/rqtl2/bundleutils.py new file mode 100644 index 0000000..17faa7c --- /dev/null +++ b/scripts/rqtl2/bundleutils.py @@ -0,0 +1,44 @@ +"""Common utilities to operate in R/qtl2 bundles.""" +from typing import Union, Callable + +def build_line_splitter(cdata: dict) -> Callable[[str], tuple[Union[str, None], ...]]: + """Build and return a function to use to split data in the files. + + Parameters + ---------- + cdata: A dict holding the control information included with the R/qtl2 + bundle. + + Returns + ------- + A function that takes a string and return a tuple of strings. + """ + separator = cdata["sep"] + na_strings = cdata["na.strings"] + def __splitter__(line: str) -> tuple[Union[str, None], ...]: + return tuple( + item if item not in na_strings else None + for item in + (field.strip() for field in line.strip().split(separator))) + return __splitter__ + + +def build_line_joiner(cdata: dict) -> Callable[[tuple[Union[str, None], ...]], str]: + """Build and return a function to use to split data in the files. + + Parameters + ---------- + cdata: A dict holding the control information included with the R/qtl2 + bundle. + + Returns + ------- + A function that takes a string and return a tuple of strings. + """ + separator = cdata["sep"] + na_strings = cdata["na.strings"] + def __joiner__(row: tuple[Union[str, None], ...]) -> str: + return separator.join( + (na_strings[0] if item is None else item) + for item in row) + return __joiner__ diff --git a/scripts/rqtl2/cli_parser.py b/scripts/rqtl2/cli_parser.py index bcc7a4f..9bb60a3 100644 --- a/scripts/rqtl2/cli_parser.py +++ b/scripts/rqtl2/cli_parser.py @@ -2,12 +2,22 @@ from pathlib import Path from argparse import ArgumentParser -def add_common_arguments(parser: ArgumentParser) -> ArgumentParser: - """Add common arguments to the CLI parser.""" - parser.add_argument("datasetid", - type=int, - help="The dataset to which the data belongs.") +def add_bundle_argument(parser: ArgumentParser) -> ArgumentParser: + """Add the `rqtl2bundle` argument.""" parser.add_argument("rqtl2bundle", type=Path, help="Path to R/qtl2 bundle zip file.") return parser + + +def add_datasetid_argument(parser: ArgumentParser) -> ArgumentParser: + """Add the `datasetid` argument.""" + parser.add_argument("datasetid", + type=int, + help="The dataset to which the data belongs.") + return parser + + +def add_common_arguments(parser: ArgumentParser) -> ArgumentParser: + """Add common arguments to the CLI parser.""" + return add_bundle_argument(add_datasetid_argument(parser)) diff --git a/scripts/rqtl2/entry.py b/scripts/rqtl2/entry.py index b7fb68e..bc4cd9f 100644 --- a/scripts/rqtl2/entry.py +++ b/scripts/rqtl2/entry.py @@ -1,5 +1,5 @@ """Build common script-entry structure.""" -from logging import Logger +import logging from typing import Callable from argparse import Namespace @@ -12,12 +12,19 @@ from uploader.check_connections import check_db, check_redis from scripts.redis_logger import setup_redis_logger -def build_main(args: Namespace, - run_fn: Callable[[Connection, Namespace], int], - logger: Logger, - loglevel: str = "INFO") -> Callable[[],int]: +def build_main( + args: Namespace, + run_fn: Callable[[Connection, Namespace, logging.Logger], int], + loggername: str +) -> Callable[[],int]: """Build a function to be used as an entry-point for scripts.""" def main(): + logging.basicConfig( + format=( + "%(asctime)s - %(levelname)s %(name)s: " + "(%(pathname)s: %(lineno)d) %(message)s"), + level=args.loglevel) + logger = logging.getLogger(loggername) check_db(args.databaseuri) check_redis(args.redisuri) if not args.rqtl2bundle.exists(): @@ -26,13 +33,12 @@ def build_main(args: Namespace, with (Redis.from_url(args.redisuri, decode_responses=True) as rconn, database_connection(args.databaseuri) as dbconn): - fqjobid = jobs.job_key(jobs.jobsnamespace(), args.jobid) + fqjobid = jobs.job_key(args.redisprefix, args.jobid) logger.addHandler(setup_redis_logger( rconn, fqjobid, f"{fqjobid}:log-messages", args.redisexpiry)) - logger.setLevel(loglevel) - return run_fn(dbconn, args) + return run_fn(dbconn, args, logger) return main diff --git a/scripts/rqtl2/install_genotypes.py b/scripts/rqtl2/install_genotypes.py index 6b89142..20a19da 100644 --- a/scripts/rqtl2/install_genotypes.py +++ b/scripts/rqtl2/install_genotypes.py @@ -1,11 +1,11 @@ """Load genotypes from R/qtl2 bundle into the database.""" import sys +import argparse import traceback -from pathlib import Path from zipfile import ZipFile from functools import reduce from typing import Iterator, Optional -from logging import Logger, getLogger, StreamHandler +from logging import Logger, getLogger import MySQLdb as mdb from MySQLdb.cursors import DictCursor @@ -19,6 +19,8 @@ from scripts.rqtl2.entry import build_main from scripts.rqtl2.cli_parser import add_common_arguments from scripts.cli_parser import init_cli_parser, add_global_data_arguments +__MODULE__ = "scripts.rqtl2.install_genotypes" + def insert_markers( dbconn: mdb.Connection, speciesid: int, @@ -185,13 +187,12 @@ def cross_reference_genotypes( def install_genotypes(#pylint: disable=[too-many-arguments, too-many-locals] dbconn: mdb.Connection, - speciesid: int, - populationid: int, - datasetid: int, - rqtl2bundle: Path, + args: argparse.Namespace, logger: Logger = getLogger(__name__) ) -> int: """Load any existing genotypes into the database.""" + (speciesid, populationid, datasetid, rqtl2bundle) = ( + args.speciesid, args.populationid, args.datasetid, args.rqtl2bundle) count = 0 with ZipFile(str(rqtl2bundle.absolute()), "r") as zfile: try: @@ -253,15 +254,5 @@ if __name__ == "__main__": return parser.parse_args() - thelogger = getLogger("install_genotypes") - thelogger.addHandler(StreamHandler(stream=sys.stderr)) - main = build_main( - cli_args(), - lambda dbconn, args: install_genotypes(dbconn, - args.speciesid, - args.populationid, - args.datasetid, - args.rqtl2bundle), - thelogger, - "INFO") + main = build_main(cli_args(), install_genotypes, __MODULE__) sys.exit(main()) diff --git a/scripts/rqtl2/install_phenos.py b/scripts/rqtl2/install_phenos.py index b5cab8e..a6e9fb2 100644 --- a/scripts/rqtl2/install_phenos.py +++ b/scripts/rqtl2/install_phenos.py @@ -1,10 +1,10 @@ """Load pheno from R/qtl2 bundle into the database.""" import sys +import argparse import traceback -from pathlib import Path from zipfile import ZipFile from functools import reduce -from logging import Logger, getLogger, StreamHandler +from logging import Logger, getLogger import MySQLdb as mdb from MySQLdb.cursors import DictCursor @@ -18,6 +18,8 @@ from r_qtl import r_qtl2_qc as rqc from functional_tools import take +__MODULE__ = "scripts.rqtl2.install_phenos" + def insert_probesets(dbconn: mdb.Connection, platformid: int, phenos: tuple[str, ...]) -> int: @@ -95,12 +97,11 @@ def cross_reference_probeset_data(dbconn: mdb.Connection, def install_pheno_files(#pylint: disable=[too-many-arguments, too-many-locals] dbconn: mdb.Connection, - speciesid: int, - platformid: int, - datasetid: int, - rqtl2bundle: Path, + args: argparse.Namespace, logger: Logger = getLogger()) -> int: """Load data in `pheno` files and other related files into the database.""" + (speciesid, platformid, datasetid, rqtl2bundle) = ( + args.speciesid, args.platformid, args.datasetid, args.rqtl2bundle) with ZipFile(str(rqtl2bundle), "r") as zfile: try: rqc.validate_bundle(zfile) @@ -155,16 +156,5 @@ if __name__ == "__main__": return parser.parse_args() - thelogger = getLogger("install_phenos") - thelogger.addHandler(StreamHandler(stream=sys.stderr)) - main = build_main( - cli_args(), - lambda dbconn, args: install_pheno_files(dbconn, - args.speciesid, - args.platformid, - args.datasetid, - args.rqtl2bundle, - thelogger), - thelogger, - "DEBUG") + main = build_main(cli_args(), install_pheno_files, __MODULE__) sys.exit(main()) diff --git a/scripts/rqtl2/phenotypes_qc.py b/scripts/rqtl2/phenotypes_qc.py new file mode 100644 index 0000000..83828e4 --- /dev/null +++ b/scripts/rqtl2/phenotypes_qc.py @@ -0,0 +1,468 @@ +"""Run quality control on phenotypes-specific files in the bundle.""" +import sys +import uuid +import shutil +import logging +import tempfile +import contextlib +from pathlib import Path +from logging import Logger +from zipfile import ZipFile +from argparse import Namespace +import multiprocessing as mproc +from functools import reduce, partial +from typing import Union, Iterator, Callable, Optional, Sequence + +import MySQLdb as mdb +from redis import Redis + +from r_qtl import r_qtl2 as rqtl2 +from r_qtl import r_qtl2_qc as rqc +from r_qtl import exceptions as rqe +from r_qtl.fileerrors import InvalidValue + +from functional_tools import chain + +from quality_control.checks import decimal_places_pattern + +from uploader.files import sha256_digest_over_file +from uploader.samples.models import samples_by_species_and_population + +from scripts.rqtl2.entry import build_main +from scripts.redis_logger import RedisMessageListHandler +from scripts.rqtl2.cli_parser import add_bundle_argument +from scripts.cli_parser import init_cli_parser, add_global_data_arguments +from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter + +__MODULE__ = "scripts.rqtl2.phenotypes_qc" + +def validate(phenobundle: Path, logger: Logger) -> dict: + """Check that the bundle is generally valid""" + try: + rqc.validate_bundle(phenobundle) + except rqe.RQTLError as rqtlerr: + # logger.error("Bundle file validation failed!", exc_info=True) + return { + "skip": True, + "logger": logger, + "phenobundle": phenobundle, + "errors": (" ".join(rqtlerr.args),) + } + return { + "errors": tuple(), + "skip": False, + "phenobundle": phenobundle, + "logger": logger + } + + +def check_for_mandatory_pheno_keys( + phenobundle: Path, + logger: Logger, + **kwargs +) -> dict: + """Check that the mandatory keys exist for phenotypes.""" + if kwargs.get("skip", False): + return { + **kwargs, + "logger": logger, + "phenobundle": phenobundle + } + + _mandatory_keys = ("pheno", "phenocovar") + _cdata = rqtl2.read_control_file(phenobundle) + _errors = kwargs.get("errors", tuple()) + tuple( + f"Expected '{key}' file(s) are not declared in the bundle." + for key in _mandatory_keys if key not in _cdata.keys()) + return { + **kwargs, + "logger": logger, + "phenobundle": phenobundle, + "errors": _errors, + "skip": len(_errors) > 0 + } + + +def check_for_averages_files( + phenobundle: Path, + logger: Logger, + **kwargs +) -> dict: + """Check that averages files appear together""" + if kwargs.get("skip", False): + return { + **kwargs, + "logger": logger, + "phenobundle": phenobundle + } + + _together = (("phenose", "phenonum"), ("phenonum", "phenose")) + _cdata = rqtl2.read_control_file(phenobundle) + _errors = kwargs.get("errors", tuple()) + tuple( + f"'{first}' is defined in the control file but there is no " + f"corresponding '{second}'" + for first, second in _together + if ((first in _cdata.keys()) and (second not in _cdata.keys()))) + return { + **kwargs, + "logger": logger, + "phenobundle": phenobundle, + "errors": _errors, + "skip": len(_errors) > 0 + } + + +def extract_bundle( + bundle: Path, workdir: Path, jobid: uuid.UUID +) -> tuple[Path, tuple[Path, ...]]: + """Extract the bundle.""" + with ZipFile(bundle) as zfile: + extractiondir = workdir.joinpath( + f"{str(jobid)}-{sha256_digest_over_file(bundle)}-{bundle.name}") + return extractiondir, rqtl2.extract(zfile, extractiondir) + + +def undo_transpose(filetype: str, cdata: dict, extractiondir): + """Undo transposition of all files of type `filetype` in thebundle.""" + if len(cdata.get(filetype, [])) > 0 and cdata.get(f"{filetype}_transposed", False): + files = (extractiondir.joinpath(_file) for _file in cdata[filetype]) + for _file in files: + rqtl2.transpose_csv_with_rename( + _file, + build_line_splitter(cdata), + build_line_joiner(cdata)) + + +@contextlib.contextmanager +def redis_logger( + redisuri: str, loggername: str, filename: str, fqkey: str +) -> Iterator[logging.Logger]: + """Build a Redis message-list logger.""" + rconn = Redis.from_url(redisuri, decode_responses=True) + logger = logging.getLogger(loggername) + logger.propagate = False + handler = RedisMessageListHandler( + rconn, + fullyqualifiedkey(fqkey, filename))#type: ignore[arg-type] + handler.setFormatter(logging.getLogger().handlers[0].formatter) + logger.addHandler(handler) + try: + yield logger + finally: + rconn.close() + + +def qc_phenocovar_file( + filepath: Path, + redisuri, + fqkey: str, + separator: str, + comment_char: str): + """Check that `phenocovar` files are structured correctly.""" + with redis_logger( + redisuri, + f"{__MODULE__}.qc_phenocovar_file", + filepath.name, + fqkey) as logger: + logger.info("Running QC on file: %s", filepath.name) + _csvfile = rqtl2.read_csv_file(filepath, separator, comment_char) + _headings = tuple(heading.lower() for heading in next(_csvfile)) + _errors: tuple[InvalidValue, ...] = tuple() + for heading in ("description", "units"): + if heading not in _headings: + _errors = (InvalidValue( + filepath.name, + "header row", + "-", + "-", + (f"File {filepath.name} is missing the {heading} heading " + "in the header line.")),) + + def collect_errors(errors_and_linecount, line): + _errs, _lc = errors_and_linecount + logger.info("Testing record '%s'", line[0]) + if len(line) != len(_headings): + _errs = _errs + (InvalidValue( + filepath.name, + line[0], + "-", + "-", + (f"Record {_lc} in file {filepath.name} has a different " + "number of columns than the number of headings")),) + _line = dict(zip(_headings, line)) + if not bool(_line["description"]): + _errs = _errs + ( + InvalidValue(filepath.name, + _line[_headings[0]], + "description", + _line["description"], + "The description is not provided!"),) + + return _errs, _lc+1 + + return { + filepath.name: dict(zip( + ("errors", "linecount"), + reduce(collect_errors, _csvfile, (_errors, 1)))) + } + + +def merge_dicts(*dicts): + """Merge multiple dicts into a single one.""" + return reduce(lambda merged, dct: {**merged, **dct}, dicts, {}) + + +def decimal_points_error(# pylint: disable=[too-many-arguments] + filename: str, + rowtitle: str, + coltitle: str, + cellvalue: str, + message: str, + decimal_places: int = 1 +) -> Optional[InvalidValue]: + """Returns an error if the value does not meet the checks.""" + if not bool(decimal_places_pattern(decimal_places).match(cellvalue)): + return InvalidValue(filename, rowtitle, coltitle, cellvalue, message) + return None + + +def integer_error( + filename: str, + rowtitle: str, + coltitle: str, + cellvalue: str, + message: str +) -> Optional[InvalidValue]: + """Returns an error if the value does not meet the checks.""" + try: + value = int(cellvalue) + if value <= 0: + raise ValueError("Must be a non-zero, positive number.") + return None + except ValueError as _verr: + return InvalidValue(filename, rowtitle, coltitle, cellvalue, message) + + +def qc_pheno_file(# pylint: disable=[too-many-arguments] + filepath: Path, + redisuri: str, + fqkey: str, + samples: tuple[str, ...], + phenonames: tuple[str, ...], + separator: str, + comment_char: str, + na_strings: Sequence[str], + error_fn: Callable = decimal_points_error +): + """Run QC/QA on a `pheno` file.""" + with redis_logger( + redisuri, + f"{__MODULE__}.qc_pheno_file", + filepath.name, + fqkey) as logger: + logger.info("Running QC on file: %s", filepath.name) + _csvfile = rqtl2.read_csv_file(filepath, separator, comment_char) + _headings: tuple[str, ...] = tuple( + heading.lower() for heading in next(_csvfile)) + _errors: tuple[InvalidValue, ...] = tuple() + + _absent = tuple(pheno for pheno in _headings[1:] if pheno not in phenonames) + if len(_absent) > 0: + _errors = _errors + (InvalidValue( + filepath.name, + "header row", + "-", + ", ".join(_absent), + (f"The phenotype names ({', '.join(samples)}) do not exist in any " + "of the provided phenocovar files.")),) + + def collect_errors(errors_and_linecount, line): + _errs, _lc = errors_and_linecount + if line[0] not in samples: + _errs = _errs + (InvalidValue( + filepath.name, + line[0], + _headings[0], + line[0], + (f"The sample named '{line[0]}' does not exist in the database. " + "You will need to upload that first.")),) + + for field, value in zip(_headings[1:], line[1:]): + if value in na_strings: + continue + _err = error_fn( + filepath.name, + line[0], + field, + value) + _errs = _errs + ((_err,) if bool(_err) else tuple()) + + return _errs, _lc+1 + + return { + filepath.name: dict(zip( + ("errors", "linecount"), + reduce(collect_errors, _csvfile, (_errors, 1)))) + } + + +def phenotype_names(filepath: Path, + separator: str, + comment_char: str) -> tuple[str, ...]: + """Read phenotype names from `phenocovar` file.""" + return reduce(lambda tpl, line: tpl + (line[0],),#type: ignore[arg-type, return-value] + rqtl2.read_csv_file(filepath, separator, comment_char), + tuple())[1:] + +def fullyqualifiedkey( + prefix: str, + rest: Optional[str] = None +) -> Union[Callable[[str], str], str]: + """Compute fully qualified Redis key.""" + if not bool(rest): + return lambda _rest: f"{prefix}:{_rest}" + return f"{prefix}:{rest}" + +def run_qc(# pylint: disable=[too-many-locals] + dbconn: mdb.Connection, + args: Namespace, + logger: Logger +) -> int: + """Run quality control checks on the bundle.""" + logger.debug("Beginning the quality assuarance checks.") + results = check_for_averages_files( + **check_for_mandatory_pheno_keys( + **validate(args.rqtl2bundle, logger))) + errors = results.get("errors", tuple()) + if len(errors) > 0: + logger.error("We found the following errors:\n%s", + "\n".join(f" - {error}" for error in errors)) + return 1 + # Run QC on actual values + # Steps: + # - Extract file to specific directory + extractiondir, *_bundlefiles = extract_bundle( + args.rqtl2bundle, args.workingdir, args.jobid) + + # - For every pheno, phenocovar, phenose, phenonum file, undo + # transposition where relevant + cdata = rqtl2.control_data(extractiondir) + with mproc.Pool(mproc.cpu_count() - 1) as pool: + pool.starmap( + undo_transpose, + ((ftype, cdata, extractiondir) + for ftype in ("pheno", "phenocovar", "phenose", "phenonum"))) + + # - Fetch samples/individuals from database. + logger.debug("Fetching samples/individuals from the database.") + samples = tuple(#type: ignore[var-annotated] + item for item in set(reduce( + lambda acc, item: acc + ( + item["Name"], item["Name2"], item["Symbol"], item["Alias"]), + samples_by_species_and_population( + dbconn, args.speciesid, args.populationid), + tuple())) + if bool(item)) + + # - Check that `description` and `units` is present in phenocovar for + # all phenotypes + with mproc.Pool(mproc.cpu_count() - 1) as pool: + logger.debug("Check for errors in 'phenocovar' file(s).") + _phenocovar_qc_res = merge_dicts(*pool.starmap(qc_phenocovar_file, tuple( + (extractiondir.joinpath(_file), + args.redisuri, + chain( + "phenocovar", + fullyqualifiedkey(args.jobid), + fullyqualifiedkey(args.redisprefix)), + cdata["sep"], + cdata["comment.char"]) + for _file in cdata.get("phenocovar", [])))) + + # - Check all samples in pheno files exist in database + # - Check all phenotypes in pheno files exist in phenocovar files + # - Check all numeric values in pheno files + phenonames = tuple(set( + name for names in pool.starmap(phenotype_names, tuple( + (extractiondir.joinpath(_file), cdata["sep"], cdata["comment.char"]) + for _file in cdata.get("phenocovar", []))) + for name in names)) + + dec_err_fn = partial(decimal_points_error, message=( + "Expected a non-negative number with at least one decimal " + "place.")) + + logger.debug("Check for errors in 'pheno' file(s).") + _pheno_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple(( + extractiondir.joinpath(_file), + args.redisuri, + chain( + "pheno", + fullyqualifiedkey(args.jobid), + fullyqualifiedkey(args.redisprefix)), + samples, + phenonames, + cdata["sep"], + cdata["comment.char"], + cdata["na.strings"], + dec_err_fn + ) for _file in cdata.get("pheno", [])))) + + # - Check the 3 checks above for phenose and phenonum values too + # qc_phenose_files(…) + # qc_phenonum_files(…) + logger.debug("Check for errors in 'phenose' file(s).") + _phenose_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple(( + extractiondir.joinpath(_file), + args.redisuri, + chain( + "phenose", + fullyqualifiedkey(args.jobid), + fullyqualifiedkey(args.redisprefix)), + samples, + phenonames, + cdata["sep"], + cdata["comment.char"], + cdata["na.strings"], + dec_err_fn + ) for _file in cdata.get("phenose", [])))) + + logger.debug("Check for errors in 'phenonum' file(s).") + _phenonum_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple(( + extractiondir.joinpath(_file), + args.redisuri, + chain( + "phenonum", + fullyqualifiedkey(args.jobid), + fullyqualifiedkey(args.redisprefix)), + samples, + phenonames, + cdata["sep"], + cdata["comment.char"], + cdata["na.strings"], + partial(integer_error, message=( + "Expected a non-negative, non-zero integer value.")) + ) for _file in cdata.get("phenonum", [])))) + + # - Delete all extracted files + shutil.rmtree(extractiondir) + raise NotImplementedError("WIP!") + + +if __name__ == "__main__": + def cli_args(): + """Process command-line arguments for `install_phenos`""" + parser = add_bundle_argument(add_global_data_arguments(init_cli_parser( + program="PhenotypesQC", + description=( + "Perform Quality Control checks on a phenotypes bundle file")))) + parser.add_argument( + "--workingdir", + default=f"{tempfile.gettempdir()}/phenotypes_qc", + help=("The directory where this script will put its intermediate " + "files."), + type=Path) + return parser.parse_args() + + main = build_main(cli_args(), run_qc, __MODULE__) + sys.exit(main()) |