aboutsummaryrefslogtreecommitdiff
"""Set up fixtures for tests"""
import io
import os
import uuid
from hashlib import sha256

import redis
import pytest
import jsonpickle
from redis import Redis

from functional_tools import take

from qc_app import jobs, create_app
from qc_app.jobs import JOBS_PREFIX

from quality_control.errors import InvalidValue, DuplicateHeading

@pytest.fixture(scope="session")
def strains():
    """Parse the strains once every test session"""
    stainnames = set()
    with open("etc/strains.csv", encoding="utf8") as strains_file:
        for idx, line in enumerate(strains_file.readlines()):
            if idx > 0:
                parts = line.split()
                for name in (parts[1], parts[2]):
                    stainnames.add(name.strip())
                    if len(parts) >= 6:
                        alias = parts[5].strip()
                        if alias != "" and alias not in ("P", "\\N"):
                            stainnames.add(alias)

    return tuple(stainnames)

def cleanup_redis(redisuri: str, prefix: str):
    """Delete all keys with given prefix"""
    with Redis.from_url(redisuri, decode_responses=True) as rconn:
        cur = rconn.scan_iter(f"{prefix}:*")
        while True:
            batch = take(cur, 500)
            if len(batch) <= 0:
                break
            rconn.delete(*batch)

@pytest.fixture(scope="module")
def client():
    "Fixture for test client"
    app = create_app(os.environ.get("QCAPP_INSTANCE_PATH"))
    test_prefix = sha256(f"test:{uuid.uuid4()}".encode("utf8")).hexdigest()
    app.config.update({
        "TESTING": True,
        "GNQC_REDIS_PREFIX": f"{test_prefix}:GNQC",
        "JOBS_TTL_SECONDS": 2 * 60 * 60# 2 hours
    })
    with app.app_context():
        yield app.test_client()

    cleanup_redis(app.config["REDIS_URL"], test_prefix)

@pytest.fixture(scope="module")
def db_url(client):#pylint: disable=[redefined-outer-name]
    """Return the database URI"""
    return client.application.config["SQL_URI"]

@pytest.fixture(scope="module")
def redis_url(client):#pylint: disable=[redefined-outer-name]
    """Return the redis URI"""
    return client.application.config["REDIS_URL"]

@pytest.fixture(scope="module")
def redis_prefix(client):#pylint: disable=[redefined-outer-name]
    """Return the redis prefix"""
    return client.application.config["GNQC_REDIS_PREFIX"]

@pytest.fixture(scope="module")
def jobs_prefix(redis_prefix):#pylint: disable=[redefined-outer-name]
    """Return the redis prefix for jobs."""
    return f"{redis_prefix}:{JOBS_PREFIX}"

@pytest.fixture(scope="module")
def redis_ttl(client):#pylint: disable=[redefined-outer-name]
    """Return the redis URI"""
    return client.application.config["JOBS_TTL_SECONDS"]

def uploadable_file_object(filename):
    "Return an 'object' representing the file to be uploaded."
    with open(f"tests/test_data/{filename}", "br") as the_file:
        return (io.BytesIO(the_file.read()), filename)

@pytest.fixture(scope="function")
def job_id():
    "Return a default UUID4 string as the 'job_id' for test purposes"
    return "934c55d8-396e-4959-90e1-2698e9205758"

def cleanup_job(rconn, jobid, thejob):
    """Delete job from redis."""
    rconn.hdel(jobs.job_key(jobs.jobsnamespace(), jobid), *thejob.keys())
    rconn.delete(jobs.job_key(jobs.jobsnamespace(), jobid))

@pytest.fixture(scope="function")
def redis_conn_with_fresh_job(redis_url, redis_ttl, jobs_prefix, job_id):#pylint: disable=[redefined-outer-name]
    "redis connection with fresh, unprocessed job"
    thejob = {
        "jobid": job_id, "command": "some_test_command", "job_type": "testjob",
        "ttl_seconds": redis_ttl, "rprefix": jobs_prefix, "extra_meta": {
            "filename": "/path/to/some/file.tsv", "percent": 0,
            "status": "pending"}}
    with redis.Redis.from_url(redis_url, decode_responses=True) as rconn:
        jobs.initialise_job(rconn, **thejob)
        yield rconn
        cleanup_job(rconn, job_id, thejob)

@pytest.fixture(scope="function")
def redis_conn_with_in_progress_job_no_errors(redis_url, redis_ttl, jobs_prefix, job_id):#pylint: disable=[redefined-outer-name]
    "redis connection with partially processed job, with no errors"
    thejob = {
        "jobid": job_id, "command": "some_test_command",
        "ttl_seconds": redis_ttl, "rprefix": jobs_prefix, "job_type": "testjob",
        "extra_meta": {
            "status": "Processing", "filename": "/path/to/some/file.tsv",
            "percent": 32.242342}}
    with redis.Redis.from_url(redis_url, decode_responses=True) as rconn:
        jobs.initialise_job(rconn, **thejob)
        yield rconn
        cleanup_job(rconn, job_id, thejob)

@pytest.fixture(scope="function")
def redis_conn_with_in_progress_job_some_errors(redis_url, redis_ttl, jobs_prefix, job_id): # pylint: disable=[redefined-outer-name]
    "redis connection with partially processed job, with some errors"
    the_job = {
        "jobid": job_id, "command": "some_test_command",
        "ttl_seconds": redis_ttl, "rprefix": jobs_prefix, "job_type": "testjob",
        "extra_meta": {
            "status": "Processing", "filename": "/path/to/some/file.tsv",
            "percent": 45.34245, "errors": jsonpickle.encode((
                DuplicateHeading(
                    1, (5,13,19), "DupHead", "Heading 'DupHead' is repeated"),
                InvalidValue(45, 2, "ohMy", "Invalid value 'ohMy'")))}
    }
    with redis.Redis.from_url(redis_url, decode_responses=True) as rconn:
        jobs.initialise_job(rconn, **the_job)
        yield rconn
        cleanup_job(rconn, job_id, the_job)

@pytest.fixture(scope="function")
def redis_conn_with_completed_job_no_errors(redis_url, redis_ttl, jobs_prefix, job_id): # pylint: disable=[redefined-outer-name]
    "redis connection with completely processed job, with no errors"
    the_job = {
        "jobid": job_id, "command": ["complete", "--woerror", "test-command"],
        "ttl_seconds": redis_ttl, "rprefix": jobs_prefix, "job_type": "testjob",
        "extra_meta": {
            "status": "success", "filename": "/path/to/some/file.tsv",
            "percent": 100, "errors": jsonpickle.encode(tuple())}}
    with redis.Redis.from_url(redis_url, decode_responses=True) as rconn:
        jobs.initialise_job(rconn, **the_job)
        yield rconn
        cleanup_job(rconn, job_id, the_job)

@pytest.fixture(scope="function")
def redis_conn_with_completed_job_some_errors(redis_url, redis_ttl, jobs_prefix, job_id): # pylint: disable=[redefined-outer-name]
    "redis connection with completely processed job, with some errors"
    the_job = {
        "jobid": job_id, "command": ["complete", "--werror", "test-command"],
        "ttl_seconds": redis_ttl, "rprefix": jobs_prefix, "job_type": "testjob",
        "extra_meta": {
            "status": "success", "filename": "/path/to/some/file.tsv",
            "percent": 100, "errors": jsonpickle.encode((
                DuplicateHeading(
                    1, (5,13,19), "DupHead", "Heading 'DupHead' is repeated"),
                InvalidValue(45, 2, "ohMy", "Invalid value 'ohMy'")))}}
    with redis.Redis.from_url(redis_url, decode_responses=True) as rconn:
        jobs.initialise_job(rconn, **the_job)
        yield rconn
        cleanup_job(rconn, job_id, the_job)

@pytest.fixture(scope="function")
def uploads_dir(client): # pylint: disable=[redefined-outer-name]
    """Returns the configured, uploads directory, creating it if it does not
    exist."""
    the_dir = client.application.config["UPLOAD_FOLDER"]
    if not os.path.exists(the_dir):
        os.mkdir(the_dir)

    return the_dir

@pytest.fixture(scope="function")
def jobs_errors_dir(uploads_dir): # pylint: disable=[redefined-outer-name]
    """Returns the configured, jobs errors directory, creating it if it does not
    exist."""
    the_dir = f"{uploads_dir}/job_errors"
    if not os.path.exists(the_dir):
        os.mkdir(the_dir)

    return the_dir

@pytest.fixture(scope="function")
def stderr_with_output(jobs_errors_dir, job_id): # pylint: disable=[redefined-outer-name]
    """Creates a sample worker error file with some content"""
    filepath = f"{jobs_errors_dir}/job_{job_id}.error"
    with open(filepath, "w", encoding="utf8") as error_file:
        error_file.write("This is an non-empty error file.")
        error_file.flush()
        yield filepath

    os.remove(filepath)

@pytest.fixture(scope="function")
def stderr_with_no_output(jobs_errors_dir, job_id): # pylint: disable=[redefined-outer-name]
    """Creates a sample worker error file with no content"""
    filepath = f"{jobs_errors_dir}/job_{job_id}.error"
    with open(filepath, "w", encoding="utf-8") as error_file:
        error_file.flush()
        yield filepath

    os.remove(filepath)