aboutsummaryrefslogtreecommitdiff
path: root/tests/conftest.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/conftest.py')
-rw-r--r--tests/conftest.py83
1 files changed, 83 insertions, 0 deletions
diff --git a/tests/conftest.py b/tests/conftest.py
index 9755a47..90d8264 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -5,10 +5,13 @@ import socket
import subprocess
from contextlib import closing
+import redis
import pytest
+import jsonpickle
from qc_app import create_app
from quality_control.parsing import strain_names
+from quality_control.errors import InvalidValue, DuplicateHeading
@pytest.fixture(scope="session")
def strains():
@@ -44,3 +47,83 @@ def uploadable_file_object(filename):
"Return an 'object' representing the file to be uploaded."
with open(f"tests/test_data/{filename}", "br") as the_file:
return (io.BytesIO(the_file.read()), filename)
+
+@pytest.fixture(scope="function")
+def job_id():
+ "Return a default UUID4 string as the 'job_id' for test purposes"
+ return "934c55d8-396e-4959-90e1-2698e9205758"
+
+@pytest.fixture(scope="function")
+def redis_conn_with_fresh_job(redis_url, job_id): # pylint: disable=[redefined-outer-name]
+ "redis connection with fresh, unprocessed job"
+ the_job = {
+ "job_id": job_id, "command": "some_test_command", "status": "pending",
+ "filename": "/path/to/some/file.tsv", "percent": 0
+ }
+ with redis.Redis.from_url(redis_url, decode_responses=True) as rconn:
+ rconn.hset(name=job_id, mapping=the_job)
+ yield rconn
+ rconn.hdel(job_id, *the_job.keys())
+ rconn.delete(job_id)
+
+@pytest.fixture(scope="function")
+def redis_conn_with_in_progress_job_no_errors(redis_url, job_id): # pylint: disable=[redefined-outer-name]
+ "redis connection with partially processed job, with no errors"
+ the_job = {
+ "job_id": job_id, "command": "some_test_command",
+ "status": "Processing", "filename": "/path/to/some/file.tsv",
+ "percent": 32.242342
+ }
+ with redis.Redis.from_url(redis_url, decode_responses=True) as rconn:
+ rconn.hset(name=job_id, mapping=the_job)
+ yield rconn
+ rconn.hdel(job_id, *the_job.keys())
+ rconn.delete(job_id)
+
+@pytest.fixture(scope="function")
+def redis_conn_with_in_progress_job_some_errors(redis_url, job_id): # pylint: disable=[redefined-outer-name]
+ "redis connection with partially processed job, with some errors"
+ the_job = {
+ "job_id": job_id, "command": "some_test_command",
+ "status": "Processing", "filename": "/path/to/some/file.tsv",
+ "percent": 45.34245, "errors": jsonpickle.encode((
+ DuplicateHeading(
+ 1, (5,13,19), "DupHead", "Heading 'DupHead' is repeated"),
+ InvalidValue(45, 2, "ohMy", "Invalid value 'ohMy'")))
+ }
+ with redis.Redis.from_url(redis_url, decode_responses=True) as rconn:
+ rconn.hset(name=job_id, mapping=the_job)
+ yield rconn
+ rconn.hdel(job_id, *the_job.keys())
+ rconn.delete(job_id)
+
+@pytest.fixture(scope="function")
+def redis_conn_with_completed_job_no_errors(redis_url, job_id): # pylint: disable=[redefined-outer-name]
+ "redis connection with completely processed job, with no errors"
+ the_job = {
+ "job_id": job_id, "command": "some_test_command",
+ "status": "success", "filename": "/path/to/some/file.tsv",
+ "percent": 100
+ }
+ with redis.Redis.from_url(redis_url, decode_responses=True) as rconn:
+ rconn.hset(name=job_id, mapping=the_job)
+ yield rconn
+ rconn.hdel(job_id, *the_job.keys())
+ rconn.delete(job_id)
+
+@pytest.fixture(scope="function")
+def redis_conn_with_completed_job_some_errors(redis_url, job_id): # pylint: disable=[redefined-outer-name]
+ "redis connection with completely processed job, with some errors"
+ the_job = {
+ "job_id": job_id, "command": "some_test_command",
+ "status": "success", "filename": "/path/to/some/file.tsv",
+ "percent": 100, "errors": jsonpickle.encode((
+ DuplicateHeading(
+ 1, (5,13,19), "DupHead", "Heading 'DupHead' is repeated"),
+ InvalidValue(45, 2, "ohMy", "Invalid value 'ohMy'")))
+ }
+ with redis.Redis.from_url(redis_url, decode_responses=True) as rconn:
+ rconn.hset(name=job_id, mapping=the_job)
+ yield rconn
+ rconn.hdel(job_id, *the_job.keys())
+ rconn.delete(job_id)