diff options
-rw-r--r-- | scripts/process_rqtl2_bundle.py | 4 | ||||
-rw-r--r-- | scripts/rqtl2/entry.py | 60 | ||||
-rw-r--r-- | scripts/rqtl2/install_genotypes.py | 5 | ||||
-rw-r--r-- | scripts/rqtl2/install_phenos.py | 5 | ||||
-rw-r--r-- | scripts/rqtl2/phenotypes_qc.py | 112 | ||||
-rw-r--r-- | uploader/jobs.py | 34 | ||||
-rw-r--r-- | uploader/phenotypes/views.py | 14 | ||||
-rw-r--r-- | uploader/templates/cli-output.html | 4 | ||||
-rw-r--r-- | uploader/templates/phenotypes/add-phenotypes.html | 2 | ||||
-rw-r--r-- | uploader/templates/phenotypes/job-status.html | 71 |
10 files changed, 238 insertions, 73 deletions
diff --git a/scripts/process_rqtl2_bundle.py b/scripts/process_rqtl2_bundle.py index 4efc3e0..8b7a0fb 100644 --- a/scripts/process_rqtl2_bundle.py +++ b/scripts/process_rqtl2_bundle.py @@ -94,7 +94,9 @@ def process_bundle(dbconn: mdb.Connection, if has_geno_file(thejob): logger.info("Processing geno files.") genoexit = install_genotypes( + rconn, dbconn, + f"{rprefix}:{jobid}", argparse.Namespace( speciesid=meta["speciesid"], populationid=meta["populationid"], @@ -110,7 +112,9 @@ def process_bundle(dbconn: mdb.Connection, if has_pheno_file(thejob): phenoexit = install_pheno_files( + rconn, dbconn, + f"{rprefix}:{jobid}", argparse.Namespace( speciesid=meta["speciesid"], platformid=meta["platformid"], diff --git a/scripts/rqtl2/entry.py b/scripts/rqtl2/entry.py index 2a18aa3..327ed2c 100644 --- a/scripts/rqtl2/entry.py +++ b/scripts/rqtl2/entry.py @@ -16,43 +16,47 @@ from scripts.redis_logger import setup_redis_logger def build_main( args: Namespace, - run_fn: Callable[[Connection, Namespace, logging.Logger], int], + run_fn: Callable[ + [Redis, Connection, str, Namespace, logging.Logger], + int + ], loggername: str ) -> Callable[[],int]: """Build a function to be used as an entry-point for scripts.""" def main(): - logging.basicConfig( - format=( - "%(asctime)s - %(levelname)s %(name)s: " - "(%(pathname)s: %(lineno)d) %(message)s"), - level=args.loglevel) - logger = logging.getLogger(loggername) - check_db(args.databaseuri) - check_redis(args.redisuri) - if not args.rqtl2bundle.exists(): - logger.error("File not found: '%s'.", args.rqtl2bundle) - return 2 + try: + logging.basicConfig( + format=( + "%(asctime)s - %(levelname)s %(name)s: " + "(%(pathname)s: %(lineno)d) %(message)s"), + level=args.loglevel) + logger = logging.getLogger(loggername) + with (Redis.from_url(args.redisuri, decode_responses=True) as rconn, + database_connection(args.databaseuri) as dbconn): + fqjobid = jobs.job_key(args.redisprefix, args.jobid) + rconn.hset(fqjobid, "status", "started") + logger.addHandler(setup_redis_logger( + rconn, + fqjobid, + f"{fqjobid}:log-messages", + args.redisexpiry)) + logger.addHandler(StreamHandler(stream=sys.stdout)) - with (Redis.from_url(args.redisuri, decode_responses=True) as rconn, - database_connection(args.databaseuri) as dbconn): - fqjobid = jobs.job_key(args.redisprefix, args.jobid) - rconn.hset(fqjobid, "status", "started") - logger.addHandler(setup_redis_logger( - rconn, - fqjobid, - f"{fqjobid}:log-messages", - args.redisexpiry)) - logger.addHandler(StreamHandler(stream=sys.stdout)) - try: - returncode = run_fn(dbconn, args, logger) + check_db(args.databaseuri) + check_redis(args.redisuri) + if not args.rqtl2bundle.exists(): + logger.error("File not found: '%s'.", args.rqtl2bundle) + return 2 + + returncode = run_fn(rconn, dbconn, fqjobid, args, logger) if returncode == 0: rconn.hset(fqjobid, "status", "completed:success") return returncode rconn.hset(fqjobid, "status", "completed:error") return returncode - except Exception as _exc: - logger.error("The process failed!", exc_info=True) - rconn.hset(fqjobid, "status", "completed:error") - return 4 + except Exception as _exc:# pylint: disable=[broad-except] + logger.error("The process failed!", exc_info=True) + rconn.hset(fqjobid, "status", "completed:error") + return 4 return main diff --git a/scripts/rqtl2/install_genotypes.py b/scripts/rqtl2/install_genotypes.py index 20a19da..8762655 100644 --- a/scripts/rqtl2/install_genotypes.py +++ b/scripts/rqtl2/install_genotypes.py @@ -7,6 +7,7 @@ from functools import reduce from typing import Iterator, Optional from logging import Logger, getLogger +from redis import Redis import MySQLdb as mdb from MySQLdb.cursors import DictCursor @@ -185,8 +186,10 @@ def cross_reference_genotypes( cursor.executemany(insertquery, insertparams) return cursor.rowcount -def install_genotypes(#pylint: disable=[too-many-arguments, too-many-locals] +def install_genotypes(#pylint: disable=[too-many-locals] + rconn: Redis,#pylint: disable=[unused-argument] dbconn: mdb.Connection, + fullyqualifiedjobid: str,#pylint: disable=[unused-argument] args: argparse.Namespace, logger: Logger = getLogger(__name__) ) -> int: diff --git a/scripts/rqtl2/install_phenos.py b/scripts/rqtl2/install_phenos.py index a6e9fb2..9059cd6 100644 --- a/scripts/rqtl2/install_phenos.py +++ b/scripts/rqtl2/install_phenos.py @@ -6,6 +6,7 @@ from zipfile import ZipFile from functools import reduce from logging import Logger, getLogger +from redis import Redis import MySQLdb as mdb from MySQLdb.cursors import DictCursor @@ -95,8 +96,10 @@ def cross_reference_probeset_data(dbconn: mdb.Connection, } for row in dataids)) return cursor.rowcount -def install_pheno_files(#pylint: disable=[too-many-arguments, too-many-locals] +def install_pheno_files(#pylint: disable=[too-many-locals] + rconn: Redis,#pylint: disable=[unused-argument] dbconn: mdb.Connection, + fullyqualifiedjobid: str,#pylint: disable=[unused-argument] args: argparse.Namespace, logger: Logger = getLogger()) -> int: """Load data in `pheno` files and other related files into the database.""" diff --git a/scripts/rqtl2/phenotypes_qc.py b/scripts/rqtl2/phenotypes_qc.py index 4f55e40..ba28ed0 100644 --- a/scripts/rqtl2/phenotypes_qc.py +++ b/scripts/rqtl2/phenotypes_qc.py @@ -1,6 +1,7 @@ """Run quality control on phenotypes-specific files in the bundle.""" import sys import uuid +import json import shutil import logging import tempfile @@ -152,6 +153,17 @@ def redis_logger( rconn.close() +def push_error(rconn: Redis, fqkey: str, error: InvalidValue) -> InvalidValue: + """Persist the error in redis.""" + rconn.rpush(fqkey, json.dumps(error._asdict())) + return error + + +def file_fqkey(prefix: str, section: str, filepath: Path) -> str: + """Build a files fully-qualified key in a consistent manner""" + return f"{prefix}:{section}:{filepath.name}" + + def qc_phenocovar_file( filepath: Path, redisuri, @@ -159,52 +171,64 @@ def qc_phenocovar_file( separator: str, comment_char: str): """Check that `phenocovar` files are structured correctly.""" - with redis_logger( + with (redis_logger( redisuri, f"{__MODULE__}.qc_phenocovar_file", filepath.name, - fqkey) as logger: + f"{fqkey}:logs") as logger, + Redis.from_url(redisuri, decode_responses=True) as rconn): logger.info("Running QC on file: %s", filepath.name) _csvfile = rqtl2.read_csv_file(filepath, separator, comment_char) _headings = tuple(heading.lower() for heading in next(_csvfile)) _errors: tuple[InvalidValue, ...] = tuple() + save_error = partial( + push_error, rconn, file_fqkey(fqkey, "errors", filepath)) for heading in ("description", "units"): if heading not in _headings: - _errors = (InvalidValue( + _errors = (save_error(InvalidValue( filepath.name, "header row", "-", "-", (f"File {filepath.name} is missing the {heading} heading " - "in the header line.")),) + "in the header line."))),) def collect_errors(errors_and_linecount, line): _errs, _lc = errors_and_linecount logger.info("Testing record '%s'", line[0]) if len(line) != len(_headings): - _errs = _errs + (InvalidValue( + _errs = _errs + (save_error(InvalidValue( filepath.name, line[0], "-", "-", (f"Record {_lc} in file {filepath.name} has a different " - "number of columns than the number of headings")),) + "number of columns than the number of headings"))),) _line = dict(zip(_headings, line)) if not bool(_line["description"]): _errs = _errs + ( - InvalidValue(filepath.name, - _line[_headings[0]], - "description", - _line["description"], - "The description is not provided!"),) - + save_error(InvalidValue(filepath.name, + _line[_headings[0]], + "description", + _line["description"], + "The description is not provided!")),) + + rconn.hset(file_fqkey(fqkey, "metadata", filepath), + mapping={ + "status": "checking", + "linecount": _lc+1, + "total-errors": len(_errs) + }) return _errs, _lc+1 - return { - filepath.name: dict(zip( - ("errors", "linecount"), - reduce(collect_errors, _csvfile, (_errors, 1)))) - } + _errors, _linecount = reduce(collect_errors, _csvfile, (_errors, 1)) + rconn.hset(file_fqkey(fqkey, "metadata", filepath), + mapping={ + "status": "completed", + "linecount": _linecount, + "total-errors": len(_errors) + }) + return {filepath.name: {"errors": _errors, "linecount": _linecount}} def merge_dicts(*dicts): @@ -243,7 +267,7 @@ def integer_error( return InvalidValue(filename, rowtitle, coltitle, cellvalue, message) -def qc_pheno_file(# pylint: disable=[too-many-arguments] +def qc_pheno_file(# pylint: disable=[too-many-locals, too-many-arguments] filepath: Path, redisuri: str, fqkey: str, @@ -255,12 +279,15 @@ def qc_pheno_file(# pylint: disable=[too-many-arguments] error_fn: Callable = decimal_points_error ): """Run QC/QA on a `pheno` file.""" - with redis_logger( + with (redis_logger( redisuri, f"{__MODULE__}.qc_pheno_file", filepath.name, - fqkey) as logger: + f"{fqkey}:logs") as logger, + Redis.from_url(redisuri, decode_responses=True) as rconn): logger.info("Running QC on file: %s", filepath.name) + save_error = partial( + push_error, rconn, file_fqkey(fqkey, "errors", filepath)) _csvfile = rqtl2.read_csv_file(filepath, separator, comment_char) _headings: tuple[str, ...] = tuple( heading.lower() for heading in next(_csvfile)) @@ -268,24 +295,25 @@ def qc_pheno_file(# pylint: disable=[too-many-arguments] _absent = tuple(pheno for pheno in _headings[1:] if pheno not in phenonames) if len(_absent) > 0: - _errors = _errors + (InvalidValue( + _errors = _errors + (save_error(InvalidValue( filepath.name, "header row", "-", ", ".join(_absent), - (f"The phenotype names ({', '.join(samples)}) do not exist in any " - "of the provided phenocovar files.")),) + ("The following phenotype names do not exist in any of the " + f"provided phenocovar files: ({', '.join(_absent)})"))),) def collect_errors(errors_and_linecount, line): _errs, _lc = errors_and_linecount + logger.debug("Checking row %s", line[0]) if line[0] not in samples: - _errs = _errs + (InvalidValue( + _errs = _errs + (save_error(InvalidValue( filepath.name, line[0], _headings[0], line[0], (f"The sample named '{line[0]}' does not exist in the database. " - "You will need to upload that first.")),) + "You will need to upload that first."))),) for field, value in zip(_headings[1:], line[1:]): if value in na_strings: @@ -295,15 +323,24 @@ def qc_pheno_file(# pylint: disable=[too-many-arguments] line[0], field, value) - _errs = _errs + ((_err,) if bool(_err) else tuple()) - + _errs = _errs + ((save_error(_err),) if bool(_err) else tuple()) + + rconn.hset(file_fqkey(fqkey, "metadata", filepath), + mapping={ + "status": "checking", + "linecount": _lc+1, + "total-errors": len(_errs) + }) return _errs, _lc+1 - return { - filepath.name: dict(zip( - ("errors", "linecount"), - reduce(collect_errors, _csvfile, (_errors, 1)))) - } + _errors, _linecount = reduce(collect_errors, _csvfile, (_errors, 1)) + rconn.hset(file_fqkey(fqkey, "metadata", filepath), + mapping={ + "status": "completed", + "linecount": _linecount, + "total-errors": len(_errors) + }) + return {filepath.name: {"errors": _errors, "linecount": _linecount}} def phenotype_names(filepath: Path, @@ -324,7 +361,9 @@ def fullyqualifiedkey( return f"{prefix}:{rest}" def run_qc(# pylint: disable=[too-many-locals] + rconn: Redis, dbconn: mdb.Connection, + fullyqualifiedjobid: str, args: Namespace, logger: Logger ) -> int: @@ -366,15 +405,16 @@ def run_qc(# pylint: disable=[too-many-locals] # - Check that `description` and `units` is present in phenocovar for # all phenotypes + rconn.hset(fullyqualifiedjobid, + "fully-qualified-keys:phenocovar", + json.dumps(tuple(f"{fullyqualifiedjobid}:phenocovar:{_file}" + for _file in cdata.get("phenocovar", [])))) with mproc.Pool(mproc.cpu_count() - 1) as pool: logger.debug("Check for errors in 'phenocovar' file(s).") _phenocovar_qc_res = merge_dicts(*pool.starmap(qc_phenocovar_file, tuple( (extractiondir.joinpath(_file), args.redisuri, - chain( - "phenocovar", - fullyqualifiedkey(args.jobid), - fullyqualifiedkey(args.redisprefix)), + f"{fullyqualifiedjobid}:phenocovar", cdata["sep"], cdata["comment.char"]) for _file in cdata.get("phenocovar", [])))) diff --git a/uploader/jobs.py b/uploader/jobs.py index 4a3fc80..e86ee05 100644 --- a/uploader/jobs.py +++ b/uploader/jobs.py @@ -1,6 +1,8 @@ """Handle jobs""" import os import sys +import uuid +import json import shlex import subprocess from uuid import UUID, uuid4 @@ -10,6 +12,8 @@ from typing import Union, Optional from redis import Redis from flask import current_app as app +from functional_tools import take + JOBS_PREFIX = "jobs" class JobNotFound(Exception): @@ -128,3 +132,33 @@ def update_stdout_stderr(rconn: Redis, contents = thejob.get(stream, '') new_contents = contents + bytes_read.decode("utf-8") rconn.hset(name=job_key(rprefix, jobid), key=stream, value=new_contents) + + +def job_errors( + rconn: Redis, + prefix: str, + job_id: Union[str, uuid.UUID], + count: int = 100 +) -> list: + """Fetch job errors""" + return take( + ( + json.loads(error) + for key in rconn.keys(f"{prefix}:{str(job_id)}:*:errors:*") + for error in rconn.lrange(key, 0, -1)), + count) + + +def job_files_metadata( + rconn: Redis, + prefix: str, + job_id: Union[str, uuid.UUID] +) -> dict: + """Get the metadata for specific job file.""" + return { + key.split(":")[-1]: { + **rconn.hgetall(key), + "filetype": key.split(":")[-3] + } + for key in rconn.keys(f"{prefix}:{str(job_id)}:*:metadata*") + } diff --git a/uploader/phenotypes/views.py b/uploader/phenotypes/views.py index b8c0e93..a664ba9 100644 --- a/uploader/phenotypes/views.py +++ b/uploader/phenotypes/views.py @@ -395,10 +395,14 @@ def add_phenotypes(species: dict, population: dict, dataset: dict, **kwargs):# p species_redirect_uri="species.populations.phenotypes.index", population_redirect_uri="species.populations.phenotypes.select_population", redirect_uri="species.populations.phenotypes.list_datasets") -def job_status(species: dict, population: dict, dataset: dict, job_id: uuid, **kwargs): +def job_status( + species: dict, + population: dict, + dataset: dict, + job_id: uuid.UUID, + **kwargs +):# pylint: disable=[unused-argument] """Retrieve current status of a particular phenotype QC job.""" - from uploader.debug import __pk__ - with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn: try: job = jobs.job(rconn, jobs.jobsnamespace(), str(job_id)) @@ -410,4 +414,8 @@ def job_status(species: dict, population: dict, dataset: dict, job_id: uuid, **k dataset=dataset, job_id=job_id, job=job, + errors=jobs.job_errors( + rconn, jobs.jobsnamespace(), job['jobid']), + metadata=jobs.job_files_metadata( + rconn, jobs.jobsnamespace(), job['jobid']), activelink="add-phenotypes") diff --git a/uploader/templates/cli-output.html b/uploader/templates/cli-output.html index 33fb73b..64b1a9a 100644 --- a/uploader/templates/cli-output.html +++ b/uploader/templates/cli-output.html @@ -1,7 +1,7 @@ {%macro cli_output(job, stream)%} -<h4>{{stream | upper}} Output</h4> -<div class="cli-output"> +<h4 class="subheading">{{stream | upper}} Output</h4> +<div class="cli-output" style="max-height: 10em; overflow: auto;"> <pre>{{job.get(stream, "")}}</pre> </div> diff --git a/uploader/templates/phenotypes/add-phenotypes.html b/uploader/templates/phenotypes/add-phenotypes.html index 196bc69..9e368e1 100644 --- a/uploader/templates/phenotypes/add-phenotypes.html +++ b/uploader/templates/phenotypes/add-phenotypes.html @@ -40,7 +40,7 @@ <p>See the <a href="#section-file-formats">File Formats</a> section below to get an understanding of what is expected of the bundle files you upload.</p> - <p><strong>This will not update any existing phenotypes!</strong></p> + <p><strong class="text-warning">This will not update any existing phenotypes!</strong></p> </div> <div class="form-group"> diff --git a/uploader/templates/phenotypes/job-status.html b/uploader/templates/phenotypes/job-status.html index d531a71..30316b5 100644 --- a/uploader/templates/phenotypes/job-status.html +++ b/uploader/templates/phenotypes/job-status.html @@ -30,8 +30,32 @@ {%block contents%} {%if job%} +<h4 class="subheading">Progress</h4> <div class="row"> - <p><strong>Status:</strong> {{job.status}}</p> + <p><strong>Process Status:</strong> {{job.status}}</p> + {%if metadata%} + <table class="table"> + <thead> + <tr> + <th>File</th> + <th>Status</th> + <th>Lines Processed</th> + <th>Total Errors</th> + </tr> + </thead> + + <tbody> + {%for file,meta in metadata.items()%} + <tr> + <td>{{file}}</td> + <td>{{meta.status}}</td> + <td>{{meta.linecount}}</td> + <td>{{meta["total-errors"]}}</td> + </tr> + {%endfor%} + </tbody> + </table> + {%endif%} {%if job.status in ("completed:success", "success")%} <p><a href="#" class="not-implemented btn btn-primary" @@ -40,6 +64,50 @@ {%endif%} </div> +<h4 class="subheading">Errors</h4> +<div class="row" style="max-height: 20em; overflow: auto;"> + {%if errors | length == 0 %} + <p class="text-info"> + <span class="glyphicon glyphicon-info-sign"></span> + No errors found so far + </p> + {%else%} + <table class="table"> + <thead> + <tr> + <th>File</th> + <th>Row</th> + <th>Column</th> + <th>Value</th> + <th>Message</th> + </thead> + + <tbody style="font-size: 0.9em;"> + {%for error in errors%} + <tr> + <td>{{error.filename}}</td> + <td>{{error.rowtitle}}</td> + <td>{{error.coltitle}}</td> + <td>{%if error.cellvalue | length > 25%} + {{error.cellvalue[0:24]}}… + {%else%} + {{error.cellvalue}} + {%endif%} + </td> + <td> + {%if error.message | length > 250 %} + {{error.message[0:249]}}… + {%else%} + {{error.message}} + {%endif%} + </td> + </tr> + {%endfor%} + </tbody> + </table> + {%endif%} +</div> + <div class="row"> {{cli_output(job, "stdout")}} </div> @@ -47,6 +115,7 @@ <div class="row"> {{cli_output(job, "stderr")}} </div> + {%else%} <div class="row"> <h3 class="text-danger">No Such Job</h3> |