From 3bd3049fd403886a9653aa7c2fbd1639926420ea Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Tue, 21 Jun 2022 12:28:02 +0300 Subject: Test the progress indication feature --- tests/conftest.py | 83 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) (limited to 'tests/conftest.py') diff --git a/tests/conftest.py b/tests/conftest.py index 9755a47..90d8264 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,10 +5,13 @@ import socket import subprocess from contextlib import closing +import redis import pytest +import jsonpickle from qc_app import create_app from quality_control.parsing import strain_names +from quality_control.errors import InvalidValue, DuplicateHeading @pytest.fixture(scope="session") def strains(): @@ -44,3 +47,83 @@ def uploadable_file_object(filename): "Return an 'object' representing the file to be uploaded." with open(f"tests/test_data/{filename}", "br") as the_file: return (io.BytesIO(the_file.read()), filename) + +@pytest.fixture(scope="function") +def job_id(): + "Return a default UUID4 string as the 'job_id' for test purposes" + return "934c55d8-396e-4959-90e1-2698e9205758" + +@pytest.fixture(scope="function") +def redis_conn_with_fresh_job(redis_url, job_id): # pylint: disable=[redefined-outer-name] + "redis connection with fresh, unprocessed job" + the_job = { + "job_id": job_id, "command": "some_test_command", "status": "pending", + "filename": "/path/to/some/file.tsv", "percent": 0 + } + with redis.Redis.from_url(redis_url, decode_responses=True) as rconn: + rconn.hset(name=job_id, mapping=the_job) + yield rconn + rconn.hdel(job_id, *the_job.keys()) + rconn.delete(job_id) + +@pytest.fixture(scope="function") +def redis_conn_with_in_progress_job_no_errors(redis_url, job_id): # pylint: disable=[redefined-outer-name] + "redis connection with partially processed job, with no errors" + the_job = { + "job_id": job_id, "command": "some_test_command", + "status": "Processing", "filename": "/path/to/some/file.tsv", + "percent": 32.242342 + } + with redis.Redis.from_url(redis_url, decode_responses=True) as rconn: + rconn.hset(name=job_id, mapping=the_job) + yield rconn + rconn.hdel(job_id, *the_job.keys()) + rconn.delete(job_id) + +@pytest.fixture(scope="function") +def redis_conn_with_in_progress_job_some_errors(redis_url, job_id): # pylint: disable=[redefined-outer-name] + "redis connection with partially processed job, with some errors" + the_job = { + "job_id": job_id, "command": "some_test_command", + "status": "Processing", "filename": "/path/to/some/file.tsv", + "percent": 45.34245, "errors": jsonpickle.encode(( + DuplicateHeading( + 1, (5,13,19), "DupHead", "Heading 'DupHead' is repeated"), + InvalidValue(45, 2, "ohMy", "Invalid value 'ohMy'"))) + } + with redis.Redis.from_url(redis_url, decode_responses=True) as rconn: + rconn.hset(name=job_id, mapping=the_job) + yield rconn + rconn.hdel(job_id, *the_job.keys()) + rconn.delete(job_id) + +@pytest.fixture(scope="function") +def redis_conn_with_completed_job_no_errors(redis_url, job_id): # pylint: disable=[redefined-outer-name] + "redis connection with completely processed job, with no errors" + the_job = { + "job_id": job_id, "command": "some_test_command", + "status": "success", "filename": "/path/to/some/file.tsv", + "percent": 100 + } + with redis.Redis.from_url(redis_url, decode_responses=True) as rconn: + rconn.hset(name=job_id, mapping=the_job) + yield rconn + rconn.hdel(job_id, *the_job.keys()) + rconn.delete(job_id) + +@pytest.fixture(scope="function") +def redis_conn_with_completed_job_some_errors(redis_url, job_id): # pylint: disable=[redefined-outer-name] + "redis connection with completely processed job, with some errors" + the_job = { + "job_id": job_id, "command": "some_test_command", + "status": "success", "filename": "/path/to/some/file.tsv", + "percent": 100, "errors": jsonpickle.encode(( + DuplicateHeading( + 1, (5,13,19), "DupHead", "Heading 'DupHead' is repeated"), + InvalidValue(45, 2, "ohMy", "Invalid value 'ohMy'"))) + } + with redis.Redis.from_url(redis_url, decode_responses=True) as rconn: + rconn.hset(name=job_id, mapping=the_job) + yield rconn + rconn.hdel(job_id, *the_job.keys()) + rconn.delete(job_id) -- cgit v1.2.3