-rw-r--r--  scripts/rqtl2/phenotypes_qc.py  173
1 file changed, 117 insertions(+), 56 deletions(-)
diff --git a/scripts/rqtl2/phenotypes_qc.py b/scripts/rqtl2/phenotypes_qc.py
index 3448790..4c02578 100644
--- a/scripts/rqtl2/phenotypes_qc.py
+++ b/scripts/rqtl2/phenotypes_qc.py
@@ -1,31 +1,40 @@
"""Run quality control on phenotypes-specific files in the bundle."""
import sys
import shutil
+import logging
import tempfile
+import contextlib
from pathlib import Path
+from logging import Logger
from zipfile import ZipFile
+from argparse import Namespace
import multiprocessing as mproc
from functools import reduce, partial
-from typing import Callable, Optional, Sequence
-from logging import Logger, getLogger, StreamHandler
+from typing import Union, Iterator, Callable, Optional, Sequence
import MySQLdb as mdb
+from redis import Redis
from r_qtl import r_qtl2 as rqtl2
from r_qtl import r_qtl2_qc as rqc
from r_qtl import exceptions as rqe
from r_qtl.fileerrors import InvalidValue
+from functional_tools import chain
+
from quality_control.checks import decimal_places_pattern
from uploader.files import sha256_digest_over_file
from uploader.samples.models import samples_by_species_and_population
from scripts.rqtl2.entry import build_main
+from scripts.redis_logger import RedisMessageListHandler
from scripts.rqtl2.cli_parser import add_bundle_argument
from scripts.cli_parser import init_cli_parser, add_global_data_arguments
from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter
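+# Logger-name prefix for the per-file QC loggers created in the functions below.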
+__MODULE__ = "scripts.rqtl2.phenotypes_qc"
+
def validate(phenobundle: Path, logger: Logger) -> dict:
"""Check that the bundle is generally valid"""
try:
@@ -121,47 +130,78 @@ def undo_transpose(filetype: str, cdata: dict, extractiondir):
build_line_joiner(cdata))
-def qc_phenocovar_file(phenocovarfile: Path, separator: str, comment_char: str):
- """Check that `phenocovar` files are structured correctly."""
- _csvfile = rqtl2.read_csv_file(phenocovarfile, separator, comment_char)
- _headings = tuple(heading.lower() for heading in next(_csvfile))
- _errors = tuple()
- for heading in ("description", "units"):
- if heading not in _headings:
- _errors = (InvalidValue(
- phenocovarfile.name,
- "header row",
- "-",
- "-",
- (f"File {phenocovarfile.name} is missing the {heading} heading "
- "in the header line.")),)
+@contextlib.contextmanager
+def redis_logger(
+ redisuri: str, loggername: str, filename: str, fqkey: str
+) -> Iterator[logging.Logger]:
+ """Build a Redis message-list logger."""
+ rconn = Redis.from_url(redisuri, decode_responses=True)
+ logger = logging.getLogger(loggername)
+ logger.propagate = False
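+ # The handler pushes each log record onto a Redis message list whose key is
+ # derived from `fqkey` and the file name, reusing the root logger's formatter.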
+ handler = RedisMessageListHandler(
+ rconn,
+ fullyqualifiedkey(fqkey, filename))#type: ignore[arg-type]
+ handler.setFormatter(logging.getLogger().handlers[0].formatter)
+ logger.addHandler(handler)
+ try:
+ yield logger
+ finally:
+ rconn.close()
- def collect_errors(errors_and_linecount, line):
- _errs, _lc = errors_and_linecount
- if len(line) != len(_headings):
- _errs = _errs + (InvalidValue(
- phenocovarfile.name,
- line[0],
- "-",
- "-",
- (f"Record {_lc} in file {phenocovarfile.name} has a different "
- "number of columns than the number of headings")),)
- _line = dict(zip(_headings, line))
- if not bool(_line["description"]):
- _errs = _errs + (
- InvalidValue(phenocovarfile.name,
- _line[_headings[0]],
- "description",
- _line["description"],
- "The description is not provided!"),)
- return _errs, _lc+1
+def qc_phenocovar_file(
+ filename: Path,
+ redisuri,
+ fqkey: str,
+ separator: str,
+ comment_char: str):
+ """Check that `phenocovar` files are structured correctly."""
+ with redis_logger(
+ redisuri,
+ f"{__MODULE__}.qc_phenocovar_file",
+ filename.name,
+ fqkey) as logger:
+ logger.info("Running QC on file: %s", filename.name)
+ _csvfile = rqtl2.read_csv_file(filename, separator, comment_char)
+ _headings = tuple(heading.lower() for heading in next(_csvfile))
+ _errors: tuple[InvalidValue, ...] = tuple()
+ for heading in ("description", "units"):
+ if heading not in _headings:
+ _errors = _errors + (InvalidValue(
+ filename.name,
+ "header row",
+ "-",
+ "-",
+ (f"File {filename.name} is missing the {heading} heading "
+ "in the header line.")),)
+
+ def collect_errors(errors_and_linecount, line):
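+ # Reducer over the remaining CSV rows: the accumulator is an
+ # (errors, line-count) pair threaded through `reduce` below.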
+ _errs, _lc = errors_and_linecount
+ logger.info("Testing record '%s'", line[0])
+ if len(line) != len(_headings):
+ _errs = _errs + (InvalidValue(
+ filename.name,
+ line[0],
+ "-",
+ "-",
+ (f"Record {_lc} in file {filename.name} has a different "
+ "number of columns than the number of headings")),)
+ _line = dict(zip(_headings, line))
+ if not bool(_line["description"]):
+ _errs = _errs + (
+ InvalidValue(filename.name,
+ _line[_headings[0]],
+ "description",
+ _line["description"],
+ "The description is not provided!"),)
+
+ return _errs, _lc+1
- return {
- phenocovarfile.name: dict(zip(
- ("errors", "linecount"),
- reduce(collect_errors, _csvfile, (_errors, 1))))
- }
+ return {
+ filename.name: dict(zip(
+ ("errors", "linecount"),
+ reduce(collect_errors, _csvfile, (_errors, 1))))
+ }
def merge_dicts(*dicts):
@@ -211,8 +251,9 @@ def qc_pheno_file(# pylint: disable=[too-many-arguments]
):
"""Run QC/QA on a `pheno` file."""
_csvfile = rqtl2.read_csv_file(filepath, separator, comment_char)
- _headings = tuple(heading.lower() for heading in next(_csvfile))
- _errors = tuple()
+ _headings: tuple[str, ...] = tuple(
+ heading.lower() for heading in next(_csvfile))
+ _errors: tuple[InvalidValue, ...] = tuple()
_absent = tuple(pheno for pheno in _headings[1:] if pheno not in phenonames)
if len(_absent) > 0:
@@ -258,20 +299,28 @@ def phenotype_names(filepath: Path,
separator: str,
comment_char: str) -> tuple[str, ...]:
"""Read phenotype names from `phenocovar` file."""
- return reduce(lambda tpl, line: tpl + (line[0],),
+ return reduce(lambda tpl, line: tpl + (line[0],),#type: ignore[arg-type, return-value]
rqtl2.read_csv_file(filepath, separator, comment_char),
tuple())[1:]
+def fullyqualifiedkey(
+ prefix: str,
+ rest: Optional[str] = None
+) -> Union[Callable[[str], str], str]:
+ """Compute fully qualified Redis key."""
+ if not bool(rest):
+ return lambda _rest: f"{prefix}:{_rest}"
+ return f"{prefix}:{rest}"
-def run_qc(# pylint: disable=[too-many-arguments]
+def run_qc(# pylint: disable=[too-many-locals]
dbconn: mdb.Connection,
- phenobundle: Path,
- workingdir: Path,
- speciesid: int,
- populationid: int,
+ args: Namespace,
logger: Logger
) -> int:
"""Run quality control checks on the bundle."""
+ (phenobundle, workingdir, speciesid, populationid) = (
+ args.rqtl2bundle, args.workingdir, args.speciesid, args.populationid)
+ logger.debug("Beginning the quality assuarance checks.")
results = check_for_averages_files(
**check_for_mandatory_pheno_keys(
**validate(phenobundle, logger)))
@@ -295,7 +344,8 @@ def run_qc(# pylint: disable=[too-many-arguments]
for ftype in ("pheno", "phenocovar", "phenose", "phenonum")))
# - Fetch samples/individuals from database.
- samples = tuple(
+ logger.debug("Fetching samples/individuals from the database.")
+ samples = tuple(#type: ignore[var-annotated]
item for item in set(reduce(
lambda acc, item: acc + (
item["Name"], item["Name2"], item["Symbol"], item["Alias"]),
@@ -306,8 +356,16 @@ def run_qc(# pylint: disable=[too-many-arguments]
# - Check that `description` and `units` are present in phenocovar for
# all phenotypes
with mproc.Pool(mproc.cpu_count() - 1) as pool:
- phenocovar_qc_res = merge_dicts(*pool.starmap(qc_phenocovar_file, tuple(
- (extractiondir.joinpath(_file), cdata["sep"], cdata["comment.char"])
+ logger.debug("Check for errors in 'phenocovar' file(s).")
+ _phenocovar_qc_res = merge_dicts(*pool.starmap(qc_phenocovar_file, tuple(
+ (extractiondir.joinpath(_file),
+ args.redisuri,
+ chain(
+ "phenocovar",
+ fullyqualifiedkey(args.jobid),
+ fullyqualifiedkey(args.redisprefix)),
+ cdata["sep"],
+ cdata["comment.char"])
for _file in cdata.get("phenocovar", []))))
# - Check all samples in pheno files exist in database
@@ -322,7 +380,9 @@ def run_qc(# pylint: disable=[too-many-arguments]
dec_err_fn = partial(decimal_points_error, message=(
"Expected a non-negative number with at least one decimal "
"place."))
- pheno_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
+
+ logger.debug("Check for errors in 'pheno' file(s).")
+ _pheno_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
extractiondir.joinpath(_file),
samples,
phenonames,
@@ -335,7 +395,8 @@ def run_qc(# pylint: disable=[too-many-arguments]
# - Check the 3 checks above for phenose and phenonum values too
# qc_phenose_files(…)
# qc_phenonum_files(…)
- phenose_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
+ logger.debug("Check for errors in 'phenose' file(s).")
+ _phenose_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
extractiondir.joinpath(_file),
samples,
phenonames,
@@ -345,7 +406,8 @@ def run_qc(# pylint: disable=[too-many-arguments]
dec_err_fn
) for _file in cdata.get("phenose", []))))
- phenonum_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
+ logger.debug("Check for errors in 'phenonum' file(s).")
+ _phenonum_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
extractiondir.joinpath(_file),
samples,
phenonames,
@@ -362,7 +424,6 @@ def run_qc(# pylint: disable=[too-many-arguments]
if __name__ == "__main__":
-
def cli_args():
"""Process command-line arguments for `install_phenos`"""
parser = add_bundle_argument(add_global_data_arguments(init_cli_parser(
@@ -370,7 +431,7 @@ if __name__ == "__main__":
description=(
"Perform Quality Control checks on a phenotypes bundle file"))))
parser.add_argument(
- "--working-dir",
+ "--workingdir",
default=f"{tempfile.gettempdir()}/phenotypes_qc",
help=("The directory where this script will put its intermediate "
"files."),