"""Set up fixtures for tests""" import io import os import shutil import socket import subprocess from contextlib import closing import redis import pytest import jsonpickle from qc_app import create_app from quality_control.parsing import strain_names from quality_control.errors import InvalidValue, DuplicateHeading @pytest.fixture(scope="session") def strains(): """Parse the strains once every test session""" return strain_names("etc/strains.csv") def is_port_in_use(port: int) -> bool: "Check whether `port` is in use" with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sckt: return sckt.connect_ex(("localhost", port)) == 0 @pytest.fixture(scope="session") def redis_url(): "Fixture to launch a new redis instance and return appropriate URI" port = next(# pylint: disable=[stop-iteration-return] port for port in range(6379,65535) if not is_port_in_use(port)) command = [shutil.which("redis-server"), "--port", str(port)] process = subprocess.Popen(command) # pylint: disable=[consider-using-with] yield f"redis://localhost:{port}" process.kill() @pytest.fixture(scope="module") def client(redis_url): # pylint: disable=[redefined-outer-name] "Fixture for test client" app = create_app(os.environ.get("QCAPP_INSTANCE_PATH")) app.config.update({ "REDIS_URL": redis_url, "TESTING": True }) yield app.test_client() def uploadable_file_object(filename): "Return an 'object' representing the file to be uploaded." with open(f"tests/test_data/{filename}", "br") as the_file: return (io.BytesIO(the_file.read()), filename) @pytest.fixture(scope="function") def job_id(): "Return a default UUID4 string as the 'job_id' for test purposes" return "934c55d8-396e-4959-90e1-2698e9205758" @pytest.fixture(scope="function") def redis_conn_with_fresh_job(redis_url, job_id): # pylint: disable=[redefined-outer-name] "redis connection with fresh, unprocessed job" the_job = { "job_id": job_id, "command": "some_test_command", "status": "pending", "filename": "/path/to/some/file.tsv", "percent": 0 } with redis.Redis.from_url(redis_url, decode_responses=True) as rconn: rconn.hset(name=job_id, mapping=the_job) yield rconn rconn.hdel(job_id, *the_job.keys()) rconn.delete(job_id) @pytest.fixture(scope="function") def redis_conn_with_in_progress_job_no_errors(redis_url, job_id): # pylint: disable=[redefined-outer-name] "redis connection with partially processed job, with no errors" the_job = { "job_id": job_id, "command": "some_test_command", "status": "Processing", "filename": "/path/to/some/file.tsv", "percent": 32.242342 } with redis.Redis.from_url(redis_url, decode_responses=True) as rconn: rconn.hset(name=job_id, mapping=the_job) yield rconn rconn.hdel(job_id, *the_job.keys()) rconn.delete(job_id) @pytest.fixture(scope="function") def redis_conn_with_in_progress_job_some_errors(redis_url, job_id): # pylint: disable=[redefined-outer-name] "redis connection with partially processed job, with some errors" the_job = { "job_id": job_id, "command": "some_test_command", "status": "Processing", "filename": "/path/to/some/file.tsv", "percent": 45.34245, "errors": jsonpickle.encode(( DuplicateHeading( 1, (5,13,19), "DupHead", "Heading 'DupHead' is repeated"), InvalidValue(45, 2, "ohMy", "Invalid value 'ohMy'"))) } with redis.Redis.from_url(redis_url, decode_responses=True) as rconn: rconn.hset(name=job_id, mapping=the_job) yield rconn rconn.hdel(job_id, *the_job.keys()) rconn.delete(job_id) @pytest.fixture(scope="function") def redis_conn_with_completed_job_no_errors(redis_url, job_id): # pylint: 
disable=[redefined-outer-name] "redis connection with completely processed job, with no errors" the_job = { "job_id": job_id, "command": "some_test_command", "status": "success", "filename": "/path/to/some/file.tsv", "percent": 100, "errors": jsonpickle.encode(tuple()) } with redis.Redis.from_url(redis_url, decode_responses=True) as rconn: rconn.hset(name=job_id, mapping=the_job) yield rconn rconn.hdel(job_id, *the_job.keys()) rconn.delete(job_id) @pytest.fixture(scope="function") def redis_conn_with_completed_job_some_errors(redis_url, job_id): # pylint: disable=[redefined-outer-name] "redis connection with completely processed job, with some errors" the_job = { "job_id": job_id, "command": "some_test_command", "status": "success", "filename": "/path/to/some/file.tsv", "percent": 100, "errors": jsonpickle.encode(( DuplicateHeading( 1, (5,13,19), "DupHead", "Heading 'DupHead' is repeated"), InvalidValue(45, 2, "ohMy", "Invalid value 'ohMy'"))) } with redis.Redis.from_url(redis_url, decode_responses=True) as rconn: rconn.hset(name=job_id, mapping=the_job) yield rconn rconn.hdel(job_id, *the_job.keys()) rconn.delete(job_id) @pytest.fixture(scope="function") def uploads_dir(client): # pylint: disable=[redefined-outer-name] """Returns the configured, uploads directory, creating it if it does not exist.""" the_dir = client.application.config["UPLOAD_FOLDER"] if not os.path.exists(the_dir): os.mkdir(the_dir) return the_dir @pytest.fixture(scope="function") def jobs_errors_dir(uploads_dir): # pylint: disable=[redefined-outer-name] """Returns the configured, jobs errors directory, creating it if it does not exist.""" the_dir = f"{uploads_dir}/job_errors" if not os.path.exists(the_dir): os.mkdir(the_dir) return the_dir @pytest.fixture(scope="function") def stderr_with_output(jobs_errors_dir, job_id): # pylint: disable=[redefined-outer-name] """Creates a sample worker error file with some content""" filepath = f"{jobs_errors_dir}/job_{job_id}.error" with open(filepath, "w", encoding="utf8") as error_file: error_file.write("This is an non-empty error file.") error_file.flush() yield filepath os.remove(filepath) @pytest.fixture(scope="function") def stderr_with_no_output(jobs_errors_dir, job_id): # pylint: disable=[redefined-outer-name] """Creates a sample worker error file with no content""" filepath = f"{jobs_errors_dir}/job_{job_id}.error" with open(filepath, "w", encoding="utf-8") as error_file: error_file.flush() yield filepath os.remove(filepath)
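

# A hedged usage sketch, not part of the original fixtures: it shows roughly how
# a test module might consume the Redis-backed fixtures above. The test name is
# hypothetical; the assertions rely only on the job hash seeded by
# `redis_conn_with_fresh_job` and on `decode_responses=True` returning strings.
#
# def test_fresh_job_is_pending(redis_conn_with_fresh_job, job_id):
#     "The freshly seeded job hash should start out 'pending' at zero percent."
#     job = redis_conn_with_fresh_job.hgetall(job_id)
#     assert job["status"] == "pending"
#     assert job["percent"] == "0"  # Redis stores hash values as strings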