aboutsummaryrefslogtreecommitdiff
path: root/scripts
diff options
context:
space:
mode:
Diffstat (limited to 'scripts')
-rw-r--r--scripts/cli_parser.py6
-rw-r--r--scripts/process_rqtl2_bundle.py19
-rw-r--r--scripts/qc_on_rqtl2_bundle2.py346
-rw-r--r--scripts/redis_logger.py21
-rw-r--r--scripts/rqtl2/bundleutils.py44
-rw-r--r--scripts/rqtl2/cli_parser.py20
-rw-r--r--scripts/rqtl2/entry.py22
-rw-r--r--scripts/rqtl2/install_genotypes.py25
-rw-r--r--scripts/rqtl2/install_phenos.py26
-rw-r--r--scripts/rqtl2/phenotypes_qc.py468
10 files changed, 941 insertions, 56 deletions
diff --git a/scripts/cli_parser.py b/scripts/cli_parser.py
index 308ee4b..d42ae66 100644
--- a/scripts/cli_parser.py
+++ b/scripts/cli_parser.py
@@ -19,6 +19,12 @@ def init_cli_parser(program: str, description: Optional[str] = None) -> Argument
type=int,
default=86400,
help="How long to keep any redis keys around.")
+ parser.add_argument(
+ "--loglevel",
+ type=str,
+ default="INFO",
+ choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
+ help="The severity of events to track with the logger.")
return parser
def add_global_data_arguments(parser: ArgumentParser) -> ArgumentParser:
diff --git a/scripts/process_rqtl2_bundle.py b/scripts/process_rqtl2_bundle.py
index 20cfd3b..ade9862 100644
--- a/scripts/process_rqtl2_bundle.py
+++ b/scripts/process_rqtl2_bundle.py
@@ -2,6 +2,7 @@
import sys
import uuid
import json
+import argparse
import traceback
from typing import Any
from pathlib import Path
@@ -94,10 +95,11 @@ def process_bundle(dbconn: mdb.Connection,
logger.info("Processing geno files.")
genoexit = install_genotypes(
dbconn,
- meta["speciesid"],
- meta["populationid"],
- meta["geno-dataset-id"],
- Path(meta["rqtl2-bundle-file"]),
+ argparse.Namespace(
+ speciesid=meta["speciesid"],
+ populationid=meta["populationid"],
+ datasetid=meta["geno-dataset-id"],
+ rqtl2bundle=Path(meta["rqtl2-bundle-file"])),
logger)
if genoexit != 0:
raise Exception("Processing 'geno' file failed.")
@@ -109,10 +111,11 @@ def process_bundle(dbconn: mdb.Connection,
if has_pheno_file(thejob):
phenoexit = install_pheno_files(
dbconn,
- meta["speciesid"],
- meta["platformid"],
- meta["probe-dataset-id"],
- Path(meta["rqtl2-bundle-file"]),
+ argparse.Namespace(
+ speciesid=meta["speciesid"],
+ platformid=meta["platformid"],
+ dataset_id=meta["probe-dataset-id"],
+ rqtl2bundle=Path(meta["rqtl2-bundle-file"])),
logger)
if phenoexit != 0:
raise Exception("Processing 'pheno' file failed.")
diff --git a/scripts/qc_on_rqtl2_bundle2.py b/scripts/qc_on_rqtl2_bundle2.py
new file mode 100644
index 0000000..7e5d253
--- /dev/null
+++ b/scripts/qc_on_rqtl2_bundle2.py
@@ -0,0 +1,346 @@
+"""Run Quality Control checks on R/qtl2 bundle."""
+import os
+import sys
+import json
+from time import sleep
+from pathlib import Path
+from zipfile import ZipFile
+from argparse import Namespace
+from datetime import timedelta
+import multiprocessing as mproc
+from functools import reduce, partial
+from logging import Logger, getLogger, StreamHandler
+from typing import Union, Sequence, Callable, Iterator
+
+import MySQLdb as mdb
+from redis import Redis
+
+from quality_control.errors import InvalidValue
+from quality_control.checks import decimal_points_error
+
+from uploader import jobs
+from uploader.db_utils import database_connection
+from uploader.check_connections import check_db, check_redis
+
+from r_qtl import r_qtl2 as rqtl2
+from r_qtl import r_qtl2_qc as rqc
+from r_qtl import exceptions as rqe
+from r_qtl import fileerrors as rqfe
+
+from scripts.process_rqtl2_bundle import parse_job
+from scripts.redis_logger import setup_redis_logger
+from scripts.cli_parser import init_cli_parser, add_global_data_arguments
+from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter
+
+
+def check_for_missing_files(
+ rconn: Redis, fqjobid: str, extractpath: Path, logger: Logger) -> bool:
+ """Check that all files listed in the control file do actually exist."""
+ logger.info("Checking for missing files.")
+ missing = rqc.missing_files(extractpath)
+ # add_to_errors(rconn, fqjobid, "errors-generic", tuple(
+ # rqfe.MissingFile(
+ # mfile[0], mfile[1], (
+ # f"File '{mfile[1]}' is listed in the control file under "
+ # f"the '{mfile[0]}' key, but it does not actually exist in "
+ # "the bundle."))
+ # for mfile in missing))
+ if len(missing) > 0:
+ logger.error(f"Missing files in the bundle!")
+ return True
+ return False
+
+
+def open_file(file_: Path) -> Iterator:
+ """Open file and return one line at a time."""
+ with open(file_, "r", encoding="utf8") as infile:
+ for line in infile:
+ yield line
+
+
+def check_markers(
+ filename: str,
+ row: tuple[str, ...],
+ save_error: lambda val: val
+) -> tuple[rqfe.InvalidValue]:
+ """Check that the markers are okay"""
+ errors = tuple()
+ counts = {}
+ for marker in row:
+ counts = {**counts, marker: counts.get(marker, 0) + 1}
+ if marker is None or marker == "":
+ errors = errors + (save_error(rqfe.InvalidValue(
+ filename,
+ "markers"
+ "-",
+ marker,
+ "A marker MUST be a valid value.")),)
+
+ return errors + tuple(
+ save_error(rqfe.InvalidValue(
+ filename,
+ "markers",
+ key,
+ f"Marker '{key}' was repeated {value} times"))
+ for key,value in counts.items() if value > 1)
+
+
+def check_geno_line(
+ filename: str,
+ headers: tuple[str, ...],
+ row: tuple[Union[str, None]],
+ cdata: dict,
+ save_error: lambda val: val
+) -> tuple[rqfe.InvalidValue]:
+ """Check that the geno line is correct."""
+ errors = tuple()
+ # Verify that line has same number of columns as headers
+ if len(headers) != len(row):
+ errors = errors + (save_error(rqfe.InvalidValue(
+ filename,
+ headers[0],
+ row[0],
+ row[0],
+ "Every line MUST have the same number of columns.")),)
+
+ # First column is the individuals/cases/samples
+ if not bool(row[0]):
+ errors = errors + (save_error(rqfe.InvalidValue(
+ filename,
+ headers[0],
+ row[0],
+ row[0],
+ "The sample/case MUST be a valid value.")),)
+
+ def __process_value__(val):
+ if val in cdata["na.strings"]:
+ return None
+ if val in cdata["alleles"]:
+ return cdata["genotypes"][val]
+
+ genocode = cdata.get("genotypes", {})
+ for coltitle, cellvalue in zip(headers[1:],row[1:]):
+ if (
+ bool(genocode) and
+ cellvalue is not None and
+ cellvalue not in genocode.keys()
+ ):
+ errors = errors + (save_error(rqfe.InvalidValue(
+ filename, row[0], coltitle, cellvalue,
+ f"Value '{cellvalue}' is invalid. Expected one of "
+ f"'{', '.join(genocode.keys())}'.")),)
+
+ return errors
+
+
+def push_file_error_to_redis(rconn: Redis, key: str, error: InvalidValue) -> InvalidValue:
+ """Push the file error to redis a json string
+
+ Parameters
+ ----------
+ rconn: Connection to redis
+ key: The name of the list where we push the errors
+ error: The file error to save
+
+ Returns
+ -------
+ Returns the file error it saved
+ """
+ if bool(error):
+ rconn.rpush(key, json.dumps(error._asdict()))
+ return error
+
+
+def file_errors_and_details(
+ redisargs: dict[str, str],
+ file_: Path,
+ filetype: str,
+ cdata: dict,
+ linesplitterfn: Callable,
+ linejoinerfn: Callable,
+ headercheckers: tuple[Callable, ...],
+ bodycheckers: tuple[Callable, ...]
+) -> dict:
+ """Compute errors, and other file metadata."""
+ errors = tuple()
+ if cdata[f"{filetype}_transposed"]:
+ rqtl2.transpose_csv_with_rename(file_, linesplitterfn, linejoinerfn)
+
+ with Redis.from_url(redisargs["redisuri"], decode_responses=True) as rconn:
+ save_error_fn = partial(push_file_error_to_redis,
+ rconn,
+ error_list_name(filetype, file_.name))
+ for lineno, line in enumerate(open_file(file_), start=1):
+ row = linesplitterfn(line)
+ if lineno == 1:
+ headers = tuple(row)
+ errors = errors + reduce(
+ lambda errs, fnct: errs + fnct(
+ file_.name, row[1:], save_error_fn),
+ headercheckers,
+ tuple())
+ continue
+
+ errors = errors + reduce(
+ lambda errs, fnct: errs + fnct(
+ file_.name, headers, row, cdata, save_error_fn),
+ bodycheckers,
+ tuple())
+
+ filedetails = {
+ "filename": file_.name,
+ "filesize": os.stat(file_).st_size,
+ "linecount": lineno
+ }
+ rconn.hset(redisargs["fqjobid"],
+ f"file-details:{filetype}:{file_.name}",
+ json.dumps(filedetails))
+ return {**filedetails, "errors": errors}
+
+
+def error_list_name(filetype: str, filename: str):
+ """Compute the name of the list where the errors will be pushed.
+
+ Parameters
+ ----------
+ filetype: The type of file. One of `r_qtl.r_qtl2.FILE_TYPES`
+ filename: The name of the file.
+ """
+ return f"errors:{filetype}:{filename}"
+
+
+def check_for_geno_errors(
+ redisargs: dict[str, str],
+ extractdir: Path,
+ cdata: dict,
+ linesplitterfn: Callable[[str], tuple[Union[str, None]]],
+ linejoinerfn: Callable[[tuple[Union[str, None], ...]], str],
+ logger: Logger
+) -> bool:
+ """Check for errors in genotype files."""
+ if "geno" in cdata or "founder_geno" in cdata:
+ genofiles = tuple(
+ extractdir.joinpath(fname) for fname in cdata.get("geno", []))
+ fgenofiles = tuple(
+ extractdir.joinpath(fname) for fname in cdata.get("founder_geno", []))
+ allgenofiles = genofiles + fgenofiles
+ with Redis.from_url(redisargs["redisuri"], decode_responses=True) as rconn:
+ error_list_names = [
+ error_list_name("geno", file_.name) for file_ in allgenofiles]
+ for list_name in error_list_names:
+ rconn.delete(list_name)
+ rconn.hset(
+ redisargs["fqjobid"],
+ "geno-errors-lists",
+ json.dumps(error_list_names))
+ processes = [
+ mproc.Process(target=file_errors_and_details,
+ args=(
+ redisargs,
+ file_,
+ ftype,
+ cdata,
+ linesplitterfn,
+ linejoinerfn,
+ (check_markers,),
+ (check_geno_line,))
+ )
+ for ftype, file_ in (
+ tuple(("geno", file_) for file_ in genofiles) +
+ tuple(("founder_geno", file_) for file_ in fgenofiles))
+ ]
+ for process in processes:
+ process.start()
+ # Set expiry for any created error lists
+ for key in error_list_names:
+ rconn.expire(name=key,
+ time=timedelta(seconds=redisargs["redisexpiry"]))
+
+ # TOD0: Add the errors to redis
+ if any(rconn.llen(errlst) > 0 for errlst in error_list_names):
+ logger.error("At least one of the 'geno' files has (an) error(s).")
+ return True
+ logger.info("No error(s) found in any of the 'geno' files.")
+
+ else:
+ logger.info("No 'geno' files to check.")
+
+ return False
+
+
+# def check_for_pheno_errors(...):
+# """Check for errors in phenotype files."""
+# pass
+
+
+# def check_for_phenose_errors(...):
+# """Check for errors in phenotype, standard-error files."""
+# pass
+
+
+# def check_for_phenocovar_errors(...):
+# """Check for errors in phenotype-covariates files."""
+# pass
+
+
+def run_qc(rconn: Redis, args: Namespace, fqjobid: str, logger: Logger) -> int:
+ """Run quality control checks on R/qtl2 bundles."""
+ thejob = parse_job(rconn, args.redisprefix, args.jobid)
+ print(f"THE JOB =================> {thejob}")
+ jobmeta = thejob["job-metadata"]
+ inpath = Path(jobmeta["rqtl2-bundle-file"])
+ extractdir = inpath.parent.joinpath(f"{inpath.name}__extraction_dir")
+ with ZipFile(inpath, "r") as zfile:
+ rqtl2.extract(zfile, extractdir)
+
+ ### BEGIN: The quality control checks ###
+ cdata = rqtl2.control_data(extractdir)
+ splitter = build_line_splitter(cdata)
+ joiner = build_line_joiner(cdata)
+
+ redisargs = {
+ "fqjobid": fqjobid,
+ "redisuri": args.redisuri,
+ "redisexpiry": args.redisexpiry
+ }
+ check_for_missing_files(rconn, fqjobid, extractdir, logger)
+ # check_for_pheno_errors(...)
+ check_for_geno_errors(redisargs, extractdir, cdata, splitter, joiner, logger)
+ # check_for_phenose_errors(...)
+ # check_for_phenocovar_errors(...)
+ ### END: The quality control checks ###
+
+ def __fetch_errors__(rkey: str) -> tuple:
+ return tuple(json.loads(rconn.hget(fqjobid, rkey) or "[]"))
+
+ return (1 if any((
+ bool(__fetch_errors__(key))
+ for key in
+ ("errors-geno", "errors-pheno", "errors-phenos", "errors-phenocovar")))
+ else 0)
+
+
+if __name__ == "__main__":
+ def main():
+ """Enter R/qtl2 bundle QC runner."""
+ args = add_global_data_arguments(init_cli_parser(
+ "qc-on-rqtl2-bundle", "Run QC on R/qtl2 bundle.")).parse_args()
+ check_redis(args.redisuri)
+ check_db(args.databaseuri)
+
+ logger = getLogger("qc-on-rqtl2-bundle")
+ logger.addHandler(StreamHandler(stream=sys.stderr))
+ logger.setLevel("DEBUG")
+
+ fqjobid = jobs.job_key(args.redisprefix, args.jobid)
+ with Redis.from_url(args.redisuri, decode_responses=True) as rconn:
+ logger.addHandler(setup_redis_logger(
+ rconn, fqjobid, f"{fqjobid}:log-messages",
+ args.redisexpiry))
+
+ exitcode = run_qc(rconn, args, fqjobid, logger)
+ rconn.hset(
+ jobs.job_key(args.redisprefix, args.jobid), "exitcode", exitcode)
+ return exitcode
+
+ sys.exit(main())
diff --git a/scripts/redis_logger.py b/scripts/redis_logger.py
index 2ae682b..d3fde5f 100644
--- a/scripts/redis_logger.py
+++ b/scripts/redis_logger.py
@@ -1,5 +1,6 @@
"""Utilities to log to redis for our worker scripts."""
import logging
+from typing import Optional
from redis import Redis
@@ -26,6 +27,26 @@ class RedisLogger(logging.Handler):
self.redisconnection.rpush(self.messageslistname, self.format(record))
self.redisconnection.expire(self.messageslistname, self.expiry)
+class RedisMessageListHandler(logging.Handler):
+ """Send messages to specified redis list."""
+ def __init__(self,
+ rconn: Redis,
+ fullyqualifiedkey: str,
+ loglevel: int = logging.NOTSET,
+ expiry: Optional[int] = 86400):
+ super().__init__(loglevel)
+ self.redisconnection = rconn
+ self.fullyqualifiedkey = fullyqualifiedkey
+ self.expiry = expiry
+
+ def emit(self, record):
+ """Log out to specified `fullyqualifiedkey`."""
+ self.redisconnection.rpush(self.fullyqualifiedkey, self.format(record))
+ if bool(self.expiry):
+ self.redisconnection.expire(self.fullyqualifiedkey, self.expiry)
+ else:
+ self.redisconnection.persist(self.fullyqualifiedkey)
+
def setup_redis_logger(rconn: Redis,
fullyqualifiedjobid: str,
job_messagelist: str,
diff --git a/scripts/rqtl2/bundleutils.py b/scripts/rqtl2/bundleutils.py
new file mode 100644
index 0000000..17faa7c
--- /dev/null
+++ b/scripts/rqtl2/bundleutils.py
@@ -0,0 +1,44 @@
+"""Common utilities to operate in R/qtl2 bundles."""
+from typing import Union, Callable
+
+def build_line_splitter(cdata: dict) -> Callable[[str], tuple[Union[str, None], ...]]:
+ """Build and return a function to use to split data in the files.
+
+ Parameters
+ ----------
+ cdata: A dict holding the control information included with the R/qtl2
+ bundle.
+
+ Returns
+ -------
+ A function that takes a string and return a tuple of strings.
+ """
+ separator = cdata["sep"]
+ na_strings = cdata["na.strings"]
+ def __splitter__(line: str) -> tuple[Union[str, None], ...]:
+ return tuple(
+ item if item not in na_strings else None
+ for item in
+ (field.strip() for field in line.strip().split(separator)))
+ return __splitter__
+
+
+def build_line_joiner(cdata: dict) -> Callable[[tuple[Union[str, None], ...]], str]:
+ """Build and return a function to use to split data in the files.
+
+ Parameters
+ ----------
+ cdata: A dict holding the control information included with the R/qtl2
+ bundle.
+
+ Returns
+ -------
+ A function that takes a string and return a tuple of strings.
+ """
+ separator = cdata["sep"]
+ na_strings = cdata["na.strings"]
+ def __joiner__(row: tuple[Union[str, None], ...]) -> str:
+ return separator.join(
+ (na_strings[0] if item is None else item)
+ for item in row)
+ return __joiner__
diff --git a/scripts/rqtl2/cli_parser.py b/scripts/rqtl2/cli_parser.py
index bcc7a4f..9bb60a3 100644
--- a/scripts/rqtl2/cli_parser.py
+++ b/scripts/rqtl2/cli_parser.py
@@ -2,12 +2,22 @@
from pathlib import Path
from argparse import ArgumentParser
-def add_common_arguments(parser: ArgumentParser) -> ArgumentParser:
- """Add common arguments to the CLI parser."""
- parser.add_argument("datasetid",
- type=int,
- help="The dataset to which the data belongs.")
+def add_bundle_argument(parser: ArgumentParser) -> ArgumentParser:
+ """Add the `rqtl2bundle` argument."""
parser.add_argument("rqtl2bundle",
type=Path,
help="Path to R/qtl2 bundle zip file.")
return parser
+
+
+def add_datasetid_argument(parser: ArgumentParser) -> ArgumentParser:
+ """Add the `datasetid` argument."""
+ parser.add_argument("datasetid",
+ type=int,
+ help="The dataset to which the data belongs.")
+ return parser
+
+
+def add_common_arguments(parser: ArgumentParser) -> ArgumentParser:
+ """Add common arguments to the CLI parser."""
+ return add_bundle_argument(add_datasetid_argument(parser))
diff --git a/scripts/rqtl2/entry.py b/scripts/rqtl2/entry.py
index b7fb68e..bc4cd9f 100644
--- a/scripts/rqtl2/entry.py
+++ b/scripts/rqtl2/entry.py
@@ -1,5 +1,5 @@
"""Build common script-entry structure."""
-from logging import Logger
+import logging
from typing import Callable
from argparse import Namespace
@@ -12,12 +12,19 @@ from uploader.check_connections import check_db, check_redis
from scripts.redis_logger import setup_redis_logger
-def build_main(args: Namespace,
- run_fn: Callable[[Connection, Namespace], int],
- logger: Logger,
- loglevel: str = "INFO") -> Callable[[],int]:
+def build_main(
+ args: Namespace,
+ run_fn: Callable[[Connection, Namespace, logging.Logger], int],
+ loggername: str
+) -> Callable[[],int]:
"""Build a function to be used as an entry-point for scripts."""
def main():
+ logging.basicConfig(
+ format=(
+ "%(asctime)s - %(levelname)s %(name)s: "
+ "(%(pathname)s: %(lineno)d) %(message)s"),
+ level=args.loglevel)
+ logger = logging.getLogger(loggername)
check_db(args.databaseuri)
check_redis(args.redisuri)
if not args.rqtl2bundle.exists():
@@ -26,13 +33,12 @@ def build_main(args: Namespace,
with (Redis.from_url(args.redisuri, decode_responses=True) as rconn,
database_connection(args.databaseuri) as dbconn):
- fqjobid = jobs.job_key(jobs.jobsnamespace(), args.jobid)
+ fqjobid = jobs.job_key(args.redisprefix, args.jobid)
logger.addHandler(setup_redis_logger(
rconn,
fqjobid,
f"{fqjobid}:log-messages",
args.redisexpiry))
- logger.setLevel(loglevel)
- return run_fn(dbconn, args)
+ return run_fn(dbconn, args, logger)
return main
diff --git a/scripts/rqtl2/install_genotypes.py b/scripts/rqtl2/install_genotypes.py
index 6b89142..20a19da 100644
--- a/scripts/rqtl2/install_genotypes.py
+++ b/scripts/rqtl2/install_genotypes.py
@@ -1,11 +1,11 @@
"""Load genotypes from R/qtl2 bundle into the database."""
import sys
+import argparse
import traceback
-from pathlib import Path
from zipfile import ZipFile
from functools import reduce
from typing import Iterator, Optional
-from logging import Logger, getLogger, StreamHandler
+from logging import Logger, getLogger
import MySQLdb as mdb
from MySQLdb.cursors import DictCursor
@@ -19,6 +19,8 @@ from scripts.rqtl2.entry import build_main
from scripts.rqtl2.cli_parser import add_common_arguments
from scripts.cli_parser import init_cli_parser, add_global_data_arguments
+__MODULE__ = "scripts.rqtl2.install_genotypes"
+
def insert_markers(
dbconn: mdb.Connection,
speciesid: int,
@@ -185,13 +187,12 @@ def cross_reference_genotypes(
def install_genotypes(#pylint: disable=[too-many-arguments, too-many-locals]
dbconn: mdb.Connection,
- speciesid: int,
- populationid: int,
- datasetid: int,
- rqtl2bundle: Path,
+ args: argparse.Namespace,
logger: Logger = getLogger(__name__)
) -> int:
"""Load any existing genotypes into the database."""
+ (speciesid, populationid, datasetid, rqtl2bundle) = (
+ args.speciesid, args.populationid, args.datasetid, args.rqtl2bundle)
count = 0
with ZipFile(str(rqtl2bundle.absolute()), "r") as zfile:
try:
@@ -253,15 +254,5 @@ if __name__ == "__main__":
return parser.parse_args()
- thelogger = getLogger("install_genotypes")
- thelogger.addHandler(StreamHandler(stream=sys.stderr))
- main = build_main(
- cli_args(),
- lambda dbconn, args: install_genotypes(dbconn,
- args.speciesid,
- args.populationid,
- args.datasetid,
- args.rqtl2bundle),
- thelogger,
- "INFO")
+ main = build_main(cli_args(), install_genotypes, __MODULE__)
sys.exit(main())
diff --git a/scripts/rqtl2/install_phenos.py b/scripts/rqtl2/install_phenos.py
index b5cab8e..a6e9fb2 100644
--- a/scripts/rqtl2/install_phenos.py
+++ b/scripts/rqtl2/install_phenos.py
@@ -1,10 +1,10 @@
"""Load pheno from R/qtl2 bundle into the database."""
import sys
+import argparse
import traceback
-from pathlib import Path
from zipfile import ZipFile
from functools import reduce
-from logging import Logger, getLogger, StreamHandler
+from logging import Logger, getLogger
import MySQLdb as mdb
from MySQLdb.cursors import DictCursor
@@ -18,6 +18,8 @@ from r_qtl import r_qtl2_qc as rqc
from functional_tools import take
+__MODULE__ = "scripts.rqtl2.install_phenos"
+
def insert_probesets(dbconn: mdb.Connection,
platformid: int,
phenos: tuple[str, ...]) -> int:
@@ -95,12 +97,11 @@ def cross_reference_probeset_data(dbconn: mdb.Connection,
def install_pheno_files(#pylint: disable=[too-many-arguments, too-many-locals]
dbconn: mdb.Connection,
- speciesid: int,
- platformid: int,
- datasetid: int,
- rqtl2bundle: Path,
+ args: argparse.Namespace,
logger: Logger = getLogger()) -> int:
"""Load data in `pheno` files and other related files into the database."""
+ (speciesid, platformid, datasetid, rqtl2bundle) = (
+ args.speciesid, args.platformid, args.datasetid, args.rqtl2bundle)
with ZipFile(str(rqtl2bundle), "r") as zfile:
try:
rqc.validate_bundle(zfile)
@@ -155,16 +156,5 @@ if __name__ == "__main__":
return parser.parse_args()
- thelogger = getLogger("install_phenos")
- thelogger.addHandler(StreamHandler(stream=sys.stderr))
- main = build_main(
- cli_args(),
- lambda dbconn, args: install_pheno_files(dbconn,
- args.speciesid,
- args.platformid,
- args.datasetid,
- args.rqtl2bundle,
- thelogger),
- thelogger,
- "DEBUG")
+ main = build_main(cli_args(), install_pheno_files, __MODULE__)
sys.exit(main())
diff --git a/scripts/rqtl2/phenotypes_qc.py b/scripts/rqtl2/phenotypes_qc.py
new file mode 100644
index 0000000..83828e4
--- /dev/null
+++ b/scripts/rqtl2/phenotypes_qc.py
@@ -0,0 +1,468 @@
+"""Run quality control on phenotypes-specific files in the bundle."""
+import sys
+import uuid
+import shutil
+import logging
+import tempfile
+import contextlib
+from pathlib import Path
+from logging import Logger
+from zipfile import ZipFile
+from argparse import Namespace
+import multiprocessing as mproc
+from functools import reduce, partial
+from typing import Union, Iterator, Callable, Optional, Sequence
+
+import MySQLdb as mdb
+from redis import Redis
+
+from r_qtl import r_qtl2 as rqtl2
+from r_qtl import r_qtl2_qc as rqc
+from r_qtl import exceptions as rqe
+from r_qtl.fileerrors import InvalidValue
+
+from functional_tools import chain
+
+from quality_control.checks import decimal_places_pattern
+
+from uploader.files import sha256_digest_over_file
+from uploader.samples.models import samples_by_species_and_population
+
+from scripts.rqtl2.entry import build_main
+from scripts.redis_logger import RedisMessageListHandler
+from scripts.rqtl2.cli_parser import add_bundle_argument
+from scripts.cli_parser import init_cli_parser, add_global_data_arguments
+from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter
+
+__MODULE__ = "scripts.rqtl2.phenotypes_qc"
+
+def validate(phenobundle: Path, logger: Logger) -> dict:
+ """Check that the bundle is generally valid"""
+ try:
+ rqc.validate_bundle(phenobundle)
+ except rqe.RQTLError as rqtlerr:
+ # logger.error("Bundle file validation failed!", exc_info=True)
+ return {
+ "skip": True,
+ "logger": logger,
+ "phenobundle": phenobundle,
+ "errors": (" ".join(rqtlerr.args),)
+ }
+ return {
+ "errors": tuple(),
+ "skip": False,
+ "phenobundle": phenobundle,
+ "logger": logger
+ }
+
+
+def check_for_mandatory_pheno_keys(
+ phenobundle: Path,
+ logger: Logger,
+ **kwargs
+) -> dict:
+ """Check that the mandatory keys exist for phenotypes."""
+ if kwargs.get("skip", False):
+ return {
+ **kwargs,
+ "logger": logger,
+ "phenobundle": phenobundle
+ }
+
+ _mandatory_keys = ("pheno", "phenocovar")
+ _cdata = rqtl2.read_control_file(phenobundle)
+ _errors = kwargs.get("errors", tuple()) + tuple(
+ f"Expected '{key}' file(s) are not declared in the bundle."
+ for key in _mandatory_keys if key not in _cdata.keys())
+ return {
+ **kwargs,
+ "logger": logger,
+ "phenobundle": phenobundle,
+ "errors": _errors,
+ "skip": len(_errors) > 0
+ }
+
+
+def check_for_averages_files(
+ phenobundle: Path,
+ logger: Logger,
+ **kwargs
+) -> dict:
+ """Check that averages files appear together"""
+ if kwargs.get("skip", False):
+ return {
+ **kwargs,
+ "logger": logger,
+ "phenobundle": phenobundle
+ }
+
+ _together = (("phenose", "phenonum"), ("phenonum", "phenose"))
+ _cdata = rqtl2.read_control_file(phenobundle)
+ _errors = kwargs.get("errors", tuple()) + tuple(
+ f"'{first}' is defined in the control file but there is no "
+ f"corresponding '{second}'"
+ for first, second in _together
+ if ((first in _cdata.keys()) and (second not in _cdata.keys())))
+ return {
+ **kwargs,
+ "logger": logger,
+ "phenobundle": phenobundle,
+ "errors": _errors,
+ "skip": len(_errors) > 0
+ }
+
+
+def extract_bundle(
+ bundle: Path, workdir: Path, jobid: uuid.UUID
+) -> tuple[Path, tuple[Path, ...]]:
+ """Extract the bundle."""
+ with ZipFile(bundle) as zfile:
+ extractiondir = workdir.joinpath(
+ f"{str(jobid)}-{sha256_digest_over_file(bundle)}-{bundle.name}")
+ return extractiondir, rqtl2.extract(zfile, extractiondir)
+
+
+def undo_transpose(filetype: str, cdata: dict, extractiondir):
+ """Undo transposition of all files of type `filetype` in thebundle."""
+ if len(cdata.get(filetype, [])) > 0 and cdata.get(f"{filetype}_transposed", False):
+ files = (extractiondir.joinpath(_file) for _file in cdata[filetype])
+ for _file in files:
+ rqtl2.transpose_csv_with_rename(
+ _file,
+ build_line_splitter(cdata),
+ build_line_joiner(cdata))
+
+
+@contextlib.contextmanager
+def redis_logger(
+ redisuri: str, loggername: str, filename: str, fqkey: str
+) -> Iterator[logging.Logger]:
+ """Build a Redis message-list logger."""
+ rconn = Redis.from_url(redisuri, decode_responses=True)
+ logger = logging.getLogger(loggername)
+ logger.propagate = False
+ handler = RedisMessageListHandler(
+ rconn,
+ fullyqualifiedkey(fqkey, filename))#type: ignore[arg-type]
+ handler.setFormatter(logging.getLogger().handlers[0].formatter)
+ logger.addHandler(handler)
+ try:
+ yield logger
+ finally:
+ rconn.close()
+
+
+def qc_phenocovar_file(
+ filepath: Path,
+ redisuri,
+ fqkey: str,
+ separator: str,
+ comment_char: str):
+ """Check that `phenocovar` files are structured correctly."""
+ with redis_logger(
+ redisuri,
+ f"{__MODULE__}.qc_phenocovar_file",
+ filepath.name,
+ fqkey) as logger:
+ logger.info("Running QC on file: %s", filepath.name)
+ _csvfile = rqtl2.read_csv_file(filepath, separator, comment_char)
+ _headings = tuple(heading.lower() for heading in next(_csvfile))
+ _errors: tuple[InvalidValue, ...] = tuple()
+ for heading in ("description", "units"):
+ if heading not in _headings:
+ _errors = (InvalidValue(
+ filepath.name,
+ "header row",
+ "-",
+ "-",
+ (f"File {filepath.name} is missing the {heading} heading "
+ "in the header line.")),)
+
+ def collect_errors(errors_and_linecount, line):
+ _errs, _lc = errors_and_linecount
+ logger.info("Testing record '%s'", line[0])
+ if len(line) != len(_headings):
+ _errs = _errs + (InvalidValue(
+ filepath.name,
+ line[0],
+ "-",
+ "-",
+ (f"Record {_lc} in file {filepath.name} has a different "
+ "number of columns than the number of headings")),)
+ _line = dict(zip(_headings, line))
+ if not bool(_line["description"]):
+ _errs = _errs + (
+ InvalidValue(filepath.name,
+ _line[_headings[0]],
+ "description",
+ _line["description"],
+ "The description is not provided!"),)
+
+ return _errs, _lc+1
+
+ return {
+ filepath.name: dict(zip(
+ ("errors", "linecount"),
+ reduce(collect_errors, _csvfile, (_errors, 1))))
+ }
+
+
+def merge_dicts(*dicts):
+ """Merge multiple dicts into a single one."""
+ return reduce(lambda merged, dct: {**merged, **dct}, dicts, {})
+
+
+def decimal_points_error(# pylint: disable=[too-many-arguments]
+ filename: str,
+ rowtitle: str,
+ coltitle: str,
+ cellvalue: str,
+ message: str,
+ decimal_places: int = 1
+) -> Optional[InvalidValue]:
+ """Returns an error if the value does not meet the checks."""
+ if not bool(decimal_places_pattern(decimal_places).match(cellvalue)):
+ return InvalidValue(filename, rowtitle, coltitle, cellvalue, message)
+ return None
+
+
+def integer_error(
+ filename: str,
+ rowtitle: str,
+ coltitle: str,
+ cellvalue: str,
+ message: str
+) -> Optional[InvalidValue]:
+ """Returns an error if the value does not meet the checks."""
+ try:
+ value = int(cellvalue)
+ if value <= 0:
+ raise ValueError("Must be a non-zero, positive number.")
+ return None
+ except ValueError as _verr:
+ return InvalidValue(filename, rowtitle, coltitle, cellvalue, message)
+
+
+def qc_pheno_file(# pylint: disable=[too-many-arguments]
+ filepath: Path,
+ redisuri: str,
+ fqkey: str,
+ samples: tuple[str, ...],
+ phenonames: tuple[str, ...],
+ separator: str,
+ comment_char: str,
+ na_strings: Sequence[str],
+ error_fn: Callable = decimal_points_error
+):
+ """Run QC/QA on a `pheno` file."""
+ with redis_logger(
+ redisuri,
+ f"{__MODULE__}.qc_pheno_file",
+ filepath.name,
+ fqkey) as logger:
+ logger.info("Running QC on file: %s", filepath.name)
+ _csvfile = rqtl2.read_csv_file(filepath, separator, comment_char)
+ _headings: tuple[str, ...] = tuple(
+ heading.lower() for heading in next(_csvfile))
+ _errors: tuple[InvalidValue, ...] = tuple()
+
+ _absent = tuple(pheno for pheno in _headings[1:] if pheno not in phenonames)
+ if len(_absent) > 0:
+ _errors = _errors + (InvalidValue(
+ filepath.name,
+ "header row",
+ "-",
+ ", ".join(_absent),
+ (f"The phenotype names ({', '.join(samples)}) do not exist in any "
+ "of the provided phenocovar files.")),)
+
+ def collect_errors(errors_and_linecount, line):
+ _errs, _lc = errors_and_linecount
+ if line[0] not in samples:
+ _errs = _errs + (InvalidValue(
+ filepath.name,
+ line[0],
+ _headings[0],
+ line[0],
+ (f"The sample named '{line[0]}' does not exist in the database. "
+ "You will need to upload that first.")),)
+
+ for field, value in zip(_headings[1:], line[1:]):
+ if value in na_strings:
+ continue
+ _err = error_fn(
+ filepath.name,
+ line[0],
+ field,
+ value)
+ _errs = _errs + ((_err,) if bool(_err) else tuple())
+
+ return _errs, _lc+1
+
+ return {
+ filepath.name: dict(zip(
+ ("errors", "linecount"),
+ reduce(collect_errors, _csvfile, (_errors, 1))))
+ }
+
+
+def phenotype_names(filepath: Path,
+ separator: str,
+ comment_char: str) -> tuple[str, ...]:
+ """Read phenotype names from `phenocovar` file."""
+ return reduce(lambda tpl, line: tpl + (line[0],),#type: ignore[arg-type, return-value]
+ rqtl2.read_csv_file(filepath, separator, comment_char),
+ tuple())[1:]
+
+def fullyqualifiedkey(
+ prefix: str,
+ rest: Optional[str] = None
+) -> Union[Callable[[str], str], str]:
+ """Compute fully qualified Redis key."""
+ if not bool(rest):
+ return lambda _rest: f"{prefix}:{_rest}"
+ return f"{prefix}:{rest}"
+
+def run_qc(# pylint: disable=[too-many-locals]
+ dbconn: mdb.Connection,
+ args: Namespace,
+ logger: Logger
+) -> int:
+ """Run quality control checks on the bundle."""
+ logger.debug("Beginning the quality assuarance checks.")
+ results = check_for_averages_files(
+ **check_for_mandatory_pheno_keys(
+ **validate(args.rqtl2bundle, logger)))
+ errors = results.get("errors", tuple())
+ if len(errors) > 0:
+ logger.error("We found the following errors:\n%s",
+ "\n".join(f" - {error}" for error in errors))
+ return 1
+ # Run QC on actual values
+ # Steps:
+ # - Extract file to specific directory
+ extractiondir, *_bundlefiles = extract_bundle(
+ args.rqtl2bundle, args.workingdir, args.jobid)
+
+ # - For every pheno, phenocovar, phenose, phenonum file, undo
+ # transposition where relevant
+ cdata = rqtl2.control_data(extractiondir)
+ with mproc.Pool(mproc.cpu_count() - 1) as pool:
+ pool.starmap(
+ undo_transpose,
+ ((ftype, cdata, extractiondir)
+ for ftype in ("pheno", "phenocovar", "phenose", "phenonum")))
+
+ # - Fetch samples/individuals from database.
+ logger.debug("Fetching samples/individuals from the database.")
+ samples = tuple(#type: ignore[var-annotated]
+ item for item in set(reduce(
+ lambda acc, item: acc + (
+ item["Name"], item["Name2"], item["Symbol"], item["Alias"]),
+ samples_by_species_and_population(
+ dbconn, args.speciesid, args.populationid),
+ tuple()))
+ if bool(item))
+
+ # - Check that `description` and `units` is present in phenocovar for
+ # all phenotypes
+ with mproc.Pool(mproc.cpu_count() - 1) as pool:
+ logger.debug("Check for errors in 'phenocovar' file(s).")
+ _phenocovar_qc_res = merge_dicts(*pool.starmap(qc_phenocovar_file, tuple(
+ (extractiondir.joinpath(_file),
+ args.redisuri,
+ chain(
+ "phenocovar",
+ fullyqualifiedkey(args.jobid),
+ fullyqualifiedkey(args.redisprefix)),
+ cdata["sep"],
+ cdata["comment.char"])
+ for _file in cdata.get("phenocovar", []))))
+
+ # - Check all samples in pheno files exist in database
+ # - Check all phenotypes in pheno files exist in phenocovar files
+ # - Check all numeric values in pheno files
+ phenonames = tuple(set(
+ name for names in pool.starmap(phenotype_names, tuple(
+ (extractiondir.joinpath(_file), cdata["sep"], cdata["comment.char"])
+ for _file in cdata.get("phenocovar", [])))
+ for name in names))
+
+ dec_err_fn = partial(decimal_points_error, message=(
+ "Expected a non-negative number with at least one decimal "
+ "place."))
+
+ logger.debug("Check for errors in 'pheno' file(s).")
+ _pheno_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
+ extractiondir.joinpath(_file),
+ args.redisuri,
+ chain(
+ "pheno",
+ fullyqualifiedkey(args.jobid),
+ fullyqualifiedkey(args.redisprefix)),
+ samples,
+ phenonames,
+ cdata["sep"],
+ cdata["comment.char"],
+ cdata["na.strings"],
+ dec_err_fn
+ ) for _file in cdata.get("pheno", []))))
+
+ # - Check the 3 checks above for phenose and phenonum values too
+ # qc_phenose_files(…)
+ # qc_phenonum_files(…)
+ logger.debug("Check for errors in 'phenose' file(s).")
+ _phenose_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
+ extractiondir.joinpath(_file),
+ args.redisuri,
+ chain(
+ "phenose",
+ fullyqualifiedkey(args.jobid),
+ fullyqualifiedkey(args.redisprefix)),
+ samples,
+ phenonames,
+ cdata["sep"],
+ cdata["comment.char"],
+ cdata["na.strings"],
+ dec_err_fn
+ ) for _file in cdata.get("phenose", []))))
+
+ logger.debug("Check for errors in 'phenonum' file(s).")
+ _phenonum_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
+ extractiondir.joinpath(_file),
+ args.redisuri,
+ chain(
+ "phenonum",
+ fullyqualifiedkey(args.jobid),
+ fullyqualifiedkey(args.redisprefix)),
+ samples,
+ phenonames,
+ cdata["sep"],
+ cdata["comment.char"],
+ cdata["na.strings"],
+ partial(integer_error, message=(
+ "Expected a non-negative, non-zero integer value."))
+ ) for _file in cdata.get("phenonum", []))))
+
+ # - Delete all extracted files
+ shutil.rmtree(extractiondir)
+ raise NotImplementedError("WIP!")
+
+
+if __name__ == "__main__":
+ def cli_args():
+ """Process command-line arguments for `install_phenos`"""
+ parser = add_bundle_argument(add_global_data_arguments(init_cli_parser(
+ program="PhenotypesQC",
+ description=(
+ "Perform Quality Control checks on a phenotypes bundle file"))))
+ parser.add_argument(
+ "--workingdir",
+ default=f"{tempfile.gettempdir()}/phenotypes_qc",
+ help=("The directory where this script will put its intermediate "
+ "files."),
+ type=Path)
+ return parser.parse_args()
+
+ main = build_main(cli_args(), run_qc, __MODULE__)
+ sys.exit(main())