diff options
-rw-r--r-- | qc_app/upload/rqtl2.py | 3 | ||||
-rw-r--r-- | scripts/qc_on_rqtl2_bundle.py | 75 |
2 files changed, 59 insertions, 19 deletions
diff --git a/qc_app/upload/rqtl2.py b/qc_app/upload/rqtl2.py index 4ff7ba3..31d5a9d 100644 --- a/qc_app/upload/rqtl2.py +++ b/qc_app/upload/rqtl2.py @@ -207,7 +207,8 @@ def trigger_rqtl2_bundle_qc( str(jobid), [sys.executable, "-m", "scripts.qc_on_rqtl2_bundle", app.config["SQL_URI"], app.config["REDIS_URL"], - jobs.jobsnamespace(), str(jobid), "--redisexpiry", + jobs.jobsnamespace(), str(jobid), str(species_id), + str(population_id), "--redisexpiry", str(redis_ttl_seconds)], "rqtl2-bundle-qc-job", redis_ttl_seconds, diff --git a/scripts/qc_on_rqtl2_bundle.py b/scripts/qc_on_rqtl2_bundle.py index deef8fe..40809b7 100644 --- a/scripts/qc_on_rqtl2_bundle.py +++ b/scripts/qc_on_rqtl2_bundle.py @@ -10,12 +10,14 @@ import multiprocessing as mproc from logging import Logger, getLogger, StreamHandler from typing import Union, Sequence, Callable, Iterator +import MySQLdb as mdb from redis import Redis from quality_control.errors import InvalidValue from quality_control.checks import decimal_points_error from qc_app import jobs +from qc_app.db_utils import database_connection from qc_app.check_connections import check_db, check_redis from r_qtl import errors as rqe @@ -23,9 +25,9 @@ from r_qtl import r_qtl2 as rqtl2 from r_qtl import r_qtl2_qc as rqc from r_qtl import fileerrors as rqfe -from scripts.cli_parser import init_cli_parser from scripts.process_rqtl2_bundle import parse_job from scripts.redis_logger import setup_redis_logger +from scripts.cli_parser import init_cli_parser, add_global_data_arguments def dict2tuple(dct: dict) -> tuple: """Utility to convert items in dicts to pairs of tuples.""" @@ -109,7 +111,7 @@ def retrieve_errors_with_progress(rconn: Redis,#pylint: disable=[too-many-locals f"The file '{fname}' does not exist in the bundle despite it being " f"listed under '{filetype}' in the control file.")) -def qc_geno_errors(rconn, fqjobid, zfile, logger) -> bool: +def qc_geno_errors(rconn, fqjobid, _dburi, _speciesid, zfile, logger) -> bool: """Check for errors in `geno` file(s).""" cdata = rqtl2.control_data(zfile) if "geno" in cdata: @@ -129,15 +131,32 @@ def qc_geno_errors(rconn, fqjobid, zfile, logger) -> bool: return False -def check_pheno_samples(zipfilepath: Union[str, Path], logger: Logger) -> tuple[ - Union[InvalidValue, rqfe.MissingFile], ...]: +def fetch_db_geno_samples(conn: mdb.Connection, speciesid: int) -> tuple[str, ...]: + """Fetch samples/cases/individuals from the database.""" + samples = set() + with conn.cursor() as cursor: + cursor.execute("SELECT Name, Name2 from Strain WHERE SpeciesId=%s", + (speciesid,)) + rows = cursor.fetchall() or tuple() + for row in rows: + samples.update(tuple(row)) + + return tuple(item.strip() for item in samples if bool(item)) + + +def check_pheno_samples( + conn: mdb.Connection, + speciesid: int, + zipfilepath: Union[str, Path], + logger: Logger +) -> tuple[Union[InvalidValue, rqfe.MissingFile], ...]: """Check that samples in 'pheno' file exist in geno file.""" cdata = rqtl2.read_control_file(zipfilepath) genosamples = tuple( sample for perfilesamples in ( rqtl2.load_samples(zipfilepath, member, cdata["geno_transposed"]) for member in cdata["geno"]) - for sample in perfilesamples) + for sample in perfilesamples) + fetch_db_geno_samples(conn, speciesid) def __check_file__(member) -> tuple[InvalidValue, ...]: logger.info("Checking samples/cases in member file '%s' …", member) @@ -149,7 +168,9 @@ def check_pheno_samples(zipfilepath: Union[str, Path], logger: Logger) -> tuple[ errors = errors + (InvalidValue( member, "-", "-", sample, f"The individual/case/sample '{sample}' in file " - f"{member} does not exist in any of the 'geno' files."),) + f"{member} does not exist in either, any of the 'geno' " + "files provided in the bundle or the GeneNetwork database." + ),) logger.info("Found %s missing samples in member file '%s'.", len(errors), @@ -170,15 +191,20 @@ def check_pheno_samples(zipfilepath: Union[str, Path], logger: Logger) -> tuple[ return allerrors -def qc_pheno_errors(rconn, fqjobid, zfile, logger) -> bool: +def qc_pheno_errors(rconn, fqjobid, dburi, speciesid, zfile, logger) -> bool: """Check for errors in `pheno` file(s).""" cdata = rqtl2.control_data(zfile) if "pheno" in cdata: logger.info("Checking for errors in the 'pheno' file…") - perrs = check_pheno_samples(zfile.filename, logger) + tuple( - retrieve_errors_with_progress( - rconn,fqjobid, zfile, "pheno", - (partial(decimal_points_error, filename="pheno", mini=3),))) + perrs = tuple() + with database_connection(dburi) as dbconn: + perrs = check_pheno_samples( + dbconn, speciesid, zfile.filename, logger) + tuple( + retrieve_errors_with_progress( + rconn,fqjobid, zfile, "pheno", + (partial(decimal_points_error, + filename="pheno", + mini=3),))) add_to_errors(rconn, fqjobid, "errors-generic", tuple( err for err in perrs if isinstance(err, rqfe.MissingFile))) add_to_errors(rconn, fqjobid, "errors-pheno", tuple( @@ -190,7 +216,7 @@ def qc_pheno_errors(rconn, fqjobid, zfile, logger) -> bool: return False -def qc_phenose_errors(rconn, fqjobid, zfile, logger) -> bool: +def qc_phenose_errors(rconn, fqjobid, dburi, speciesid, zfile, logger) -> bool: """Check for errors in `phenose` file(s).""" cdata = rqtl2.control_data(zfile) if "phenose" in cdata: @@ -209,7 +235,14 @@ def qc_phenose_errors(rconn, fqjobid, zfile, logger) -> bool: return False -def qc_phenocovar_errors(_rconn, _fqjobid, _zfile, _logger) -> bool: +def qc_phenocovar_errors( + _rconn, + _fqjobid, + _dburi, + _speciesid, + _zfile, + _logger +) -> bool: """Check for errors in `phenocovar` file(s).""" return False @@ -225,12 +258,18 @@ def run_qc(rconn: Redis, if qc_missing_files(rconn, fqjobid, zfile, logger): return 1 - def with_zipfile(rconn, fqjobid, filename, logger, func): + def with_zipfile(rconn, fqjobid, dbconn, speciesid, filename, logger, func): with ZipFile(filename, "r") as zfile: - return func(rconn, fqjobid, zfile, logger) + return func(rconn, fqjobid, dbconn, speciesid, zfile, logger) def buildargs(func): - return (rconn, fqjobid, jobmeta["rqtl2-bundle-file"], logger, func) + return (rconn, + fqjobid, + args.databaseuri, + args.speciesid, + jobmeta["rqtl2-bundle-file"], + logger, + func) processes = [ mproc.Process(target=with_zipfile, args=buildargs(qc_geno_errors,)), mproc.Process(target=with_zipfile, args=buildargs(qc_pheno_errors,)), @@ -263,8 +302,8 @@ def run_qc(rconn: Redis, if __name__ == "__main__": def main(): """Enter R/qtl2 bundle QC runner.""" - args = init_cli_parser( - "qc-on-rqtl2-bundle", "Run QC on R/qtl2 bundle.").parse_args() + args = add_global_data_arguments(init_cli_parser( + "qc-on-rqtl2-bundle", "Run QC on R/qtl2 bundle.")).parse_args() check_redis(args.redisuri) check_db(args.databaseuri) |