diff options
-rw-r--r-- | qc_app/dbinsert.py | 2 | ||||
-rw-r--r-- | qc_app/jobs.py | 19 | ||||
-rw-r--r-- | qc_app/parse.py | 70 | ||||
-rw-r--r-- | qc_app/samples.py | 2 | ||||
-rw-r--r-- | scripts/worker.py | 18 | ||||
-rw-r--r-- | tests/conftest.py | 52 | ||||
-rw-r--r-- | tests/qc_app/test_entry.py | 3 | ||||
-rw-r--r-- | tests/qc_app/test_parse.py | 28 | ||||
-rw-r--r-- | tests/qc_app/test_uploads_with_zip_files.py | 3 |
9 files changed, 118 insertions, 79 deletions
diff --git a/qc_app/dbinsert.py b/qc_app/dbinsert.py index 31e4cea..f36651f 100644 --- a/qc_app/dbinsert.py +++ b/qc_app/dbinsert.py @@ -374,7 +374,7 @@ def insert_data(): app.config["JOBS_TTL_SECONDS"]), redisurl, f"{app.config['UPLOAD_FOLDER']}/job_errors") - return redirect(url_for("dbinsert.insert_status", job_id=job["job_id"])) + return redirect(url_for("dbinsert.insert_status", job_id=job["jobid"])) return render_error(f"File '{filename}' no longer exists.") except AssertionError as aserr: return render_error(f"Missing data: {aserr.args[0]}") diff --git a/qc_app/jobs.py b/qc_app/jobs.py index 1491015..21889da 100644 --- a/qc_app/jobs.py +++ b/qc_app/jobs.py @@ -29,20 +29,20 @@ def job_key(namespaceprefix: str, jobid: Union[str, UUID]) -> str: """Build the key by appending it to the namespace prefix.""" return f"{namespaceprefix}:{jobid}" -def raise_jobnotfound(jobid: Union[str,UUID]): +def raise_jobnotfound(rprefix:str, jobid: Union[str,UUID]): """Utility to raise a `NoSuchJobError`""" - raise JobNotFound(f"Could not retrieve job '{jobid}'.") + raise JobNotFound(f"Could not retrieve job '{jobid}' from '{rprefix}.") -def error_filename(job_id, error_dir): +def error_filename(jobid, error_dir): "Compute the path of the file where errors will be dumped." - return f"{error_dir}/job_{job_id}.error" + return f"{error_dir}/job_{jobid}.error" def initialise_job(# pylint: disable=[too-many-arguments] rconn: Redis, rprefix: str, jobid: str, command: list, job_type: str, ttl_seconds: int = 86400, extra_meta: Optional[dict] = None) -> dict: "Initialise a job 'object' and put in on redis" the_job = { - "job_id": jobid, "command": shlex.join(command), "status": "pending", + "jobid": jobid, "command": shlex.join(command), "status": "pending", "percent": 0, "job-type": job_type, **(extra_meta or {}) } rconn.hset(job_key(rprefix, jobid), mapping=the_job) @@ -95,13 +95,13 @@ def launch_job(the_job: dict, redisurl: str, error_dir): if not os.path.exists(error_dir): os.mkdir(error_dir) - job_id = the_job["job_id"] - with open(error_filename(job_id, error_dir), + jobid = the_job["jobid"] + with open(error_filename(jobid, error_dir), "w", encoding="utf-8") as errorfile: subprocess.Popen( # pylint: disable=[consider-using-with] [sys.executable, "-m", "scripts.worker", redisurl, jobsnamespace(), - job_id], + jobid], stderr=errorfile, env={"PYTHONPATH": ":".join(sys.path)}) @@ -109,7 +109,8 @@ def launch_job(the_job: dict, redisurl: str, error_dir): def job(rconn: Redis, rprefix: str, jobid: Union[str,UUID]): "Retrieve the job" - thejob = rconn.hgetall(job_key(rprefix, jobid)) or raise_jobnotfound(jobid) + thejob = (rconn.hgetall(job_key(rprefix, jobid)) or + raise_jobnotfound(rprefix, jobid)) return thejob def update_status( diff --git a/qc_app/parse.py b/qc_app/parse.py index d9be993..f0c53d1 100644 --- a/qc_app/parse.py +++ b/qc_app/parse.py @@ -76,47 +76,47 @@ def parse(): redisurl, f"{app.config['UPLOAD_FOLDER']}/job_errors") - return redirect(url_for("parse.parse_status", job_id=job["job_id"])) + return redirect(url_for("parse.parse_status", job_id=job["jobid"])) @parsebp.route("/status/<job_id>", methods=["GET"]) def parse_status(job_id: str): "Retrieve the status of the job" with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn: - job = jobs.job(rconn, jobs.jobsnamespace(), job_id) - - if job: - error_filename = jobs.error_filename( - job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors") - if os.path.exists(error_filename): - stat = os.stat(error_filename) - if stat.st_size > 0: - return redirect(url_for("parse.fail", job_id=job_id)) - - job_id = job["job_id"] - progress = float(job["percent"]) - status = job["status"] - filename = job.get("filename", "uploaded file") - errors = jsonpickle.decode( - job.get("errors", jsonpickle.encode(tuple()))) - if status in ("success", "aborted"): - return redirect(url_for("parse.results", job_id=job_id)) - - if status == "parse-error": + try: + job = jobs.job(rconn, jobs.jobsnamespace(), job_id) + except jobs.JobNotFound as _exc: + return render_template("no_such_job.html", job_id=job_id), 400 + + error_filename = jobs.error_filename( + job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors") + if os.path.exists(error_filename): + stat = os.stat(error_filename) + if stat.st_size > 0: return redirect(url_for("parse.fail", job_id=job_id)) - app.jinja_env.globals.update( - isinvalidvalue=isinvalidvalue, - isduplicateheading=isduplicateheading) - return render_template( - "job_progress.html", - job_id = job_id, - job_status = status, - progress = progress, - message = job.get("message", ""), - job_name = f"Parsing '{filename}'", - errors=errors) - - return render_template("no_such_job.html", job_id=job_id), 400 + job_id = job["jobid"] + progress = float(job["percent"]) + status = job["status"] + filename = job.get("filename", "uploaded file") + errors = jsonpickle.decode( + job.get("errors", jsonpickle.encode(tuple()))) + if status in ("success", "aborted"): + return redirect(url_for("parse.results", job_id=job_id)) + + if status == "parse-error": + return redirect(url_for("parse.fail", job_id=job_id)) + + app.jinja_env.globals.update( + isinvalidvalue=isinvalidvalue, + isduplicateheading=isduplicateheading) + return render_template( + "job_progress.html", + job_id = job_id, + job_status = status, + progress = progress, + message = job.get("message", ""), + job_name = f"Parsing '{filename}'", + errors=errors) @parsebp.route("/results/<job_id>", methods=["GET"]) def results(job_id: str): @@ -135,7 +135,7 @@ def results(job_id: str): errors=errors, job_name = f"Parsing '{filename}'", user_aborted = job.get("user_aborted"), - job_id=job["job_id"]) + job_id=job["jobid"]) return render_template("no_such_job.html", job_id=job_id) diff --git a/qc_app/samples.py b/qc_app/samples.py index cf97b8d..55e94ab 100644 --- a/qc_app/samples.py +++ b/qc_app/samples.py @@ -255,7 +255,7 @@ def upload_samples(): redisuri, f"{app.config['UPLOAD_FOLDER']}/job_errors") return redirect(url_for( - "samples.upload_status", job_id=the_job["job_id"])) + "samples.upload_status", job_id=the_job["jobid"])) @samples.route("/upload/status/<uuid:job_id>", methods=["GET"]) def upload_status(job_id: uuid.UUID): diff --git a/scripts/worker.py b/scripts/worker.py index 17d6e06..0eb9ea5 100644 --- a/scripts/worker.py +++ b/scripts/worker.py @@ -6,6 +6,7 @@ import argparse import traceback import subprocess from time import sleep +from datetime import timedelta from tempfile import TemporaryDirectory from redis import Redis @@ -31,8 +32,8 @@ def parse_args(): def run_job(rconn: Redis, job: dict, redisprefix: str): "Run the actual job." - jobid = job["job_id"] try: + jobid = job["jobid"] with TemporaryDirectory() as tmpdir: stderrpath = f"{tmpdir}/{jobid}.stderr" with open(stderrpath, "w+b") as tmpfl: @@ -41,8 +42,11 @@ def run_job(rconn: Redis, job: dict, redisprefix: str): stderr=tmpfl) as process: while process.poll() is None: jobs.update_status(rconn, redisprefix, jobid, "running") - jobs.update_stdout_stderr( - rconn, redisprefix, jobid, process.stdout.read1(), "stdout") + jobs.update_stdout_stderr(rconn, + redisprefix, + jobid, + process.stdout.read1(),#type: ignore[union-attr] + "stdout") sleep(1) jobs.update_status( @@ -71,11 +75,11 @@ def main(): job = jobs.job(rconn, args.redisprefix, args.job_id) if job: return run_job(rconn, job, args.redisprefix) - jobs.update_status(rconn, args.job_id, "error") - fqjobid = jobs.job_key(redisprefix, args.jobid) + jobs.update_status(rconn, args.redisprefix, args.job_id, "error") + fqjobid = jobs.job_key(args.redisprefix, args.jobid) rconn.hset(fqjobid, "stderr", f"No such job. '{args.job_id}'.") - redis_conn.expire(name=job_key(args.redisprefix, args.job_id), - time=timedelta(seconds=(2 * 60 * 60))) + rconn.expire(name=jobs.job_key(args.redisprefix, args.job_id), + time=timedelta(seconds=(2 * 60 * 60))) print(f"No such job. '{args.job_id}'.", file=sys.stderr) return 2 return 3 diff --git a/tests/conftest.py b/tests/conftest.py index b7d3f8a..013c30d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,7 +10,10 @@ import jsonpickle from redis import Redis from functional_tools import take + from qc_app import jobs, create_app +from qc_app.jobs import JOBS_PREFIX + from quality_control.errors import InvalidValue, DuplicateHeading @pytest.fixture(scope="session") @@ -56,11 +59,26 @@ def client(): cleanup_redis(app.config["REDIS_URL"], test_prefix) @pytest.fixture(scope="module") +def db_url(client):#pylint: disable=[redefined-outer-name] + """Return the database URI""" + return client.application.config["SQL_URI"] + +@pytest.fixture(scope="module") def redis_url(client):#pylint: disable=[redefined-outer-name] """Return the redis URI""" return client.application.config["REDIS_URL"] @pytest.fixture(scope="module") +def redis_prefix(client):#pylint: disable=[redefined-outer-name] + """Return the redis prefix""" + return client.application.config["GNQC_REDIS_PREFIX"] + +@pytest.fixture(scope="module") +def jobs_prefix(redis_prefix):#pylint: disable=[redefined-outer-name] + """Return the redis prefix for jobs.""" + return f"{redis_prefix}:{JOBS_PREFIX}" + +@pytest.fixture(scope="module") def redis_ttl(client):#pylint: disable=[redefined-outer-name] """Return the redis URI""" return client.application.config["JOBS_TTL_SECONDS"] @@ -81,11 +99,11 @@ def cleanup_job(rconn, jobid, thejob): rconn.delete(jobs.job_key(jobs.jobsnamespace(), jobid)) @pytest.fixture(scope="function") -def redis_conn_with_fresh_job(redis_url, redis_ttl, job_id):#pylint: disable=[redefined-outer-name] +def redis_conn_with_fresh_job(redis_url, redis_ttl, jobs_prefix, job_id):#pylint: disable=[redefined-outer-name] "redis connection with fresh, unprocessed job" thejob = { - "job_id": job_id, "command": "some_test_command", "job_type": "testjob", - "ttl_seconds": redis_ttl, "extra_meta": { + "jobid": job_id, "command": "some_test_command", "job_type": "testjob", + "ttl_seconds": redis_ttl, "rprefix": jobs_prefix, "extra_meta": { "filename": "/path/to/some/file.tsv", "percent": 0, "status": "pending"}} with redis.Redis.from_url(redis_url, decode_responses=True) as rconn: @@ -94,11 +112,12 @@ def redis_conn_with_fresh_job(redis_url, redis_ttl, job_id):#pylint: disable=[re cleanup_job(rconn, job_id, thejob) @pytest.fixture(scope="function") -def redis_conn_with_in_progress_job_no_errors(redis_url, redis_ttl, job_id):#pylint: disable=[redefined-outer-name] +def redis_conn_with_in_progress_job_no_errors(redis_url, redis_ttl, jobs_prefix, job_id):#pylint: disable=[redefined-outer-name] "redis connection with partially processed job, with no errors" thejob = { - "job_id": job_id, "command": "some_test_command", - "ttl_seconds": redis_ttl, "job_type": "testjob", "extra_meta": { + "jobid": job_id, "command": "some_test_command", + "ttl_seconds": redis_ttl, "rprefix": jobs_prefix, "job_type": "testjob", + "extra_meta": { "status": "Processing", "filename": "/path/to/some/file.tsv", "percent": 32.242342}} with redis.Redis.from_url(redis_url, decode_responses=True) as rconn: @@ -107,11 +126,12 @@ def redis_conn_with_in_progress_job_no_errors(redis_url, redis_ttl, job_id):#pyl cleanup_job(rconn, job_id, thejob) @pytest.fixture(scope="function") -def redis_conn_with_in_progress_job_some_errors(redis_url, redis_ttl, job_id): # pylint: disable=[redefined-outer-name] +def redis_conn_with_in_progress_job_some_errors(redis_url, redis_ttl, jobs_prefix, job_id): # pylint: disable=[redefined-outer-name] "redis connection with partially processed job, with some errors" the_job = { - "job_id": job_id, "command": "some_test_command", - "ttl_seconds": redis_ttl, "job_type": "testjob", "extra_meta": { + "jobid": job_id, "command": "some_test_command", + "ttl_seconds": redis_ttl, "rprefix": jobs_prefix, "job_type": "testjob", + "extra_meta": { "status": "Processing", "filename": "/path/to/some/file.tsv", "percent": 45.34245, "errors": jsonpickle.encode(( DuplicateHeading( @@ -124,11 +144,12 @@ def redis_conn_with_in_progress_job_some_errors(redis_url, redis_ttl, job_id): # cleanup_job(rconn, job_id, the_job) @pytest.fixture(scope="function") -def redis_conn_with_completed_job_no_errors(redis_url, redis_ttl, job_id): # pylint: disable=[redefined-outer-name] +def redis_conn_with_completed_job_no_errors(redis_url, redis_ttl, jobs_prefix, job_id): # pylint: disable=[redefined-outer-name] "redis connection with completely processed job, with no errors" the_job = { - "job_id": job_id, "command": "some_test_command", - "ttl_seconds": redis_ttl, "job_type": "testjob", "extra_meta": { + "jobid": job_id, "command": ["complete", "--woerror", "test-command"], + "ttl_seconds": redis_ttl, "rprefix": jobs_prefix, "job_type": "testjob", + "extra_meta": { "status": "success", "filename": "/path/to/some/file.tsv", "percent": 100, "errors": jsonpickle.encode(tuple())}} with redis.Redis.from_url(redis_url, decode_responses=True) as rconn: @@ -137,11 +158,12 @@ def redis_conn_with_completed_job_no_errors(redis_url, redis_ttl, job_id): # pyl cleanup_job(rconn, job_id, the_job) @pytest.fixture(scope="function") -def redis_conn_with_completed_job_some_errors(redis_url, redis_ttl, job_id): # pylint: disable=[redefined-outer-name] +def redis_conn_with_completed_job_some_errors(redis_url, redis_ttl, jobs_prefix, job_id): # pylint: disable=[redefined-outer-name] "redis connection with completely processed job, with some errors" the_job = { - "job_id": job_id, "command": "some_test_command", - "ttl_seconds": redis_ttl, "job_type": "testjob", "extra_meta": { + "jobid": job_id, "command": ["complete", "--werror", "test-command"], + "ttl_seconds": redis_ttl, "rprefix": jobs_prefix, "job_type": "testjob", + "extra_meta": { "status": "success", "filename": "/path/to/some/file.tsv", "percent": 100, "errors": jsonpickle.encode(( DuplicateHeading( diff --git a/tests/qc_app/test_entry.py b/tests/qc_app/test_entry.py index c0be26c..efc72a5 100644 --- a/tests/qc_app/test_entry.py +++ b/tests/qc_app/test_entry.py @@ -54,6 +54,7 @@ def test_post_with_correct_data(client): """ response = client.post( "/", data={ + "speciesid": 1, "filetype": "average", "qc_text_file": uploadable_file_object("no_data_errors.tsv") }) @@ -61,7 +62,7 @@ def test_post_with_correct_data(client): assert response.status_code == 302 assert b'Redirecting...' in response.data assert ( - b'/parse/parse?filename=no_data_errors.tsv&filetype=average' + b'/parse/parse?speciesid=1&filename=no_data_errors.tsv&filetype=average' in response.data) @pytest.mark.parametrize( diff --git a/tests/qc_app/test_parse.py b/tests/qc_app/test_parse.py index f173e0a..5e55688 100644 --- a/tests/qc_app/test_parse.py +++ b/tests/qc_app/test_parse.py @@ -4,11 +4,18 @@ import sys import redis import pytest -from qc_app.jobs import job +from qc_app.jobs import job, jobsnamespace + from tests.conftest import uploadable_file_object -def test_parse_with_existing_uploaded_file( - client, redis_url, job_id, monkeypatch): +def test_parse_with_existing_uploaded_file(#pylint: disable=[too-many-arguments] + client, + db_url, + redis_url, + redis_ttl, + jobs_prefix, + job_id, + monkeypatch): """ GIVEN: 1. A flask application testing client 2. A valid file, and filetype @@ -19,27 +26,30 @@ def test_parse_with_existing_uploaded_file( """ monkeypatch.setattr("qc_app.jobs.uuid4", lambda : job_id) # Upload a file + speciesid = 1 filename = "no_data_errors.tsv" filetype = "average" client.post( "/", data={ + "speciesid": speciesid, "filetype": filetype, "qc_text_file": uploadable_file_object(filename)}) # Checks - resp = client.get(f"/parse/parse?filename={filename}&filetype={filetype}") + resp = client.get(f"/parse/parse?speciesid={speciesid}&filename={filename}" + f"&filetype={filetype}") assert resp.status_code == 302 assert b'Redirecting...' in resp.data assert b'/parse/status/934c55d8-396e-4959-90e1-2698e9205758' in resp.data with redis.Redis.from_url(redis_url, decode_responses=True) as rconn: - the_job = job(rconn, job_id) + the_job = job(rconn, jobsnamespace(), job_id) - assert the_job["job_id"] == job_id + assert the_job["jobid"] == job_id assert the_job["filename"] == filename assert the_job["command"] == " ".join([ - sys.executable, "-m", "scripts.validate_file", filetype, - f"{client.application.config['UPLOAD_FOLDER']}/{filename}", redis_url, - job_id]) + sys.executable, "-m", "scripts.validate_file", db_url, redis_url, + jobs_prefix, job_id, "--redisexpiry", str(redis_ttl), str(speciesid), + filetype, f"{client.application.config['UPLOAD_FOLDER']}/{filename}"]) @pytest.mark.parametrize( "filename,uri,error_msgs", diff --git a/tests/qc_app/test_uploads_with_zip_files.py b/tests/qc_app/test_uploads_with_zip_files.py index 2a43a2d..fb101ad 100644 --- a/tests/qc_app/test_uploads_with_zip_files.py +++ b/tests/qc_app/test_uploads_with_zip_files.py @@ -36,12 +36,13 @@ def test_upload_zipfile_with_one_tsv_file(client): THEN: Ensure that the system redirects to the correct next URL """ resp = client.post("/", data={ + "speciesid": 1, "filetype": "average", "qc_text_file": uploadable_file_object("average.tsv.zip")}) assert resp.status_code == 302 assert b"Redirecting..." in resp.data assert ( - b"/parse/parse?filename=average.tsv.zip&filetype=average" + b"/parse/parse?speciesid=1&filename=average.tsv.zip&filetype=average" in resp.data) def test_upload_zipfile_with_one_non_tsv_file(client): |