aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qc_app/dbinsert.py2
-rw-r--r--qc_app/jobs.py19
-rw-r--r--qc_app/parse.py70
-rw-r--r--qc_app/samples.py2
-rw-r--r--scripts/worker.py18
-rw-r--r--tests/conftest.py52
-rw-r--r--tests/qc_app/test_entry.py3
-rw-r--r--tests/qc_app/test_parse.py28
-rw-r--r--tests/qc_app/test_uploads_with_zip_files.py3
9 files changed, 118 insertions, 79 deletions
diff --git a/qc_app/dbinsert.py b/qc_app/dbinsert.py
index 31e4cea..f36651f 100644
--- a/qc_app/dbinsert.py
+++ b/qc_app/dbinsert.py
@@ -374,7 +374,7 @@ def insert_data():
app.config["JOBS_TTL_SECONDS"]),
redisurl, f"{app.config['UPLOAD_FOLDER']}/job_errors")
- return redirect(url_for("dbinsert.insert_status", job_id=job["job_id"]))
+ return redirect(url_for("dbinsert.insert_status", job_id=job["jobid"]))
return render_error(f"File '{filename}' no longer exists.")
except AssertionError as aserr:
return render_error(f"Missing data: {aserr.args[0]}")
diff --git a/qc_app/jobs.py b/qc_app/jobs.py
index 1491015..21889da 100644
--- a/qc_app/jobs.py
+++ b/qc_app/jobs.py
@@ -29,20 +29,20 @@ def job_key(namespaceprefix: str, jobid: Union[str, UUID]) -> str:
"""Build the key by appending it to the namespace prefix."""
return f"{namespaceprefix}:{jobid}"
-def raise_jobnotfound(jobid: Union[str,UUID]):
+def raise_jobnotfound(rprefix:str, jobid: Union[str,UUID]):
"""Utility to raise a `NoSuchJobError`"""
- raise JobNotFound(f"Could not retrieve job '{jobid}'.")
+ raise JobNotFound(f"Could not retrieve job '{jobid}' from '{rprefix}.")
-def error_filename(job_id, error_dir):
+def error_filename(jobid, error_dir):
"Compute the path of the file where errors will be dumped."
- return f"{error_dir}/job_{job_id}.error"
+ return f"{error_dir}/job_{jobid}.error"
def initialise_job(# pylint: disable=[too-many-arguments]
rconn: Redis, rprefix: str, jobid: str, command: list, job_type: str,
ttl_seconds: int = 86400, extra_meta: Optional[dict] = None) -> dict:
"Initialise a job 'object' and put in on redis"
the_job = {
- "job_id": jobid, "command": shlex.join(command), "status": "pending",
+ "jobid": jobid, "command": shlex.join(command), "status": "pending",
"percent": 0, "job-type": job_type, **(extra_meta or {})
}
rconn.hset(job_key(rprefix, jobid), mapping=the_job)
@@ -95,13 +95,13 @@ def launch_job(the_job: dict, redisurl: str, error_dir):
if not os.path.exists(error_dir):
os.mkdir(error_dir)
- job_id = the_job["job_id"]
- with open(error_filename(job_id, error_dir),
+ jobid = the_job["jobid"]
+ with open(error_filename(jobid, error_dir),
"w",
encoding="utf-8") as errorfile:
subprocess.Popen( # pylint: disable=[consider-using-with]
[sys.executable, "-m", "scripts.worker", redisurl, jobsnamespace(),
- job_id],
+ jobid],
stderr=errorfile,
env={"PYTHONPATH": ":".join(sys.path)})
@@ -109,7 +109,8 @@ def launch_job(the_job: dict, redisurl: str, error_dir):
def job(rconn: Redis, rprefix: str, jobid: Union[str,UUID]):
"Retrieve the job"
- thejob = rconn.hgetall(job_key(rprefix, jobid)) or raise_jobnotfound(jobid)
+ thejob = (rconn.hgetall(job_key(rprefix, jobid)) or
+ raise_jobnotfound(rprefix, jobid))
return thejob
def update_status(
diff --git a/qc_app/parse.py b/qc_app/parse.py
index d9be993..f0c53d1 100644
--- a/qc_app/parse.py
+++ b/qc_app/parse.py
@@ -76,47 +76,47 @@ def parse():
redisurl,
f"{app.config['UPLOAD_FOLDER']}/job_errors")
- return redirect(url_for("parse.parse_status", job_id=job["job_id"]))
+ return redirect(url_for("parse.parse_status", job_id=job["jobid"]))
@parsebp.route("/status/<job_id>", methods=["GET"])
def parse_status(job_id: str):
"Retrieve the status of the job"
with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn:
- job = jobs.job(rconn, jobs.jobsnamespace(), job_id)
-
- if job:
- error_filename = jobs.error_filename(
- job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors")
- if os.path.exists(error_filename):
- stat = os.stat(error_filename)
- if stat.st_size > 0:
- return redirect(url_for("parse.fail", job_id=job_id))
-
- job_id = job["job_id"]
- progress = float(job["percent"])
- status = job["status"]
- filename = job.get("filename", "uploaded file")
- errors = jsonpickle.decode(
- job.get("errors", jsonpickle.encode(tuple())))
- if status in ("success", "aborted"):
- return redirect(url_for("parse.results", job_id=job_id))
-
- if status == "parse-error":
+ try:
+ job = jobs.job(rconn, jobs.jobsnamespace(), job_id)
+ except jobs.JobNotFound as _exc:
+ return render_template("no_such_job.html", job_id=job_id), 400
+
+ error_filename = jobs.error_filename(
+ job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors")
+ if os.path.exists(error_filename):
+ stat = os.stat(error_filename)
+ if stat.st_size > 0:
return redirect(url_for("parse.fail", job_id=job_id))
- app.jinja_env.globals.update(
- isinvalidvalue=isinvalidvalue,
- isduplicateheading=isduplicateheading)
- return render_template(
- "job_progress.html",
- job_id = job_id,
- job_status = status,
- progress = progress,
- message = job.get("message", ""),
- job_name = f"Parsing '{filename}'",
- errors=errors)
-
- return render_template("no_such_job.html", job_id=job_id), 400
+ job_id = job["jobid"]
+ progress = float(job["percent"])
+ status = job["status"]
+ filename = job.get("filename", "uploaded file")
+ errors = jsonpickle.decode(
+ job.get("errors", jsonpickle.encode(tuple())))
+ if status in ("success", "aborted"):
+ return redirect(url_for("parse.results", job_id=job_id))
+
+ if status == "parse-error":
+ return redirect(url_for("parse.fail", job_id=job_id))
+
+ app.jinja_env.globals.update(
+ isinvalidvalue=isinvalidvalue,
+ isduplicateheading=isduplicateheading)
+ return render_template(
+ "job_progress.html",
+ job_id = job_id,
+ job_status = status,
+ progress = progress,
+ message = job.get("message", ""),
+ job_name = f"Parsing '{filename}'",
+ errors=errors)
@parsebp.route("/results/<job_id>", methods=["GET"])
def results(job_id: str):
@@ -135,7 +135,7 @@ def results(job_id: str):
errors=errors,
job_name = f"Parsing '{filename}'",
user_aborted = job.get("user_aborted"),
- job_id=job["job_id"])
+ job_id=job["jobid"])
return render_template("no_such_job.html", job_id=job_id)
diff --git a/qc_app/samples.py b/qc_app/samples.py
index cf97b8d..55e94ab 100644
--- a/qc_app/samples.py
+++ b/qc_app/samples.py
@@ -255,7 +255,7 @@ def upload_samples():
redisuri,
f"{app.config['UPLOAD_FOLDER']}/job_errors")
return redirect(url_for(
- "samples.upload_status", job_id=the_job["job_id"]))
+ "samples.upload_status", job_id=the_job["jobid"]))
@samples.route("/upload/status/<uuid:job_id>", methods=["GET"])
def upload_status(job_id: uuid.UUID):
diff --git a/scripts/worker.py b/scripts/worker.py
index 17d6e06..0eb9ea5 100644
--- a/scripts/worker.py
+++ b/scripts/worker.py
@@ -6,6 +6,7 @@ import argparse
import traceback
import subprocess
from time import sleep
+from datetime import timedelta
from tempfile import TemporaryDirectory
from redis import Redis
@@ -31,8 +32,8 @@ def parse_args():
def run_job(rconn: Redis, job: dict, redisprefix: str):
"Run the actual job."
- jobid = job["job_id"]
try:
+ jobid = job["jobid"]
with TemporaryDirectory() as tmpdir:
stderrpath = f"{tmpdir}/{jobid}.stderr"
with open(stderrpath, "w+b") as tmpfl:
@@ -41,8 +42,11 @@ def run_job(rconn: Redis, job: dict, redisprefix: str):
stderr=tmpfl) as process:
while process.poll() is None:
jobs.update_status(rconn, redisprefix, jobid, "running")
- jobs.update_stdout_stderr(
- rconn, redisprefix, jobid, process.stdout.read1(), "stdout")
+ jobs.update_stdout_stderr(rconn,
+ redisprefix,
+ jobid,
+ process.stdout.read1(),#type: ignore[union-attr]
+ "stdout")
sleep(1)
jobs.update_status(
@@ -71,11 +75,11 @@ def main():
job = jobs.job(rconn, args.redisprefix, args.job_id)
if job:
return run_job(rconn, job, args.redisprefix)
- jobs.update_status(rconn, args.job_id, "error")
- fqjobid = jobs.job_key(redisprefix, args.jobid)
+ jobs.update_status(rconn, args.redisprefix, args.job_id, "error")
+ fqjobid = jobs.job_key(args.redisprefix, args.jobid)
rconn.hset(fqjobid, "stderr", f"No such job. '{args.job_id}'.")
- redis_conn.expire(name=job_key(args.redisprefix, args.job_id),
- time=timedelta(seconds=(2 * 60 * 60)))
+ rconn.expire(name=jobs.job_key(args.redisprefix, args.job_id),
+ time=timedelta(seconds=(2 * 60 * 60)))
print(f"No such job. '{args.job_id}'.", file=sys.stderr)
return 2
return 3
diff --git a/tests/conftest.py b/tests/conftest.py
index b7d3f8a..013c30d 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -10,7 +10,10 @@ import jsonpickle
from redis import Redis
from functional_tools import take
+
from qc_app import jobs, create_app
+from qc_app.jobs import JOBS_PREFIX
+
from quality_control.errors import InvalidValue, DuplicateHeading
@pytest.fixture(scope="session")
@@ -56,11 +59,26 @@ def client():
cleanup_redis(app.config["REDIS_URL"], test_prefix)
@pytest.fixture(scope="module")
+def db_url(client):#pylint: disable=[redefined-outer-name]
+ """Return the database URI"""
+ return client.application.config["SQL_URI"]
+
+@pytest.fixture(scope="module")
def redis_url(client):#pylint: disable=[redefined-outer-name]
"""Return the redis URI"""
return client.application.config["REDIS_URL"]
@pytest.fixture(scope="module")
+def redis_prefix(client):#pylint: disable=[redefined-outer-name]
+ """Return the redis prefix"""
+ return client.application.config["GNQC_REDIS_PREFIX"]
+
+@pytest.fixture(scope="module")
+def jobs_prefix(redis_prefix):#pylint: disable=[redefined-outer-name]
+ """Return the redis prefix for jobs."""
+ return f"{redis_prefix}:{JOBS_PREFIX}"
+
+@pytest.fixture(scope="module")
def redis_ttl(client):#pylint: disable=[redefined-outer-name]
"""Return the redis URI"""
return client.application.config["JOBS_TTL_SECONDS"]
@@ -81,11 +99,11 @@ def cleanup_job(rconn, jobid, thejob):
rconn.delete(jobs.job_key(jobs.jobsnamespace(), jobid))
@pytest.fixture(scope="function")
-def redis_conn_with_fresh_job(redis_url, redis_ttl, job_id):#pylint: disable=[redefined-outer-name]
+def redis_conn_with_fresh_job(redis_url, redis_ttl, jobs_prefix, job_id):#pylint: disable=[redefined-outer-name]
"redis connection with fresh, unprocessed job"
thejob = {
- "job_id": job_id, "command": "some_test_command", "job_type": "testjob",
- "ttl_seconds": redis_ttl, "extra_meta": {
+ "jobid": job_id, "command": "some_test_command", "job_type": "testjob",
+ "ttl_seconds": redis_ttl, "rprefix": jobs_prefix, "extra_meta": {
"filename": "/path/to/some/file.tsv", "percent": 0,
"status": "pending"}}
with redis.Redis.from_url(redis_url, decode_responses=True) as rconn:
@@ -94,11 +112,12 @@ def redis_conn_with_fresh_job(redis_url, redis_ttl, job_id):#pylint: disable=[re
cleanup_job(rconn, job_id, thejob)
@pytest.fixture(scope="function")
-def redis_conn_with_in_progress_job_no_errors(redis_url, redis_ttl, job_id):#pylint: disable=[redefined-outer-name]
+def redis_conn_with_in_progress_job_no_errors(redis_url, redis_ttl, jobs_prefix, job_id):#pylint: disable=[redefined-outer-name]
"redis connection with partially processed job, with no errors"
thejob = {
- "job_id": job_id, "command": "some_test_command",
- "ttl_seconds": redis_ttl, "job_type": "testjob", "extra_meta": {
+ "jobid": job_id, "command": "some_test_command",
+ "ttl_seconds": redis_ttl, "rprefix": jobs_prefix, "job_type": "testjob",
+ "extra_meta": {
"status": "Processing", "filename": "/path/to/some/file.tsv",
"percent": 32.242342}}
with redis.Redis.from_url(redis_url, decode_responses=True) as rconn:
@@ -107,11 +126,12 @@ def redis_conn_with_in_progress_job_no_errors(redis_url, redis_ttl, job_id):#pyl
cleanup_job(rconn, job_id, thejob)
@pytest.fixture(scope="function")
-def redis_conn_with_in_progress_job_some_errors(redis_url, redis_ttl, job_id): # pylint: disable=[redefined-outer-name]
+def redis_conn_with_in_progress_job_some_errors(redis_url, redis_ttl, jobs_prefix, job_id): # pylint: disable=[redefined-outer-name]
"redis connection with partially processed job, with some errors"
the_job = {
- "job_id": job_id, "command": "some_test_command",
- "ttl_seconds": redis_ttl, "job_type": "testjob", "extra_meta": {
+ "jobid": job_id, "command": "some_test_command",
+ "ttl_seconds": redis_ttl, "rprefix": jobs_prefix, "job_type": "testjob",
+ "extra_meta": {
"status": "Processing", "filename": "/path/to/some/file.tsv",
"percent": 45.34245, "errors": jsonpickle.encode((
DuplicateHeading(
@@ -124,11 +144,12 @@ def redis_conn_with_in_progress_job_some_errors(redis_url, redis_ttl, job_id): #
cleanup_job(rconn, job_id, the_job)
@pytest.fixture(scope="function")
-def redis_conn_with_completed_job_no_errors(redis_url, redis_ttl, job_id): # pylint: disable=[redefined-outer-name]
+def redis_conn_with_completed_job_no_errors(redis_url, redis_ttl, jobs_prefix, job_id): # pylint: disable=[redefined-outer-name]
"redis connection with completely processed job, with no errors"
the_job = {
- "job_id": job_id, "command": "some_test_command",
- "ttl_seconds": redis_ttl, "job_type": "testjob", "extra_meta": {
+ "jobid": job_id, "command": ["complete", "--woerror", "test-command"],
+ "ttl_seconds": redis_ttl, "rprefix": jobs_prefix, "job_type": "testjob",
+ "extra_meta": {
"status": "success", "filename": "/path/to/some/file.tsv",
"percent": 100, "errors": jsonpickle.encode(tuple())}}
with redis.Redis.from_url(redis_url, decode_responses=True) as rconn:
@@ -137,11 +158,12 @@ def redis_conn_with_completed_job_no_errors(redis_url, redis_ttl, job_id): # pyl
cleanup_job(rconn, job_id, the_job)
@pytest.fixture(scope="function")
-def redis_conn_with_completed_job_some_errors(redis_url, redis_ttl, job_id): # pylint: disable=[redefined-outer-name]
+def redis_conn_with_completed_job_some_errors(redis_url, redis_ttl, jobs_prefix, job_id): # pylint: disable=[redefined-outer-name]
"redis connection with completely processed job, with some errors"
the_job = {
- "job_id": job_id, "command": "some_test_command",
- "ttl_seconds": redis_ttl, "job_type": "testjob", "extra_meta": {
+ "jobid": job_id, "command": ["complete", "--werror", "test-command"],
+ "ttl_seconds": redis_ttl, "rprefix": jobs_prefix, "job_type": "testjob",
+ "extra_meta": {
"status": "success", "filename": "/path/to/some/file.tsv",
"percent": 100, "errors": jsonpickle.encode((
DuplicateHeading(
diff --git a/tests/qc_app/test_entry.py b/tests/qc_app/test_entry.py
index c0be26c..efc72a5 100644
--- a/tests/qc_app/test_entry.py
+++ b/tests/qc_app/test_entry.py
@@ -54,6 +54,7 @@ def test_post_with_correct_data(client):
"""
response = client.post(
"/", data={
+ "speciesid": 1,
"filetype": "average",
"qc_text_file": uploadable_file_object("no_data_errors.tsv")
})
@@ -61,7 +62,7 @@ def test_post_with_correct_data(client):
assert response.status_code == 302
assert b'Redirecting...' in response.data
assert (
- b'/parse/parse?filename=no_data_errors.tsv&amp;filetype=average'
+ b'/parse/parse?speciesid=1&amp;filename=no_data_errors.tsv&amp;filetype=average'
in response.data)
@pytest.mark.parametrize(
diff --git a/tests/qc_app/test_parse.py b/tests/qc_app/test_parse.py
index f173e0a..5e55688 100644
--- a/tests/qc_app/test_parse.py
+++ b/tests/qc_app/test_parse.py
@@ -4,11 +4,18 @@ import sys
import redis
import pytest
-from qc_app.jobs import job
+from qc_app.jobs import job, jobsnamespace
+
from tests.conftest import uploadable_file_object
-def test_parse_with_existing_uploaded_file(
- client, redis_url, job_id, monkeypatch):
+def test_parse_with_existing_uploaded_file(#pylint: disable=[too-many-arguments]
+ client,
+ db_url,
+ redis_url,
+ redis_ttl,
+ jobs_prefix,
+ job_id,
+ monkeypatch):
"""
GIVEN: 1. A flask application testing client
2. A valid file, and filetype
@@ -19,27 +26,30 @@ def test_parse_with_existing_uploaded_file(
"""
monkeypatch.setattr("qc_app.jobs.uuid4", lambda : job_id)
# Upload a file
+ speciesid = 1
filename = "no_data_errors.tsv"
filetype = "average"
client.post(
"/", data={
+ "speciesid": speciesid,
"filetype": filetype,
"qc_text_file": uploadable_file_object(filename)})
# Checks
- resp = client.get(f"/parse/parse?filename={filename}&filetype={filetype}")
+ resp = client.get(f"/parse/parse?speciesid={speciesid}&filename={filename}"
+ f"&filetype={filetype}")
assert resp.status_code == 302
assert b'Redirecting...' in resp.data
assert b'/parse/status/934c55d8-396e-4959-90e1-2698e9205758' in resp.data
with redis.Redis.from_url(redis_url, decode_responses=True) as rconn:
- the_job = job(rconn, job_id)
+ the_job = job(rconn, jobsnamespace(), job_id)
- assert the_job["job_id"] == job_id
+ assert the_job["jobid"] == job_id
assert the_job["filename"] == filename
assert the_job["command"] == " ".join([
- sys.executable, "-m", "scripts.validate_file", filetype,
- f"{client.application.config['UPLOAD_FOLDER']}/{filename}", redis_url,
- job_id])
+ sys.executable, "-m", "scripts.validate_file", db_url, redis_url,
+ jobs_prefix, job_id, "--redisexpiry", str(redis_ttl), str(speciesid),
+ filetype, f"{client.application.config['UPLOAD_FOLDER']}/{filename}"])
@pytest.mark.parametrize(
"filename,uri,error_msgs",
diff --git a/tests/qc_app/test_uploads_with_zip_files.py b/tests/qc_app/test_uploads_with_zip_files.py
index 2a43a2d..fb101ad 100644
--- a/tests/qc_app/test_uploads_with_zip_files.py
+++ b/tests/qc_app/test_uploads_with_zip_files.py
@@ -36,12 +36,13 @@ def test_upload_zipfile_with_one_tsv_file(client):
THEN: Ensure that the system redirects to the correct next URL
"""
resp = client.post("/", data={
+ "speciesid": 1,
"filetype": "average",
"qc_text_file": uploadable_file_object("average.tsv.zip")})
assert resp.status_code == 302
assert b"Redirecting..." in resp.data
assert (
- b"/parse/parse?filename=average.tsv.zip&amp;filetype=average"
+ b"/parse/parse?speciesid=1&amp;filename=average.tsv.zip&amp;filetype=average"
in resp.data)
def test_upload_zipfile_with_one_non_tsv_file(client):