-rw-r--r--  qc_app/dbinsert.py                          |  2
-rw-r--r--  qc_app/jobs.py                              | 19
-rw-r--r--  qc_app/parse.py                             | 70
-rw-r--r--  qc_app/samples.py                           |  2
-rw-r--r--  scripts/worker.py                           | 18
-rw-r--r--  tests/conftest.py                           | 52
-rw-r--r--  tests/qc_app/test_entry.py                  |  3
-rw-r--r--  tests/qc_app/test_parse.py                  | 28
-rw-r--r--  tests/qc_app/test_uploads_with_zip_files.py |  3
9 files changed, 118 insertions(+), 79 deletions(-)
diff --git a/qc_app/dbinsert.py b/qc_app/dbinsert.py
index 31e4cea..f36651f 100644
--- a/qc_app/dbinsert.py
+++ b/qc_app/dbinsert.py
@@ -374,7 +374,7 @@ def insert_data():
                         app.config["JOBS_TTL_SECONDS"]),
                     redisurl, f"{app.config['UPLOAD_FOLDER']}/job_errors")
 
-            return redirect(url_for("dbinsert.insert_status", job_id=job["job_id"]))
+            return redirect(url_for("dbinsert.insert_status", job_id=job["jobid"]))
         return render_error(f"File '{filename}' no longer exists.")
     except AssertionError as aserr:
         return render_error(f"Missing data: {aserr.args[0]}")
diff --git a/qc_app/jobs.py b/qc_app/jobs.py
index 1491015..21889da 100644
--- a/qc_app/jobs.py
+++ b/qc_app/jobs.py
@@ -29,20 +29,20 @@ def job_key(namespaceprefix: str, jobid: Union[str, UUID]) -> str:
     """Build the key by appending it to the namespace prefix."""
     return f"{namespaceprefix}:{jobid}"
 
-def raise_jobnotfound(jobid: Union[str,UUID]):
+def raise_jobnotfound(rprefix:str, jobid: Union[str,UUID]):
     """Utility to raise a `NoSuchJobError`"""
-    raise JobNotFound(f"Could not retrieve job '{jobid}'.")
+    raise JobNotFound(f"Could not retrieve job '{jobid}' from '{rprefix}.")
 
-def error_filename(job_id, error_dir):
+def error_filename(jobid, error_dir):
     "Compute the path of the file where errors will be dumped."
-    return f"{error_dir}/job_{job_id}.error"
+    return f"{error_dir}/job_{jobid}.error"
 
 def initialise_job(# pylint: disable=[too-many-arguments]
         rconn: Redis, rprefix: str, jobid: str, command: list, job_type: str,
         ttl_seconds: int = 86400, extra_meta: Optional[dict] = None) -> dict:
     "Initialise a job 'object' and put in on redis"
     the_job = {
-        "job_id": jobid, "command": shlex.join(command), "status": "pending",
+        "jobid": jobid, "command": shlex.join(command), "status": "pending",
         "percent": 0, "job-type": job_type, **(extra_meta or {})
     }
     rconn.hset(job_key(rprefix, jobid), mapping=the_job)
@@ -95,13 +95,13 @@ def launch_job(the_job: dict, redisurl: str, error_dir):
     if not os.path.exists(error_dir):
         os.mkdir(error_dir)
 
-    job_id = the_job["job_id"]
-    with open(error_filename(job_id, error_dir),
+    jobid = the_job["jobid"]
+    with open(error_filename(jobid, error_dir),
               "w",
               encoding="utf-8") as errorfile:
         subprocess.Popen( # pylint: disable=[consider-using-with]
             [sys.executable, "-m", "scripts.worker", redisurl, jobsnamespace(),
-             job_id],
+             jobid],
             stderr=errorfile,
             env={"PYTHONPATH": ":".join(sys.path)})
 
@@ -109,7 +109,8 @@ def launch_job(the_job: dict, redisurl: str, error_dir):
 
 def job(rconn: Redis, rprefix: str, jobid: Union[str,UUID]):
     "Retrieve the job"
-    thejob = rconn.hgetall(job_key(rprefix, jobid)) or raise_jobnotfound(jobid)
+    thejob = (rconn.hgetall(job_key(rprefix, jobid)) or
+              raise_jobnotfound(rprefix, jobid))
     return thejob
 
 def update_status(
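
A note on the `job()` lookup above: `Redis.hgetall` returns an empty dict for
a missing key, so the `or` short-circuits into `raise_jobnotfound`, which
always raises. A minimal standalone sketch of the same pattern, with
illustrative names:

    from typing import Union
    from uuid import UUID
    from redis import Redis

    class JobNotFound(Exception):
        """Raised when a job hash is missing from redis."""

    def fetch_job(rconn: Redis, rprefix: str, jobid: Union[str, UUID]) -> dict:
        # hgetall returns {} (falsy) when the hash does not exist, so a
        # missing job falls through to the raise below.
        thejob = rconn.hgetall(f"{rprefix}:{jobid}")
        if not thejob:
            raise JobNotFound(f"Could not retrieve job '{jobid}' from '{rprefix}'.")
        return thejob
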
diff --git a/qc_app/parse.py b/qc_app/parse.py
index d9be993..f0c53d1 100644
--- a/qc_app/parse.py
+++ b/qc_app/parse.py
@@ -76,47 +76,47 @@ def parse():
             redisurl,
             f"{app.config['UPLOAD_FOLDER']}/job_errors")
 
-    return redirect(url_for("parse.parse_status", job_id=job["job_id"]))
+    return redirect(url_for("parse.parse_status", job_id=job["jobid"]))
 
 @parsebp.route("/status/<job_id>", methods=["GET"])
 def parse_status(job_id: str):
     "Retrieve the status of the job"
     with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn:
-        job = jobs.job(rconn, jobs.jobsnamespace(), job_id)
-
-    if job:
-        error_filename = jobs.error_filename(
-            job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors")
-        if os.path.exists(error_filename):
-            stat = os.stat(error_filename)
-            if stat.st_size > 0:
-                return redirect(url_for("parse.fail", job_id=job_id))
-
-        job_id = job["job_id"]
-        progress = float(job["percent"])
-        status = job["status"]
-        filename = job.get("filename", "uploaded file")
-        errors = jsonpickle.decode(
-            job.get("errors", jsonpickle.encode(tuple())))
-        if status in ("success", "aborted"):
-            return redirect(url_for("parse.results", job_id=job_id))
-
-        if status == "parse-error":
+        try:
+            job = jobs.job(rconn, jobs.jobsnamespace(), job_id)
+        except jobs.JobNotFound as _exc:
+            return render_template("no_such_job.html", job_id=job_id), 400
+
+    error_filename = jobs.error_filename(
+        job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors")
+    if os.path.exists(error_filename):
+        stat = os.stat(error_filename)
+        if stat.st_size > 0:
             return redirect(url_for("parse.fail", job_id=job_id))
 
-        app.jinja_env.globals.update(
-            isinvalidvalue=isinvalidvalue,
-            isduplicateheading=isduplicateheading)
-        return render_template(
-            "job_progress.html",
-            job_id = job_id,
-            job_status = status,
-            progress = progress,
-            message = job.get("message", ""),
-            job_name = f"Parsing '{filename}'",
-            errors=errors)
-
-    return render_template("no_such_job.html", job_id=job_id), 400
+    job_id = job["jobid"]
+    progress = float(job["percent"])
+    status = job["status"]
+    filename = job.get("filename", "uploaded file")
+    errors = jsonpickle.decode(
+        job.get("errors", jsonpickle.encode(tuple())))
+    if status in ("success", "aborted"):
+        return redirect(url_for("parse.results", job_id=job_id))
+
+    if status == "parse-error":
+        return redirect(url_for("parse.fail", job_id=job_id))
+
+    app.jinja_env.globals.update(
+        isinvalidvalue=isinvalidvalue,
+        isduplicateheading=isduplicateheading)
+    return render_template(
+        "job_progress.html",
+        job_id = job_id,
+        job_status = status,
+        progress = progress,
+        message = job.get("message", ""),
+        job_name = f"Parsing '{filename}'",
+        errors=errors)
 
 @parsebp.route("/results/<job_id>", methods=["GET"])
 def results(job_id: str):
@@ -135,7 +135,7 @@ def results(job_id: str):
             errors=errors,
             job_name = f"Parsing '{filename}'",
             user_aborted = job.get("user_aborted"),
-            job_id=job["job_id"])
+            job_id=job["jobid"])
 
     return render_template("no_such_job.html", job_id=job_id)
 
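The `parse_status` rewrite above flattens the happy path: instead of nesting
the whole body under `if job:`, a missing job now surfaces as
`jobs.JobNotFound` and is answered with HTTP 400 straight away. A minimal
sketch of that shape, assuming the Flask `app`, the `parsebp` blueprint and
the imports already in this module:

    @parsebp.route("/status-sketch/<job_id>", methods=["GET"])
    def status_sketch(job_id: str):
        with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn:
            try:
                thejob = jobs.job(rconn, jobs.jobsnamespace(), job_id)
            except jobs.JobNotFound:
                return render_template("no_such_job.html", job_id=job_id), 400
        # from here on, `thejob` is guaranteed to exist
        return render_template("job_progress.html", job_id=thejob["jobid"])
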
diff --git a/qc_app/samples.py b/qc_app/samples.py
index cf97b8d..55e94ab 100644
--- a/qc_app/samples.py
+++ b/qc_app/samples.py
@@ -255,7 +255,7 @@ def upload_samples():
                 redisuri,
                 f"{app.config['UPLOAD_FOLDER']}/job_errors")
             return redirect(url_for(
-                "samples.upload_status", job_id=the_job["job_id"]))
+                "samples.upload_status", job_id=the_job["jobid"]))
 
 @samples.route("/upload/status/<uuid:job_id>", methods=["GET"])
 def upload_status(job_id: uuid.UUID):
diff --git a/scripts/worker.py b/scripts/worker.py
index 17d6e06..0eb9ea5 100644
--- a/scripts/worker.py
+++ b/scripts/worker.py
@@ -6,6 +6,7 @@ import argparse
 import traceback
 import subprocess
 from time import sleep
+from datetime import timedelta
 from tempfile import TemporaryDirectory
 
 from redis import Redis
@@ -31,8 +32,8 @@ def parse_args():
 
 def run_job(rconn: Redis, job: dict, redisprefix: str):
     "Run the actual job."
-    jobid = job["job_id"]
     try:
+        jobid = job["jobid"]
         with TemporaryDirectory() as tmpdir:
             stderrpath = f"{tmpdir}/{jobid}.stderr"
             with open(stderrpath, "w+b") as tmpfl:
@@ -41,8 +42,11 @@ def run_job(rconn: Redis, job: dict, redisprefix: str):
                         stderr=tmpfl) as process:
                     while process.poll() is None:
                         jobs.update_status(rconn, redisprefix, jobid, "running")
-                        jobs.update_stdout_stderr(
-                            rconn, redisprefix, jobid, process.stdout.read1(), "stdout")
+                        jobs.update_stdout_stderr(rconn,
+                                                  redisprefix,
+                                                  jobid,
+                                                  process.stdout.read1(),  # type: ignore[union-attr]
+                                                  "stdout")
                         sleep(1)
 
                     jobs.update_status(
@@ -71,11 +75,11 @@ def main():
         job = jobs.job(rconn, args.redisprefix, args.job_id)
         if job:
             return run_job(rconn, job, args.redisprefix)
-        jobs.update_status(rconn, args.job_id, "error")
-        fqjobid = jobs.job_key(redisprefix, args.jobid)
+        jobs.update_status(rconn, args.redisprefix, args.job_id, "error")
+        fqjobid = jobs.job_key(args.redisprefix, args.job_id)
         rconn.hset(fqjobid, "stderr", f"No such job. '{args.job_id}'.")
-        redis_conn.expire(name=job_key(args.redisprefix, args.job_id),
-                          time=timedelta(seconds=(2 * 60 * 60)))
+        rconn.expire(name=jobs.job_key(args.redisprefix, args.job_id),
+                     time=timedelta(seconds=(2 * 60 * 60)))
         print(f"No such job. '{args.job_id}'.", file=sys.stderr)
         return 2
     return 3
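
The fallback branch above previously referenced two undefined names
(`redis_conn` and `job_key`) and passed no redis prefix to `update_status`;
together with the new `timedelta` import at the top of the module, the
corrected cleanup reads as follows (restated as a sketch, names as in the
patch):

    # Mark the job as errored, record why, and let redis drop the
    # fully-qualified key after two hours.
    jobs.update_status(rconn, args.redisprefix, args.job_id, "error")
    fqjobid = jobs.job_key(args.redisprefix, args.job_id)
    rconn.hset(fqjobid, "stderr", f"No such job. '{args.job_id}'.")
    rconn.expire(name=fqjobid, time=timedelta(seconds=(2 * 60 * 60)))
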
diff --git a/tests/conftest.py b/tests/conftest.py
index b7d3f8a..013c30d 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -10,7 +10,10 @@ import jsonpickle
 from redis import Redis
 
 from functional_tools import take
+
 from qc_app import jobs, create_app
+from qc_app.jobs import JOBS_PREFIX
+
 from quality_control.errors import InvalidValue, DuplicateHeading
 
 @pytest.fixture(scope="session")
@@ -56,11 +59,26 @@ def client():
     cleanup_redis(app.config["REDIS_URL"], test_prefix)
 
 @pytest.fixture(scope="module")
+def db_url(client):#pylint: disable=[redefined-outer-name]
+    """Return the database URI"""
+    return client.application.config["SQL_URI"]
+
+@pytest.fixture(scope="module")
 def redis_url(client):#pylint: disable=[redefined-outer-name]
     """Return the redis URI"""
     return client.application.config["REDIS_URL"]
 
 @pytest.fixture(scope="module")
+def redis_prefix(client):#pylint: disable=[redefined-outer-name]
+    """Return the redis prefix"""
+    return client.application.config["GNQC_REDIS_PREFIX"]
+
+@pytest.fixture(scope="module")
+def jobs_prefix(redis_prefix):#pylint: disable=[redefined-outer-name]
+    """Return the redis prefix for jobs."""
+    return f"{redis_prefix}:{JOBS_PREFIX}"
+
+@pytest.fixture(scope="module")
 def redis_ttl(client):#pylint: disable=[redefined-outer-name]
     """Return the redis URI"""
     return client.application.config["JOBS_TTL_SECONDS"]
@@ -81,11 +99,11 @@ def cleanup_job(rconn, jobid, thejob):
     rconn.delete(jobs.job_key(jobs.jobsnamespace(), jobid))
 
 @pytest.fixture(scope="function")
-def redis_conn_with_fresh_job(redis_url, redis_ttl, job_id):#pylint: disable=[redefined-outer-name]
+def redis_conn_with_fresh_job(redis_url, redis_ttl, jobs_prefix, job_id):#pylint: disable=[redefined-outer-name]
     "redis connection with fresh, unprocessed job"
     thejob = {
-        "job_id": job_id, "command": "some_test_command", "job_type": "testjob",
-        "ttl_seconds": redis_ttl, "extra_meta": {
+        "jobid": job_id, "command": "some_test_command", "job_type": "testjob",
+        "ttl_seconds": redis_ttl, "rprefix": jobs_prefix, "extra_meta": {
             "filename": "/path/to/some/file.tsv", "percent": 0,
             "status": "pending"}}
     with redis.Redis.from_url(redis_url, decode_responses=True) as rconn:
@@ -94,11 +112,12 @@ def redis_conn_with_fresh_job(redis_url, redis_ttl, job_id):#pylint: disable=[re
         cleanup_job(rconn, job_id, thejob)
 
 @pytest.fixture(scope="function")
-def redis_conn_with_in_progress_job_no_errors(redis_url, redis_ttl, job_id):#pylint: disable=[redefined-outer-name]
+def redis_conn_with_in_progress_job_no_errors(redis_url, redis_ttl, jobs_prefix, job_id):#pylint: disable=[redefined-outer-name]
     "redis connection with partially processed job, with no errors"
     thejob = {
-        "job_id": job_id, "command": "some_test_command",
-        "ttl_seconds": redis_ttl, "job_type": "testjob", "extra_meta": {
+        "jobid": job_id, "command": "some_test_command",
+        "ttl_seconds": redis_ttl, "rprefix": jobs_prefix, "job_type": "testjob",
+        "extra_meta": {
             "status": "Processing", "filename": "/path/to/some/file.tsv",
             "percent": 32.242342}}
     with redis.Redis.from_url(redis_url, decode_responses=True) as rconn:
@@ -107,11 +126,12 @@ def redis_conn_with_in_progress_job_no_errors(redis_url, redis_ttl, job_id):#pyl
         cleanup_job(rconn, job_id, thejob)
 
 @pytest.fixture(scope="function")
-def redis_conn_with_in_progress_job_some_errors(redis_url, redis_ttl, job_id): # pylint: disable=[redefined-outer-name]
+def redis_conn_with_in_progress_job_some_errors(redis_url, redis_ttl, jobs_prefix, job_id): # pylint: disable=[redefined-outer-name]
     "redis connection with partially processed job, with some errors"
     the_job = {
-        "job_id": job_id, "command": "some_test_command",
-        "ttl_seconds": redis_ttl, "job_type": "testjob", "extra_meta": {
+        "jobid": job_id, "command": "some_test_command",
+        "ttl_seconds": redis_ttl, "rprefix": jobs_prefix, "job_type": "testjob",
+        "extra_meta": {
             "status": "Processing", "filename": "/path/to/some/file.tsv",
             "percent": 45.34245, "errors": jsonpickle.encode((
                 DuplicateHeading(
@@ -124,11 +144,12 @@ def redis_conn_with_in_progress_job_some_errors(redis_url, redis_ttl, job_id): #
         cleanup_job(rconn, job_id, the_job)
 
 @pytest.fixture(scope="function")
-def redis_conn_with_completed_job_no_errors(redis_url, redis_ttl, job_id): # pylint: disable=[redefined-outer-name]
+def redis_conn_with_completed_job_no_errors(redis_url, redis_ttl, jobs_prefix, job_id): # pylint: disable=[redefined-outer-name]
     "redis connection with completely processed job, with no errors"
     the_job = {
-        "job_id": job_id, "command": "some_test_command",
-        "ttl_seconds": redis_ttl, "job_type": "testjob", "extra_meta": {
+        "jobid": job_id, "command": ["complete", "--woerror", "test-command"],
+        "ttl_seconds": redis_ttl, "rprefix": jobs_prefix, "job_type": "testjob",
+        "extra_meta": {
             "status": "success", "filename": "/path/to/some/file.tsv",
             "percent": 100, "errors": jsonpickle.encode(tuple())}}
     with redis.Redis.from_url(redis_url, decode_responses=True) as rconn:
@@ -137,11 +158,12 @@ def redis_conn_with_completed_job_no_errors(redis_url, redis_ttl, job_id): # pyl
         cleanup_job(rconn, job_id, the_job)
 
 @pytest.fixture(scope="function")
-def redis_conn_with_completed_job_some_errors(redis_url, redis_ttl, job_id): # pylint: disable=[redefined-outer-name]
+def redis_conn_with_completed_job_some_errors(redis_url, redis_ttl, jobs_prefix, job_id): # pylint: disable=[redefined-outer-name]
     "redis connection with completely processed job, with some errors"
     the_job = {
-        "job_id": job_id, "command": "some_test_command",
-        "ttl_seconds": redis_ttl, "job_type": "testjob", "extra_meta": {
+        "jobid": job_id, "command": ["complete", "--werror", "test-command"],
+        "ttl_seconds": redis_ttl, "rprefix": jobs_prefix, "job_type": "testjob",
+        "extra_meta": {
             "status": "success", "filename": "/path/to/some/file.tsv",
             "percent": 100, "errors": jsonpickle.encode((
                 DuplicateHeading(
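
Each fixture above now seeds the job hash with the renamed "jobid" key plus a
"rprefix" entry derived from the module-scoped `jobs_prefix` fixture, so the
tests exercise the same namespaced redis keys the application uses. An
illustrative (hypothetical) test consuming one of them, assuming the fixture
yields the open redis connection and that `jobs_prefix` matches
`jobs.jobsnamespace()`:

    def test_fresh_job_is_namespaced(redis_conn_with_fresh_job, jobs_prefix, job_id):
        rconn = redis_conn_with_fresh_job
        stored = rconn.hgetall(f"{jobs_prefix}:{job_id}")
        assert stored["jobid"] == job_id
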
diff --git a/tests/qc_app/test_entry.py b/tests/qc_app/test_entry.py
index c0be26c..efc72a5 100644
--- a/tests/qc_app/test_entry.py
+++ b/tests/qc_app/test_entry.py
@@ -54,6 +54,7 @@ def test_post_with_correct_data(client):
     """
     response = client.post(
         "/", data={
+            "speciesid": 1,
             "filetype": "average",
             "qc_text_file": uploadable_file_object("no_data_errors.tsv")
         })
@@ -61,7 +62,7 @@ def test_post_with_correct_data(client):
     assert response.status_code == 302
     assert b'Redirecting...' in response.data
     assert (
-        b'/parse/parse?filename=no_data_errors.tsv&amp;filetype=average'
+        b'/parse/parse?speciesid=1&amp;filename=no_data_errors.tsv&amp;filetype=average'
         in response.data)
 
 @pytest.mark.parametrize(
diff --git a/tests/qc_app/test_parse.py b/tests/qc_app/test_parse.py
index f173e0a..5e55688 100644
--- a/tests/qc_app/test_parse.py
+++ b/tests/qc_app/test_parse.py
@@ -4,11 +4,18 @@ import sys
 import redis
 import pytest
 
-from qc_app.jobs import job
+from qc_app.jobs import job, jobsnamespace
+
 from tests.conftest import uploadable_file_object
 
-def test_parse_with_existing_uploaded_file(
-        client, redis_url, job_id, monkeypatch):
+def test_parse_with_existing_uploaded_file(#pylint: disable=[too-many-arguments]
+        client,
+        db_url,
+        redis_url,
+        redis_ttl,
+        jobs_prefix,
+        job_id,
+        monkeypatch):
     """
     GIVEN: 1. A flask application testing client
            2. A valid file, and filetype
@@ -19,27 +26,30 @@ def test_parse_with_existing_uploaded_file(
     """
     monkeypatch.setattr("qc_app.jobs.uuid4", lambda : job_id)
     # Upload a file
+    speciesid = 1
     filename = "no_data_errors.tsv"
     filetype = "average"
     client.post(
         "/", data={
+            "speciesid": speciesid,
             "filetype": filetype,
             "qc_text_file": uploadable_file_object(filename)})
     # Checks
-    resp = client.get(f"/parse/parse?filename={filename}&filetype={filetype}")
+    resp = client.get(f"/parse/parse?speciesid={speciesid}&filename={filename}"
+                      f"&filetype={filetype}")
     assert resp.status_code == 302
     assert b'Redirecting...' in resp.data
     assert b'/parse/status/934c55d8-396e-4959-90e1-2698e9205758' in resp.data
 
     with redis.Redis.from_url(redis_url, decode_responses=True) as rconn:
-        the_job = job(rconn, job_id)
+        the_job = job(rconn, jobsnamespace(), job_id)
 
-    assert the_job["job_id"] == job_id
+    assert the_job["jobid"] == job_id
     assert the_job["filename"] == filename
     assert the_job["command"] == " ".join([
-        sys.executable, "-m", "scripts.validate_file", filetype,
-        f"{client.application.config['UPLOAD_FOLDER']}/{filename}", redis_url,
-        job_id])
+        sys.executable, "-m", "scripts.validate_file", db_url, redis_url,
+        jobs_prefix, job_id, "--redisexpiry", str(redis_ttl), str(speciesid),
+        filetype, f"{client.application.config['UPLOAD_FOLDER']}/{filename}"])
 
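The expected command line above shows the validation script's new argument
order: database URI, redis URI, redis prefix and job id up front, a
`--redisexpiry` option, then species id, filetype and file path. A
hypothetical argparse shape that would accept that argv (the real
`scripts.validate_file` parser is not part of this diff, so the argument
names below are illustrative):

    import argparse

    parser = argparse.ArgumentParser(prog="scripts.validate_file")
    parser.add_argument("databaseuri")
    parser.add_argument("redisuri")
    parser.add_argument("redisprefix")
    parser.add_argument("job_id")
    parser.add_argument("--redisexpiry", type=int, default=86400)
    parser.add_argument("speciesid", type=int)
    parser.add_argument("filetype")
    parser.add_argument("filepath")
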
 @pytest.mark.parametrize(
     "filename,uri,error_msgs",
diff --git a/tests/qc_app/test_uploads_with_zip_files.py b/tests/qc_app/test_uploads_with_zip_files.py
index 2a43a2d..fb101ad 100644
--- a/tests/qc_app/test_uploads_with_zip_files.py
+++ b/tests/qc_app/test_uploads_with_zip_files.py
@@ -36,12 +36,13 @@ def test_upload_zipfile_with_one_tsv_file(client):
     THEN: Ensure that the system redirects to the correct next URL
     """
     resp = client.post("/", data={
+        "speciesid": 1,
         "filetype": "average",
         "qc_text_file": uploadable_file_object("average.tsv.zip")})
     assert resp.status_code == 302
     assert b"Redirecting..." in resp.data
     assert (
-        b"/parse/parse?filename=average.tsv.zip&amp;filetype=average"
+        b"/parse/parse?speciesid=1&amp;filename=average.tsv.zip&amp;filetype=average"
         in resp.data)
 
 def test_upload_zipfile_with_one_non_tsv_file(client):