"""Set up fixtures for tests""" import io import os import uuid from hashlib import sha256 import redis import pytest import jsonpickle from redis import Redis from functional_tools import take from qc_app import jobs, create_app from qc_app.jobs import JOBS_PREFIX from quality_control.errors import InvalidValue, DuplicateHeading @pytest.fixture(scope="session") def strains(): """Parse the strains once every test session""" stainnames = set() with open("etc/strains.csv", encoding="utf8") as strains_file: for idx, line in enumerate(strains_file.readlines()): if idx > 0: parts = line.split() for name in (parts[1], parts[2]): stainnames.add(name.strip()) if len(parts) >= 6: alias = parts[5].strip() if alias != "" and alias not in ("P", "\\N"): stainnames.add(alias) return tuple(stainnames) def cleanup_redis(redisuri: str, prefix: str): """Delete all keys with given prefix""" with Redis.from_url(redisuri, decode_responses=True) as rconn: cur = rconn.scan_iter(f"{prefix}:*") while True: batch = take(cur, 500) if len(batch) <= 0: break rconn.delete(*batch) @pytest.fixture(scope="module") def client(): "Fixture for test client" app = create_app() test_prefix = sha256(f"test:{uuid.uuid4()}".encode("utf8")).hexdigest() app.config.update({ "TESTING": True, "GNQC_REDIS_PREFIX": f"{test_prefix}:GNQC", "JOBS_TTL_SECONDS": 2 * 60 * 60# 2 hours }) with app.app_context(): yield app.test_client() cleanup_redis(app.config["REDIS_URL"], test_prefix) @pytest.fixture(scope="module") def db_url(client):#pylint: disable=[redefined-outer-name] """Return the database URI""" return client.application.config["SQL_URI"] @pytest.fixture(scope="module") def redis_url(client):#pylint: disable=[redefined-outer-name] """Return the redis URI""" return client.application.config["REDIS_URL"] @pytest.fixture(scope="module") def redis_prefix(client):#pylint: disable=[redefined-outer-name] """Return the redis prefix""" return client.application.config["GNQC_REDIS_PREFIX"] @pytest.fixture(scope="module") def jobs_prefix(redis_prefix):#pylint: disable=[redefined-outer-name] """Return the redis prefix for jobs.""" return f"{redis_prefix}:{JOBS_PREFIX}" @pytest.fixture(scope="module") def redis_ttl(client):#pylint: disable=[redefined-outer-name] """Return the redis URI""" return client.application.config["JOBS_TTL_SECONDS"] def uploadable_file_object(filename): "Return an 'object' representing the file to be uploaded." with open(f"tests/test_data/{filename}", "br") as the_file: return (io.BytesIO(the_file.read()), filename) @pytest.fixture(scope="function") def job_id(): "Return a default UUID4 string as the 'job_id' for test purposes" return "934c55d8-396e-4959-90e1-2698e9205758" def cleanup_job(rconn, jobid, thejob): """Delete job from redis.""" rconn.hdel(jobs.job_key(jobs.jobsnamespace(), jobid), *thejob.keys()) rconn.delete(jobs.job_key(jobs.jobsnamespace(), jobid)) @pytest.fixture(scope="function") def redis_conn_with_fresh_job(redis_url, redis_ttl, jobs_prefix, job_id):#pylint: disable=[redefined-outer-name] "redis connection with fresh, unprocessed job" thejob = { "jobid": job_id, "command": "some_test_command", "job_type": "testjob", "ttl_seconds": redis_ttl, "rprefix": jobs_prefix, "extra_meta": { "filename": "/path/to/some/file.tsv", "percent": 0, "status": "pending"}} with redis.Redis.from_url(redis_url, decode_responses=True) as rconn: jobs.initialise_job(rconn, **thejob) yield rconn cleanup_job(rconn, job_id, thejob) @pytest.fixture(scope="function") def redis_conn_with_in_progress_job_no_errors(redis_url, redis_ttl, jobs_prefix, job_id):#pylint: disable=[redefined-outer-name] "redis connection with partially processed job, with no errors" thejob = { "jobid": job_id, "command": "some_test_command", "ttl_seconds": redis_ttl, "rprefix": jobs_prefix, "job_type": "testjob", "extra_meta": { "status": "Processing", "filename": "/path/to/some/file.tsv", "percent": 32.242342}} with redis.Redis.from_url(redis_url, decode_responses=True) as rconn: jobs.initialise_job(rconn, **thejob) yield rconn cleanup_job(rconn, job_id, thejob) @pytest.fixture(scope="function") def redis_conn_with_in_progress_job_some_errors(redis_url, redis_ttl, jobs_prefix, job_id): # pylint: disable=[redefined-outer-name] "redis connection with partially processed job, with some errors" the_job = { "jobid": job_id, "command": "some_test_command", "ttl_seconds": redis_ttl, "rprefix": jobs_prefix, "job_type": "testjob", "extra_meta": { "status": "Processing", "filename": "/path/to/some/file.tsv", "percent": 45.34245, "errors": jsonpickle.encode(( DuplicateHeading( "__test__", 1, (5,13,19), "DupHead", "Heading 'DupHead' is repeated"), InvalidValue("__test__", 45, 2, "ohMy", "Invalid value 'ohMy'")))} } with redis.Redis.from_url(redis_url, decode_responses=True) as rconn: jobs.initialise_job(rconn, **the_job) yield rconn cleanup_job(rconn, job_id, the_job) @pytest.fixture(scope="function") def redis_conn_with_completed_job_no_errors(redis_url, redis_ttl, jobs_prefix, job_id): # pylint: disable=[redefined-outer-name] "redis connection with completely processed job, with no errors" the_job = { "jobid": job_id, "command": ["complete", "--woerror", "test-command"], "ttl_seconds": redis_ttl, "rprefix": jobs_prefix, "job_type": "testjob", "extra_meta": { "status": "success", "filename": "/path/to/some/file.tsv", "percent": 100, "errors": jsonpickle.encode(tuple())}} with redis.Redis.from_url(redis_url, decode_responses=True) as rconn: jobs.initialise_job(rconn, **the_job) yield rconn cleanup_job(rconn, job_id, the_job) @pytest.fixture(scope="function") def redis_conn_with_completed_job_some_errors(redis_url, redis_ttl, jobs_prefix, job_id): # pylint: disable=[redefined-outer-name] "redis connection with completely processed job, with some errors" the_job = { "jobid": job_id, "command": ["complete", "--werror", "test-command"], "ttl_seconds": redis_ttl, "rprefix": jobs_prefix, "job_type": "testjob", "extra_meta": { "status": "success", "filename": "/path/to/some/file.tsv", "percent": 100, "errors": jsonpickle.encode(( DuplicateHeading( "__test__", 1, (5,13,19), "DupHead", "Heading 'DupHead' is repeated"), InvalidValue("__test__", 45, 2, "ohMy", "Invalid value 'ohMy'")))}} with redis.Redis.from_url(redis_url, decode_responses=True) as rconn: jobs.initialise_job(rconn, **the_job) yield rconn cleanup_job(rconn, job_id, the_job) @pytest.fixture(scope="function") def uploads_dir(client): # pylint: disable=[redefined-outer-name] """Returns the configured, uploads directory, creating it if it does not exist.""" the_dir = client.application.config["UPLOAD_FOLDER"] if not os.path.exists(the_dir): os.mkdir(the_dir) return the_dir @pytest.fixture(scope="function") def jobs_errors_dir(uploads_dir): # pylint: disable=[redefined-outer-name] """Returns the configured, jobs errors directory, creating it if it does not exist.""" the_dir = f"{uploads_dir}/job_errors" if not os.path.exists(the_dir): os.mkdir(the_dir) return the_dir @pytest.fixture(scope="function") def stderr_with_output(jobs_errors_dir, job_id): # pylint: disable=[redefined-outer-name] """Creates a sample worker error file with some content""" filepath = f"{jobs_errors_dir}/job_{job_id}.error" with open(filepath, "w", encoding="utf8") as error_file: error_file.write("This is an non-empty error file.") error_file.flush() yield filepath os.remove(filepath) @pytest.fixture(scope="function") def stderr_with_no_output(jobs_errors_dir, job_id): # pylint: disable=[redefined-outer-name] """Creates a sample worker error file with no content""" filepath = f"{jobs_errors_dir}/job_{job_id}.error" with open(filepath, "w", encoding="utf-8") as error_file: error_file.flush() yield filepath os.remove(filepath)