diff options
-rw-r--r-- | scripts/rqtl2/phenotypes_qc.py | 173 |
1 files changed, 117 insertions, 56 deletions
diff --git a/scripts/rqtl2/phenotypes_qc.py b/scripts/rqtl2/phenotypes_qc.py index 3448790..4c02578 100644 --- a/scripts/rqtl2/phenotypes_qc.py +++ b/scripts/rqtl2/phenotypes_qc.py @@ -1,31 +1,40 @@ """Run quality control on phenotypes-specific files in the bundle.""" import sys import shutil +import logging import tempfile +import contextlib from pathlib import Path +from logging import Logger from zipfile import ZipFile +from argparse import Namespace import multiprocessing as mproc from functools import reduce, partial -from typing import Callable, Optional, Sequence -from logging import Logger, getLogger, StreamHandler +from typing import Union, Iterator, Callable, Optional, Sequence import MySQLdb as mdb +from redis import Redis from r_qtl import r_qtl2 as rqtl2 from r_qtl import r_qtl2_qc as rqc from r_qtl import exceptions as rqe from r_qtl.fileerrors import InvalidValue +from functional_tools import chain + from quality_control.checks import decimal_places_pattern from uploader.files import sha256_digest_over_file from uploader.samples.models import samples_by_species_and_population from scripts.rqtl2.entry import build_main +from scripts.redis_logger import RedisMessageListHandler from scripts.rqtl2.cli_parser import add_bundle_argument from scripts.cli_parser import init_cli_parser, add_global_data_arguments from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter +__MODULE__ = "scripts.rqtl2.phenotypes_qc" + def validate(phenobundle: Path, logger: Logger) -> dict: """Check that the bundle is generally valid""" try: @@ -121,47 +130,78 @@ def undo_transpose(filetype: str, cdata: dict, extractiondir): build_line_joiner(cdata)) -def qc_phenocovar_file(phenocovarfile: Path, separator: str, comment_char: str): - """Check that `phenocovar` files are structured correctly.""" - _csvfile = rqtl2.read_csv_file(phenocovarfile, separator, comment_char) - _headings = tuple(heading.lower() for heading in next(_csvfile)) - _errors = tuple() - for heading in ("description", "units"): - if heading not in _headings: - _errors = (InvalidValue( - phenocovarfile.name, - "header row", - "-", - "-", - (f"File {phenocovarfile.name} is missing the {heading} heading " - "in the header line.")),) +@contextlib.contextmanager +def redis_logger( + redisuri: str, loggername: str, filename: str, fqkey: str +) -> Iterator[logging.Logger]: + """Build a Redis message-list logger.""" + rconn = Redis.from_url(redisuri, decode_responses=True) + logger = logging.getLogger(loggername) + logger.propagate = False + handler = RedisMessageListHandler( + rconn, + fullyqualifiedkey(fqkey, filename))#type: ignore[arg-type] + handler.setFormatter(logging.getLogger().handlers[0].formatter) + logger.addHandler(handler) + try: + yield logger + finally: + rconn.close() - def collect_errors(errors_and_linecount, line): - _errs, _lc = errors_and_linecount - if len(line) != len(_headings): - _errs = _errs + (InvalidValue( - phenocovarfile.name, - line[0], - "-", - "-", - (f"Record {_lc} in file {phenocovarfile.name} has a different " - "number of columns than the number of headings")),) - _line = dict(zip(_headings, line)) - if not bool(_line["description"]): - _errs = _errs + ( - InvalidValue(phenocovarfile.name, - _line[_headings[0]], - "description", - _line["description"], - "The description is not provided!"),) - return _errs, _lc+1 +def qc_phenocovar_file( + filename: Path, + redisuri, + fqkey: str, + separator: str, + comment_char: str): + """Check that `phenocovar` files are structured correctly.""" + with redis_logger( + redisuri, + f"{__MODULE__}.qc_phenocovar_file", + filename.name, + fqkey) as logger: + logger.info("Running QC on file: %s", filename.name) + _csvfile = rqtl2.read_csv_file(filename, separator, comment_char) + _headings = tuple(heading.lower() for heading in next(_csvfile)) + _errors: tuple[InvalidValue, ...] = tuple() + for heading in ("description", "units"): + if heading not in _headings: + _errors = (InvalidValue( + filename.name, + "header row", + "-", + "-", + (f"File {filename.name} is missing the {heading} heading " + "in the header line.")),) + + def collect_errors(errors_and_linecount, line): + _errs, _lc = errors_and_linecount + logger.info("Testing record '%s'", line[0]) + if len(line) != len(_headings): + _errs = _errs + (InvalidValue( + filename.name, + line[0], + "-", + "-", + (f"Record {_lc} in file {filename.name} has a different " + "number of columns than the number of headings")),) + _line = dict(zip(_headings, line)) + if not bool(_line["description"]): + _errs = _errs + ( + InvalidValue(filename.name, + _line[_headings[0]], + "description", + _line["description"], + "The description is not provided!"),) + + return _errs, _lc+1 - return { - phenocovarfile.name: dict(zip( - ("errors", "linecount"), - reduce(collect_errors, _csvfile, (_errors, 1)))) - } + return { + filename.name: dict(zip( + ("errors", "linecount"), + reduce(collect_errors, _csvfile, (_errors, 1)))) + } def merge_dicts(*dicts): @@ -211,8 +251,9 @@ def qc_pheno_file(# pylint: disable=[too-many-arguments] ): """Run QC/QA on a `pheno` file.""" _csvfile = rqtl2.read_csv_file(filepath, separator, comment_char) - _headings = tuple(heading.lower() for heading in next(_csvfile)) - _errors = tuple() + _headings: tuple[str, ...] = tuple( + heading.lower() for heading in next(_csvfile)) + _errors: tuple[InvalidValue, ...] = tuple() _absent = tuple(pheno for pheno in _headings[1:] if pheno not in phenonames) if len(_absent) > 0: @@ -258,20 +299,28 @@ def phenotype_names(filepath: Path, separator: str, comment_char: str) -> tuple[str, ...]: """Read phenotype names from `phenocovar` file.""" - return reduce(lambda tpl, line: tpl + (line[0],), + return reduce(lambda tpl, line: tpl + (line[0],),#type: ignore[arg-type, return-value] rqtl2.read_csv_file(filepath, separator, comment_char), tuple())[1:] +def fullyqualifiedkey( + prefix: str, + rest: Optional[str] = None +) -> Union[Callable[[str], str], str]: + """Compute fully qualified Redis key.""" + if not bool(rest): + return lambda _rest: f"{prefix}:{_rest}" + return f"{prefix}:{rest}" -def run_qc(# pylint: disable=[too-many-arguments] +def run_qc(# pylint: disable=[too-many-locals] dbconn: mdb.Connection, - phenobundle: Path, - workingdir: Path, - speciesid: int, - populationid: int, + args: Namespace, logger: Logger ) -> int: """Run quality control checks on the bundle.""" + (phenobundle, workingdir, speciesid, populationid) = ( + args.rqtl2bundle, args.workingdir, args.speciesid, args.populationid) + logger.debug("Beginning the quality assuarance checks.") results = check_for_averages_files( **check_for_mandatory_pheno_keys( **validate(phenobundle, logger))) @@ -295,7 +344,8 @@ def run_qc(# pylint: disable=[too-many-arguments] for ftype in ("pheno", "phenocovar", "phenose", "phenonum"))) # - Fetch samples/individuals from database. - samples = tuple( + logger.debug("Fetching samples/individuals from the database.") + samples = tuple(#type: ignore[var-annotated] item for item in set(reduce( lambda acc, item: acc + ( item["Name"], item["Name2"], item["Symbol"], item["Alias"]), @@ -306,8 +356,16 @@ def run_qc(# pylint: disable=[too-many-arguments] # - Check that `description` and `units` is present in phenocovar for # all phenotypes with mproc.Pool(mproc.cpu_count() - 1) as pool: - phenocovar_qc_res = merge_dicts(*pool.starmap(qc_phenocovar_file, tuple( - (extractiondir.joinpath(_file), cdata["sep"], cdata["comment.char"]) + logger.debug("Check for errors in 'phenocovar' file(s).") + _phenocovar_qc_res = merge_dicts(*pool.starmap(qc_phenocovar_file, tuple( + (extractiondir.joinpath(_file), + args.redisuri, + chain( + "phenocovar", + fullyqualifiedkey(args.jobid), + fullyqualifiedkey(args.redisprefix)), + cdata["sep"], + cdata["comment.char"]) for _file in cdata.get("phenocovar", [])))) # - Check all samples in pheno files exist in database @@ -322,7 +380,9 @@ def run_qc(# pylint: disable=[too-many-arguments] dec_err_fn = partial(decimal_points_error, message=( "Expected a non-negative number with at least one decimal " "place.")) - pheno_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple(( + + logger.debug("Check for errors in 'pheno' file(s).") + _pheno_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple(( extractiondir.joinpath(_file), samples, phenonames, @@ -335,7 +395,8 @@ def run_qc(# pylint: disable=[too-many-arguments] # - Check the 3 checks above for phenose and phenonum values too # qc_phenose_files(…) # qc_phenonum_files(…) - phenose_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple(( + logger.debug("Check for errors in 'phenose' file(s).") + _phenose_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple(( extractiondir.joinpath(_file), samples, phenonames, @@ -345,7 +406,8 @@ def run_qc(# pylint: disable=[too-many-arguments] dec_err_fn ) for _file in cdata.get("phenose", [])))) - phenonum_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple(( + logger.debug("Check for errors in 'phenonum' file(s).") + _phenonum_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple(( extractiondir.joinpath(_file), samples, phenonames, @@ -362,7 +424,6 @@ def run_qc(# pylint: disable=[too-many-arguments] if __name__ == "__main__": - def cli_args(): """Process command-line arguments for `install_phenos`""" parser = add_bundle_argument(add_global_data_arguments(init_cli_parser( @@ -370,7 +431,7 @@ if __name__ == "__main__": description=( "Perform Quality Control checks on a phenotypes bundle file")))) parser.add_argument( - "--working-dir", + "--workingdir", default=f"{tempfile.gettempdir()}/phenotypes_qc", help=("The directory where this script will put its intermediate " "files."), |