aboutsummaryrefslogtreecommitdiff
path: root/scripts/rqtl2
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/rqtl2')
-rw-r--r--scripts/rqtl2/entry.py60
-rw-r--r--scripts/rqtl2/install_genotypes.py5
-rw-r--r--scripts/rqtl2/install_phenos.py5
-rw-r--r--scripts/rqtl2/phenotypes_qc.py112
4 files changed, 116 insertions, 66 deletions
diff --git a/scripts/rqtl2/entry.py b/scripts/rqtl2/entry.py
index 2a18aa3..327ed2c 100644
--- a/scripts/rqtl2/entry.py
+++ b/scripts/rqtl2/entry.py
@@ -16,43 +16,47 @@ from scripts.redis_logger import setup_redis_logger
def build_main(
args: Namespace,
- run_fn: Callable[[Connection, Namespace, logging.Logger], int],
+ run_fn: Callable[
+ [Redis, Connection, str, Namespace, logging.Logger],
+ int
+ ],
loggername: str
) -> Callable[[],int]:
"""Build a function to be used as an entry-point for scripts."""
def main():
- logging.basicConfig(
- format=(
- "%(asctime)s - %(levelname)s %(name)s: "
- "(%(pathname)s: %(lineno)d) %(message)s"),
- level=args.loglevel)
- logger = logging.getLogger(loggername)
- check_db(args.databaseuri)
- check_redis(args.redisuri)
- if not args.rqtl2bundle.exists():
- logger.error("File not found: '%s'.", args.rqtl2bundle)
- return 2
+ try:
+ logging.basicConfig(
+ format=(
+ "%(asctime)s - %(levelname)s %(name)s: "
+ "(%(pathname)s: %(lineno)d) %(message)s"),
+ level=args.loglevel)
+ logger = logging.getLogger(loggername)
+ with (Redis.from_url(args.redisuri, decode_responses=True) as rconn,
+ database_connection(args.databaseuri) as dbconn):
+ fqjobid = jobs.job_key(args.redisprefix, args.jobid)
+ rconn.hset(fqjobid, "status", "started")
+ logger.addHandler(setup_redis_logger(
+ rconn,
+ fqjobid,
+ f"{fqjobid}:log-messages",
+ args.redisexpiry))
+ logger.addHandler(StreamHandler(stream=sys.stdout))
- with (Redis.from_url(args.redisuri, decode_responses=True) as rconn,
- database_connection(args.databaseuri) as dbconn):
- fqjobid = jobs.job_key(args.redisprefix, args.jobid)
- rconn.hset(fqjobid, "status", "started")
- logger.addHandler(setup_redis_logger(
- rconn,
- fqjobid,
- f"{fqjobid}:log-messages",
- args.redisexpiry))
- logger.addHandler(StreamHandler(stream=sys.stdout))
- try:
- returncode = run_fn(dbconn, args, logger)
+ check_db(args.databaseuri)
+ check_redis(args.redisuri)
+ if not args.rqtl2bundle.exists():
+ logger.error("File not found: '%s'.", args.rqtl2bundle)
+ return 2
+
+ returncode = run_fn(rconn, dbconn, fqjobid, args, logger)
if returncode == 0:
rconn.hset(fqjobid, "status", "completed:success")
return returncode
rconn.hset(fqjobid, "status", "completed:error")
return returncode
- except Exception as _exc:
- logger.error("The process failed!", exc_info=True)
- rconn.hset(fqjobid, "status", "completed:error")
- return 4
+ except Exception as _exc:# pylint: disable=[broad-except]
+ logger.error("The process failed!", exc_info=True)
+ rconn.hset(fqjobid, "status", "completed:error")
+ return 4
return main
diff --git a/scripts/rqtl2/install_genotypes.py b/scripts/rqtl2/install_genotypes.py
index 20a19da..8762655 100644
--- a/scripts/rqtl2/install_genotypes.py
+++ b/scripts/rqtl2/install_genotypes.py
@@ -7,6 +7,7 @@ from functools import reduce
from typing import Iterator, Optional
from logging import Logger, getLogger
+from redis import Redis
import MySQLdb as mdb
from MySQLdb.cursors import DictCursor
@@ -185,8 +186,10 @@ def cross_reference_genotypes(
cursor.executemany(insertquery, insertparams)
return cursor.rowcount
-def install_genotypes(#pylint: disable=[too-many-arguments, too-many-locals]
+def install_genotypes(#pylint: disable=[too-many-locals]
+ rconn: Redis,#pylint: disable=[unused-argument]
dbconn: mdb.Connection,
+ fullyqualifiedjobid: str,#pylint: disable=[unused-argument]
args: argparse.Namespace,
logger: Logger = getLogger(__name__)
) -> int:
diff --git a/scripts/rqtl2/install_phenos.py b/scripts/rqtl2/install_phenos.py
index a6e9fb2..9059cd6 100644
--- a/scripts/rqtl2/install_phenos.py
+++ b/scripts/rqtl2/install_phenos.py
@@ -6,6 +6,7 @@ from zipfile import ZipFile
from functools import reduce
from logging import Logger, getLogger
+from redis import Redis
import MySQLdb as mdb
from MySQLdb.cursors import DictCursor
@@ -95,8 +96,10 @@ def cross_reference_probeset_data(dbconn: mdb.Connection,
} for row in dataids))
return cursor.rowcount
-def install_pheno_files(#pylint: disable=[too-many-arguments, too-many-locals]
+def install_pheno_files(#pylint: disable=[too-many-locals]
+ rconn: Redis,#pylint: disable=[unused-argument]
dbconn: mdb.Connection,
+ fullyqualifiedjobid: str,#pylint: disable=[unused-argument]
args: argparse.Namespace,
logger: Logger = getLogger()) -> int:
"""Load data in `pheno` files and other related files into the database."""
diff --git a/scripts/rqtl2/phenotypes_qc.py b/scripts/rqtl2/phenotypes_qc.py
index 4f55e40..ba28ed0 100644
--- a/scripts/rqtl2/phenotypes_qc.py
+++ b/scripts/rqtl2/phenotypes_qc.py
@@ -1,6 +1,7 @@
"""Run quality control on phenotypes-specific files in the bundle."""
import sys
import uuid
+import json
import shutil
import logging
import tempfile
@@ -152,6 +153,17 @@ def redis_logger(
rconn.close()
+def push_error(rconn: Redis, fqkey: str, error: InvalidValue) -> InvalidValue:
+ """Persist the error in redis."""
+ rconn.rpush(fqkey, json.dumps(error._asdict()))
+ return error
+
+
+def file_fqkey(prefix: str, section: str, filepath: Path) -> str:
+ """Build a files fully-qualified key in a consistent manner"""
+ return f"{prefix}:{section}:{filepath.name}"
+
+
def qc_phenocovar_file(
filepath: Path,
redisuri,
@@ -159,52 +171,64 @@ def qc_phenocovar_file(
separator: str,
comment_char: str):
"""Check that `phenocovar` files are structured correctly."""
- with redis_logger(
+ with (redis_logger(
redisuri,
f"{__MODULE__}.qc_phenocovar_file",
filepath.name,
- fqkey) as logger:
+ f"{fqkey}:logs") as logger,
+ Redis.from_url(redisuri, decode_responses=True) as rconn):
logger.info("Running QC on file: %s", filepath.name)
_csvfile = rqtl2.read_csv_file(filepath, separator, comment_char)
_headings = tuple(heading.lower() for heading in next(_csvfile))
_errors: tuple[InvalidValue, ...] = tuple()
+ save_error = partial(
+ push_error, rconn, file_fqkey(fqkey, "errors", filepath))
for heading in ("description", "units"):
if heading not in _headings:
- _errors = (InvalidValue(
+ _errors = (save_error(InvalidValue(
filepath.name,
"header row",
"-",
"-",
(f"File {filepath.name} is missing the {heading} heading "
- "in the header line.")),)
+ "in the header line."))),)
def collect_errors(errors_and_linecount, line):
_errs, _lc = errors_and_linecount
logger.info("Testing record '%s'", line[0])
if len(line) != len(_headings):
- _errs = _errs + (InvalidValue(
+ _errs = _errs + (save_error(InvalidValue(
filepath.name,
line[0],
"-",
"-",
(f"Record {_lc} in file {filepath.name} has a different "
- "number of columns than the number of headings")),)
+ "number of columns than the number of headings"))),)
_line = dict(zip(_headings, line))
if not bool(_line["description"]):
_errs = _errs + (
- InvalidValue(filepath.name,
- _line[_headings[0]],
- "description",
- _line["description"],
- "The description is not provided!"),)
-
+ save_error(InvalidValue(filepath.name,
+ _line[_headings[0]],
+ "description",
+ _line["description"],
+ "The description is not provided!")),)
+
+ rconn.hset(file_fqkey(fqkey, "metadata", filepath),
+ mapping={
+ "status": "checking",
+ "linecount": _lc+1,
+ "total-errors": len(_errs)
+ })
return _errs, _lc+1
- return {
- filepath.name: dict(zip(
- ("errors", "linecount"),
- reduce(collect_errors, _csvfile, (_errors, 1))))
- }
+ _errors, _linecount = reduce(collect_errors, _csvfile, (_errors, 1))
+ rconn.hset(file_fqkey(fqkey, "metadata", filepath),
+ mapping={
+ "status": "completed",
+ "linecount": _linecount,
+ "total-errors": len(_errors)
+ })
+ return {filepath.name: {"errors": _errors, "linecount": _linecount}}
def merge_dicts(*dicts):
@@ -243,7 +267,7 @@ def integer_error(
return InvalidValue(filename, rowtitle, coltitle, cellvalue, message)
-def qc_pheno_file(# pylint: disable=[too-many-arguments]
+def qc_pheno_file(# pylint: disable=[too-many-locals, too-many-arguments]
filepath: Path,
redisuri: str,
fqkey: str,
@@ -255,12 +279,15 @@ def qc_pheno_file(# pylint: disable=[too-many-arguments]
error_fn: Callable = decimal_points_error
):
"""Run QC/QA on a `pheno` file."""
- with redis_logger(
+ with (redis_logger(
redisuri,
f"{__MODULE__}.qc_pheno_file",
filepath.name,
- fqkey) as logger:
+ f"{fqkey}:logs") as logger,
+ Redis.from_url(redisuri, decode_responses=True) as rconn):
logger.info("Running QC on file: %s", filepath.name)
+ save_error = partial(
+ push_error, rconn, file_fqkey(fqkey, "errors", filepath))
_csvfile = rqtl2.read_csv_file(filepath, separator, comment_char)
_headings: tuple[str, ...] = tuple(
heading.lower() for heading in next(_csvfile))
@@ -268,24 +295,25 @@ def qc_pheno_file(# pylint: disable=[too-many-arguments]
_absent = tuple(pheno for pheno in _headings[1:] if pheno not in phenonames)
if len(_absent) > 0:
- _errors = _errors + (InvalidValue(
+ _errors = _errors + (save_error(InvalidValue(
filepath.name,
"header row",
"-",
", ".join(_absent),
- (f"The phenotype names ({', '.join(samples)}) do not exist in any "
- "of the provided phenocovar files.")),)
+ ("The following phenotype names do not exist in any of the "
+ f"provided phenocovar files: ({', '.join(_absent)})"))),)
def collect_errors(errors_and_linecount, line):
_errs, _lc = errors_and_linecount
+ logger.debug("Checking row %s", line[0])
if line[0] not in samples:
- _errs = _errs + (InvalidValue(
+ _errs = _errs + (save_error(InvalidValue(
filepath.name,
line[0],
_headings[0],
line[0],
(f"The sample named '{line[0]}' does not exist in the database. "
- "You will need to upload that first.")),)
+ "You will need to upload that first."))),)
for field, value in zip(_headings[1:], line[1:]):
if value in na_strings:
@@ -295,15 +323,24 @@ def qc_pheno_file(# pylint: disable=[too-many-arguments]
line[0],
field,
value)
- _errs = _errs + ((_err,) if bool(_err) else tuple())
-
+ _errs = _errs + ((save_error(_err),) if bool(_err) else tuple())
+
+ rconn.hset(file_fqkey(fqkey, "metadata", filepath),
+ mapping={
+ "status": "checking",
+ "linecount": _lc+1,
+ "total-errors": len(_errs)
+ })
return _errs, _lc+1
- return {
- filepath.name: dict(zip(
- ("errors", "linecount"),
- reduce(collect_errors, _csvfile, (_errors, 1))))
- }
+ _errors, _linecount = reduce(collect_errors, _csvfile, (_errors, 1))
+ rconn.hset(file_fqkey(fqkey, "metadata", filepath),
+ mapping={
+ "status": "completed",
+ "linecount": _linecount,
+ "total-errors": len(_errors)
+ })
+ return {filepath.name: {"errors": _errors, "linecount": _linecount}}
def phenotype_names(filepath: Path,
@@ -324,7 +361,9 @@ def fullyqualifiedkey(
return f"{prefix}:{rest}"
def run_qc(# pylint: disable=[too-many-locals]
+ rconn: Redis,
dbconn: mdb.Connection,
+ fullyqualifiedjobid: str,
args: Namespace,
logger: Logger
) -> int:
@@ -366,15 +405,16 @@ def run_qc(# pylint: disable=[too-many-locals]
# - Check that `description` and `units` is present in phenocovar for
# all phenotypes
+ rconn.hset(fullyqualifiedjobid,
+ "fully-qualified-keys:phenocovar",
+ json.dumps(tuple(f"{fullyqualifiedjobid}:phenocovar:{_file}"
+ for _file in cdata.get("phenocovar", []))))
with mproc.Pool(mproc.cpu_count() - 1) as pool:
logger.debug("Check for errors in 'phenocovar' file(s).")
_phenocovar_qc_res = merge_dicts(*pool.starmap(qc_phenocovar_file, tuple(
(extractiondir.joinpath(_file),
args.redisuri,
- chain(
- "phenocovar",
- fullyqualifiedkey(args.jobid),
- fullyqualifiedkey(args.redisprefix)),
+ f"{fullyqualifiedjobid}:phenocovar",
cdata["sep"],
cdata["comment.char"])
for _file in cdata.get("phenocovar", []))))