-rw-r--r--  scripts/process_rqtl2_bundle.py                       4
-rw-r--r--  scripts/rqtl2/entry.py                               60
-rw-r--r--  scripts/rqtl2/install_genotypes.py                    5
-rw-r--r--  scripts/rqtl2/install_phenos.py                       5
-rw-r--r--  scripts/rqtl2/phenotypes_qc.py                      112
-rw-r--r--  uploader/jobs.py                                     34
-rw-r--r--  uploader/phenotypes/views.py                         14
-rw-r--r--  uploader/templates/cli-output.html                    4
-rw-r--r--  uploader/templates/phenotypes/add-phenotypes.html     2
-rw-r--r--  uploader/templates/phenotypes/job-status.html        71
10 files changed, 238 insertions, 73 deletions
diff --git a/scripts/process_rqtl2_bundle.py b/scripts/process_rqtl2_bundle.py
index 4efc3e0..8b7a0fb 100644
--- a/scripts/process_rqtl2_bundle.py
+++ b/scripts/process_rqtl2_bundle.py
@@ -94,7 +94,9 @@ def process_bundle(dbconn: mdb.Connection,
if has_geno_file(thejob):
logger.info("Processing geno files.")
genoexit = install_genotypes(
+ rconn,
dbconn,
+ f"{rprefix}:{jobid}",
argparse.Namespace(
speciesid=meta["speciesid"],
populationid=meta["populationid"],
@@ -110,7 +112,9 @@ def process_bundle(dbconn: mdb.Connection,
if has_pheno_file(thejob):
phenoexit = install_pheno_files(
+ rconn,
dbconn,
+ f"{rprefix}:{jobid}",
argparse.Namespace(
speciesid=meta["speciesid"],
platformid=meta["platformid"],
diff --git a/scripts/rqtl2/entry.py b/scripts/rqtl2/entry.py
index 2a18aa3..327ed2c 100644
--- a/scripts/rqtl2/entry.py
+++ b/scripts/rqtl2/entry.py
@@ -16,43 +16,47 @@ from scripts.redis_logger import setup_redis_logger
def build_main(
args: Namespace,
- run_fn: Callable[[Connection, Namespace, logging.Logger], int],
+ run_fn: Callable[
+ [Redis, Connection, str, Namespace, logging.Logger],
+ int
+ ],
loggername: str
) -> Callable[[],int]:
"""Build a function to be used as an entry-point for scripts."""
def main():
- logging.basicConfig(
- format=(
- "%(asctime)s - %(levelname)s %(name)s: "
- "(%(pathname)s: %(lineno)d) %(message)s"),
- level=args.loglevel)
- logger = logging.getLogger(loggername)
- check_db(args.databaseuri)
- check_redis(args.redisuri)
- if not args.rqtl2bundle.exists():
- logger.error("File not found: '%s'.", args.rqtl2bundle)
- return 2
+ try:
+ logging.basicConfig(
+ format=(
+ "%(asctime)s - %(levelname)s %(name)s: "
+ "(%(pathname)s: %(lineno)d) %(message)s"),
+ level=args.loglevel)
+ logger = logging.getLogger(loggername)
+ with (Redis.from_url(args.redisuri, decode_responses=True) as rconn,
+ database_connection(args.databaseuri) as dbconn):
+ fqjobid = jobs.job_key(args.redisprefix, args.jobid)
+ rconn.hset(fqjobid, "status", "started")
+ logger.addHandler(setup_redis_logger(
+ rconn,
+ fqjobid,
+ f"{fqjobid}:log-messages",
+ args.redisexpiry))
+ logger.addHandler(StreamHandler(stream=sys.stdout))
- with (Redis.from_url(args.redisuri, decode_responses=True) as rconn,
- database_connection(args.databaseuri) as dbconn):
- fqjobid = jobs.job_key(args.redisprefix, args.jobid)
- rconn.hset(fqjobid, "status", "started")
- logger.addHandler(setup_redis_logger(
- rconn,
- fqjobid,
- f"{fqjobid}:log-messages",
- args.redisexpiry))
- logger.addHandler(StreamHandler(stream=sys.stdout))
- try:
- returncode = run_fn(dbconn, args, logger)
+ check_db(args.databaseuri)
+ check_redis(args.redisuri)
+ if not args.rqtl2bundle.exists():
+ logger.error("File not found: '%s'.", args.rqtl2bundle)
+ return 2
+
+ returncode = run_fn(rconn, dbconn, fqjobid, args, logger)
if returncode == 0:
rconn.hset(fqjobid, "status", "completed:success")
return returncode
rconn.hset(fqjobid, "status", "completed:error")
return returncode
- except Exception as _exc:
- logger.error("The process failed!", exc_info=True)
- rconn.hset(fqjobid, "status", "completed:error")
- return 4
+ except Exception as _exc:# pylint: disable=[broad-except]
+ logger.error("The process failed!", exc_info=True)
+ rconn.hset(fqjobid, "status", "completed:error")
+ return 4
return main
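
(For orientation, a minimal sketch of a script entry-point written against the new five-argument run_fn signature; parse_cli_args is an assumed helper, not part of this commit.)

import sys
import logging
from argparse import Namespace

from redis import Redis
from MySQLdb import Connection

from scripts.rqtl2.entry import build_main


def run(rconn: Redis,
        dbconn: Connection,
        fqjobid: str,
        args: Namespace,
        logger: logging.Logger) -> int:
    """Do the work; return 0 on success, non-zero on failure."""
    logger.info("Processing %s for job %s", args.rqtl2bundle, fqjobid)
    return 0


if __name__ == "__main__":
    cli_args = parse_cli_args()  # assumed to build the argparse Namespace
    sys.exit(build_main(cli_args, run, __name__)())
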
diff --git a/scripts/rqtl2/install_genotypes.py b/scripts/rqtl2/install_genotypes.py
index 20a19da..8762655 100644
--- a/scripts/rqtl2/install_genotypes.py
+++ b/scripts/rqtl2/install_genotypes.py
@@ -7,6 +7,7 @@ from functools import reduce
from typing import Iterator, Optional
from logging import Logger, getLogger
+from redis import Redis
import MySQLdb as mdb
from MySQLdb.cursors import DictCursor
@@ -185,8 +186,10 @@ def cross_reference_genotypes(
cursor.executemany(insertquery, insertparams)
return cursor.rowcount
-def install_genotypes(#pylint: disable=[too-many-arguments, too-many-locals]
+def install_genotypes(#pylint: disable=[too-many-locals]
+ rconn: Redis,#pylint: disable=[unused-argument]
dbconn: mdb.Connection,
+ fullyqualifiedjobid: str,#pylint: disable=[unused-argument]
args: argparse.Namespace,
logger: Logger = getLogger(__name__)
) -> int:
diff --git a/scripts/rqtl2/install_phenos.py b/scripts/rqtl2/install_phenos.py
index a6e9fb2..9059cd6 100644
--- a/scripts/rqtl2/install_phenos.py
+++ b/scripts/rqtl2/install_phenos.py
@@ -6,6 +6,7 @@ from zipfile import ZipFile
from functools import reduce
from logging import Logger, getLogger
+from redis import Redis
import MySQLdb as mdb
from MySQLdb.cursors import DictCursor
@@ -95,8 +96,10 @@ def cross_reference_probeset_data(dbconn: mdb.Connection,
} for row in dataids))
return cursor.rowcount
-def install_pheno_files(#pylint: disable=[too-many-arguments, too-many-locals]
+def install_pheno_files(#pylint: disable=[too-many-locals]
+ rconn: Redis,#pylint: disable=[unused-argument]
dbconn: mdb.Connection,
+ fullyqualifiedjobid: str,#pylint: disable=[unused-argument]
args: argparse.Namespace,
logger: Logger = getLogger()) -> int:
"""Load data in `pheno` files and other related files into the database."""
diff --git a/scripts/rqtl2/phenotypes_qc.py b/scripts/rqtl2/phenotypes_qc.py
index 4f55e40..ba28ed0 100644
--- a/scripts/rqtl2/phenotypes_qc.py
+++ b/scripts/rqtl2/phenotypes_qc.py
@@ -1,6 +1,7 @@
"""Run quality control on phenotypes-specific files in the bundle."""
import sys
import uuid
+import json
import shutil
import logging
import tempfile
@@ -152,6 +153,17 @@ def redis_logger(
rconn.close()
+def push_error(rconn: Redis, fqkey: str, error: InvalidValue) -> InvalidValue:
+ """Persist the error in redis."""
+ rconn.rpush(fqkey, json.dumps(error._asdict()))
+ return error
+
+
+def file_fqkey(prefix: str, section: str, filepath: Path) -> str:
+ """Build a files fully-qualified key in a consistent manner"""
+ return f"{prefix}:{section}:{filepath.name}"
+
+
def qc_phenocovar_file(
filepath: Path,
redisuri,
@@ -159,52 +171,64 @@ def qc_phenocovar_file(
separator: str,
comment_char: str):
"""Check that `phenocovar` files are structured correctly."""
- with redis_logger(
+ with (redis_logger(
redisuri,
f"{__MODULE__}.qc_phenocovar_file",
filepath.name,
- fqkey) as logger:
+ f"{fqkey}:logs") as logger,
+ Redis.from_url(redisuri, decode_responses=True) as rconn):
logger.info("Running QC on file: %s", filepath.name)
_csvfile = rqtl2.read_csv_file(filepath, separator, comment_char)
_headings = tuple(heading.lower() for heading in next(_csvfile))
_errors: tuple[InvalidValue, ...] = tuple()
+ save_error = partial(
+ push_error, rconn, file_fqkey(fqkey, "errors", filepath))
for heading in ("description", "units"):
if heading not in _headings:
- _errors = (InvalidValue(
+ _errors = (save_error(InvalidValue(
filepath.name,
"header row",
"-",
"-",
(f"File {filepath.name} is missing the {heading} heading "
- "in the header line.")),)
+ "in the header line."))),)
def collect_errors(errors_and_linecount, line):
_errs, _lc = errors_and_linecount
logger.info("Testing record '%s'", line[0])
if len(line) != len(_headings):
- _errs = _errs + (InvalidValue(
+ _errs = _errs + (save_error(InvalidValue(
filepath.name,
line[0],
"-",
"-",
(f"Record {_lc} in file {filepath.name} has a different "
- "number of columns than the number of headings")),)
+ "number of columns than the number of headings"))),)
_line = dict(zip(_headings, line))
if not bool(_line["description"]):
_errs = _errs + (
- InvalidValue(filepath.name,
- _line[_headings[0]],
- "description",
- _line["description"],
- "The description is not provided!"),)
-
+ save_error(InvalidValue(filepath.name,
+ _line[_headings[0]],
+ "description",
+ _line["description"],
+ "The description is not provided!")),)
+
+ rconn.hset(file_fqkey(fqkey, "metadata", filepath),
+ mapping={
+ "status": "checking",
+ "linecount": _lc+1,
+ "total-errors": len(_errs)
+ })
return _errs, _lc+1
- return {
- filepath.name: dict(zip(
- ("errors", "linecount"),
- reduce(collect_errors, _csvfile, (_errors, 1))))
- }
+ _errors, _linecount = reduce(collect_errors, _csvfile, (_errors, 1))
+ rconn.hset(file_fqkey(fqkey, "metadata", filepath),
+ mapping={
+ "status": "completed",
+ "linecount": _linecount,
+ "total-errors": len(_errors)
+ })
+ return {filepath.name: {"errors": _errors, "linecount": _linecount}}
def merge_dicts(*dicts):
@@ -243,7 +267,7 @@ def integer_error(
return InvalidValue(filename, rowtitle, coltitle, cellvalue, message)
-def qc_pheno_file(# pylint: disable=[too-many-arguments]
+def qc_pheno_file(# pylint: disable=[too-many-locals, too-many-arguments]
filepath: Path,
redisuri: str,
fqkey: str,
@@ -255,12 +279,15 @@ def qc_pheno_file(# pylint: disable=[too-many-arguments]
error_fn: Callable = decimal_points_error
):
"""Run QC/QA on a `pheno` file."""
- with redis_logger(
+ with (redis_logger(
redisuri,
f"{__MODULE__}.qc_pheno_file",
filepath.name,
- fqkey) as logger:
+ f"{fqkey}:logs") as logger,
+ Redis.from_url(redisuri, decode_responses=True) as rconn):
logger.info("Running QC on file: %s", filepath.name)
+ save_error = partial(
+ push_error, rconn, file_fqkey(fqkey, "errors", filepath))
_csvfile = rqtl2.read_csv_file(filepath, separator, comment_char)
_headings: tuple[str, ...] = tuple(
heading.lower() for heading in next(_csvfile))
@@ -268,24 +295,25 @@ def qc_pheno_file(# pylint: disable=[too-many-arguments]
_absent = tuple(pheno for pheno in _headings[1:] if pheno not in phenonames)
if len(_absent) > 0:
- _errors = _errors + (InvalidValue(
+ _errors = _errors + (save_error(InvalidValue(
filepath.name,
"header row",
"-",
", ".join(_absent),
- (f"The phenotype names ({', '.join(samples)}) do not exist in any "
- "of the provided phenocovar files.")),)
+ ("The following phenotype names do not exist in any of the "
+ f"provided phenocovar files: ({', '.join(_absent)})"))),)
def collect_errors(errors_and_linecount, line):
_errs, _lc = errors_and_linecount
+ logger.debug("Checking row %s", line[0])
if line[0] not in samples:
- _errs = _errs + (InvalidValue(
+ _errs = _errs + (save_error(InvalidValue(
filepath.name,
line[0],
_headings[0],
line[0],
(f"The sample named '{line[0]}' does not exist in the database. "
- "You will need to upload that first.")),)
+ "You will need to upload that first."))),)
for field, value in zip(_headings[1:], line[1:]):
if value in na_strings:
@@ -295,15 +323,24 @@ def qc_pheno_file(# pylint: disable=[too-many-arguments]
line[0],
field,
value)
- _errs = _errs + ((_err,) if bool(_err) else tuple())
-
+ _errs = _errs + ((save_error(_err),) if bool(_err) else tuple())
+
+ rconn.hset(file_fqkey(fqkey, "metadata", filepath),
+ mapping={
+ "status": "checking",
+ "linecount": _lc+1,
+ "total-errors": len(_errs)
+ })
return _errs, _lc+1
- return {
- filepath.name: dict(zip(
- ("errors", "linecount"),
- reduce(collect_errors, _csvfile, (_errors, 1))))
- }
+ _errors, _linecount = reduce(collect_errors, _csvfile, (_errors, 1))
+ rconn.hset(file_fqkey(fqkey, "metadata", filepath),
+ mapping={
+ "status": "completed",
+ "linecount": _linecount,
+ "total-errors": len(_errors)
+ })
+ return {filepath.name: {"errors": _errors, "linecount": _linecount}}
def phenotype_names(filepath: Path,
@@ -324,7 +361,9 @@ def fullyqualifiedkey(
return f"{prefix}:{rest}"
def run_qc(# pylint: disable=[too-many-locals]
+ rconn: Redis,
dbconn: mdb.Connection,
+ fullyqualifiedjobid: str,
args: Namespace,
logger: Logger
) -> int:
@@ -366,15 +405,16 @@ def run_qc(# pylint: disable=[too-many-locals]
# - Check that `description` and `units` is present in phenocovar for
# all phenotypes
+ rconn.hset(fullyqualifiedjobid,
+ "fully-qualified-keys:phenocovar",
+ json.dumps(tuple(f"{fullyqualifiedjobid}:phenocovar:{_file}"
+ for _file in cdata.get("phenocovar", []))))
with mproc.Pool(mproc.cpu_count() - 1) as pool:
logger.debug("Check for errors in 'phenocovar' file(s).")
_phenocovar_qc_res = merge_dicts(*pool.starmap(qc_phenocovar_file, tuple(
(extractiondir.joinpath(_file),
args.redisuri,
- chain(
- "phenocovar",
- fullyqualifiedkey(args.jobid),
- fullyqualifiedkey(args.redisprefix)),
+ f"{fullyqualifiedjobid}:phenocovar",
cdata["sep"],
cdata["comment.char"])
for _file in cdata.get("phenocovar", []))))
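
(For orientation, a hedged sketch of the per-file Redis layout that file_fqkey and push_error produce; the job id and file name below are placeholders, not values from this commit.)

import json
from redis import Redis

with Redis.from_url("redis://localhost:6379", decode_responses=True) as rconn:
    prefix = "<fqjobid>:phenocovar"  # the fqkey passed to qc_phenocovar_file
    # Errors: one list per file, holding JSON-encoded InvalidValue dicts.
    errors = [json.loads(err)
              for err in rconn.lrange(f"{prefix}:errors:pheno_covar.csv", 0, -1)]
    # Progress: one hash per file with status/linecount/total-errors fields.
    meta = rconn.hgetall(f"{prefix}:metadata:pheno_covar.csv")
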
diff --git a/uploader/jobs.py b/uploader/jobs.py
index 4a3fc80..e86ee05 100644
--- a/uploader/jobs.py
+++ b/uploader/jobs.py
@@ -1,6 +1,8 @@
"""Handle jobs"""
import os
import sys
+import uuid
+import json
import shlex
import subprocess
from uuid import UUID, uuid4
@@ -10,6 +12,8 @@ from typing import Union, Optional
from redis import Redis
from flask import current_app as app
+from functional_tools import take
+
JOBS_PREFIX = "jobs"
class JobNotFound(Exception):
@@ -128,3 +132,33 @@ def update_stdout_stderr(rconn: Redis,
contents = thejob.get(stream, '')
new_contents = contents + bytes_read.decode("utf-8")
rconn.hset(name=job_key(rprefix, jobid), key=stream, value=new_contents)
+
+
+def job_errors(
+ rconn: Redis,
+ prefix: str,
+ job_id: Union[str, uuid.UUID],
+ count: int = 100
+) -> list:
+ """Fetch job errors"""
+ return take(
+ (
+ json.loads(error)
+ for key in rconn.keys(f"{prefix}:{str(job_id)}:*:errors:*")
+ for error in rconn.lrange(key, 0, -1)),
+ count)
+
+
+def job_files_metadata(
+ rconn: Redis,
+ prefix: str,
+ job_id: Union[str, uuid.UUID]
+) -> dict:
+ """Get the metadata for specific job file."""
+ return {
+ key.split(":")[-1]: {
+ **rconn.hgetall(key),
+ "filetype": key.split(":")[-3]
+ }
+ for key in rconn.keys(f"{prefix}:{str(job_id)}:*:metadata*")
+ }
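
(A hedged usage sketch for the two new helpers, assuming the key layout sketched above; the job id is a placeholder.)

from redis import Redis
from uploader import jobs

with Redis.from_url("redis://localhost:6379", decode_responses=True) as rconn:
    # job_errors scans f"{prefix}:{job_id}:*:errors:*" and JSON-decodes at
    # most `count` entries across all matching lists.
    errors = jobs.job_errors(rconn, jobs.jobsnamespace(), "some-job-id", count=10)
    # job_files_metadata keys each metadata hash by file name and recovers
    # the file type from the key's third-from-last segment.
    metadata = jobs.job_files_metadata(rconn, jobs.jobsnamespace(), "some-job-id")
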
diff --git a/uploader/phenotypes/views.py b/uploader/phenotypes/views.py
index b8c0e93..a664ba9 100644
--- a/uploader/phenotypes/views.py
+++ b/uploader/phenotypes/views.py
@@ -395,10 +395,14 @@ def add_phenotypes(species: dict, population: dict, dataset: dict, **kwargs):# p
species_redirect_uri="species.populations.phenotypes.index",
population_redirect_uri="species.populations.phenotypes.select_population",
redirect_uri="species.populations.phenotypes.list_datasets")
-def job_status(species: dict, population: dict, dataset: dict, job_id: uuid, **kwargs):
+def job_status(
+ species: dict,
+ population: dict,
+ dataset: dict,
+ job_id: uuid.UUID,
+ **kwargs
+):# pylint: disable=[unused-argument]
"""Retrieve current status of a particular phenotype QC job."""
- from uploader.debug import __pk__
-
with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn:
try:
job = jobs.job(rconn, jobs.jobsnamespace(), str(job_id))
@@ -410,4 +414,8 @@ def job_status(species: dict, population: dict, dataset: dict, job_id: uuid, **k
dataset=dataset,
job_id=job_id,
job=job,
+ errors=jobs.job_errors(
+ rconn, jobs.jobsnamespace(), job['jobid']),
+ metadata=jobs.job_files_metadata(
+ rconn, jobs.jobsnamespace(), job['jobid']),
activelink="add-phenotypes")
diff --git a/uploader/templates/cli-output.html b/uploader/templates/cli-output.html
index 33fb73b..64b1a9a 100644
--- a/uploader/templates/cli-output.html
+++ b/uploader/templates/cli-output.html
@@ -1,7 +1,7 @@
{%macro cli_output(job, stream)%}
-<h4>{{stream | upper}} Output</h4>
-<div class="cli-output">
+<h4 class="subheading">{{stream | upper}} Output</h4>
+<div class="cli-output" style="max-height: 10em; overflow: auto;">
<pre>{{job.get(stream, "")}}</pre>
</div>
diff --git a/uploader/templates/phenotypes/add-phenotypes.html b/uploader/templates/phenotypes/add-phenotypes.html
index 196bc69..9e368e1 100644
--- a/uploader/templates/phenotypes/add-phenotypes.html
+++ b/uploader/templates/phenotypes/add-phenotypes.html
@@ -40,7 +40,7 @@
<p>See the <a href="#section-file-formats">File Formats</a> section below
to get an understanding of what is expected of the bundle files you
upload.</p>
- <p><strong>This will not update any existing phenotypes!</strong></p>
+ <p><strong class="text-warning">This will not update any existing phenotypes!</strong></p>
</div>
<div class="form-group">
diff --git a/uploader/templates/phenotypes/job-status.html b/uploader/templates/phenotypes/job-status.html
index d531a71..30316b5 100644
--- a/uploader/templates/phenotypes/job-status.html
+++ b/uploader/templates/phenotypes/job-status.html
@@ -30,8 +30,32 @@
{%block contents%}
{%if job%}
+<h4 class="subheading">Progress</h4>
<div class="row">
- <p><strong>Status:</strong> {{job.status}}</p>
+ <p><strong>Process Status:</strong> {{job.status}}</p>
+ {%if metadata%}
+ <table class="table">
+ <thead>
+ <tr>
+ <th>File</th>
+ <th>Status</th>
+ <th>Lines Processed</th>
+ <th>Total Errors</th>
+ </tr>
+ </thead>
+
+ <tbody>
+ {%for file,meta in metadata.items()%}
+ <tr>
+ <td>{{file}}</td>
+ <td>{{meta.status}}</td>
+ <td>{{meta.linecount}}</td>
+ <td>{{meta["total-errors"]}}</td>
+ </tr>
+ {%endfor%}
+ </tbody>
+ </table>
+ {%endif%}
{%if job.status in ("completed:success", "success")%}
<p><a href="#"
class="not-implemented btn btn-primary"
@@ -40,6 +64,51 @@
{%endif%}
</div>
+<h4 class="subheading">Errors</h4>
+<div class="row" style="max-height: 20em; overflow: auto;">
+ {%if errors | length == 0 %}
+ <p class="text-info">
+ <span class="glyphicon glyphicon-info-sign"></span>
+ No errors found so far
+ </p>
+ {%else%}
+ <table class="table">
+ <thead>
+ <tr>
+ <th>File</th>
+ <th>Row</th>
+ <th>Column</th>
+ <th>Value</th>
+ <th>Message</th>
+      </tr>
+    </thead>
+
+ <tbody style="font-size: 0.9em;">
+ {%for error in errors%}
+ <tr>
+ <td>{{error.filename}}</td>
+ <td>{{error.rowtitle}}</td>
+ <td>{{error.coltitle}}</td>
+ <td>{%if error.cellvalue | length > 25%}
+ {{error.cellvalue[0:24]}}&hellip;
+ {%else%}
+ {{error.cellvalue}}
+ {%endif%}
+ </td>
+ <td>
+ {%if error.message | length > 250 %}
+ {{error.message[0:249]}}&hellip;
+ {%else%}
+ {{error.message}}
+ {%endif%}
+ </td>
+ </tr>
+ {%endfor%}
+ </tbody>
+ </table>
+ {%endif%}
+</div>
+
<div class="row">
{{cli_output(job, "stdout")}}
</div>
@@ -47,6 +116,7 @@
<div class="row">
{{cli_output(job, "stderr")}}
</div>
+
{%else%}
<div class="row">
<h3 class="text-danger">No Such Job</h3>