aboutsummaryrefslogtreecommitdiff
path: root/scripts/validate_file.py
blob: 9f0a5610b0d2215060dcf1323f6c3f64695d90b2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
"""External worker script that checks file for correctness"""
import os
import sys
import traceback
from typing import Callable
from zipfile import ZipFile, is_zipfile

import jsonpickle
from redis import Redis
from redis.exceptions import ConnectionError # pylint: disable=[redefined-builtin]

from quality_control.utils import make_progress_calculator
from quality_control.parsing import FileType, strain_names, collect_errors
from .qc import cli_argument_parser


def make_progress_indicator(
        redis_connection: Redis, job_id: str,
        progress_calc_fn: Callable) -> Callable:
    """Build a per-line callback that computes progress and persists it.

    The returned callable takes ``(linenumber, linetext)``, feeds them to
    `progress_calc_fn`, stores the resulting record's fields in the job's
    redis hash, and returns the record to the caller.
    """
    def __indicator__(linenumber, linetext):
        current = progress_calc_fn(linenumber, linetext)
        # The progress value is a NamedTuple; flatten it into the job hash.
        redis_connection.hset(name=job_id, mapping=current._asdict())
        return current

    return __indicator__

def cli_args_valid(args):
    """Check that the command-line arguments are provided and correct.

    Returns `args` unchanged when every check passes; otherwise prints a
    diagnostic to stderr and returns None.
    """
    if not os.path.exists(args.filepath):
        print(f"The file '{args.filepath}' does not exist.", file=sys.stderr)
        return None

    if not os.path.exists(args.strainsfile):
        print(f"The file '{args.strainsfile}' does not exist.", file=sys.stderr)
        return None

    try:
        # `Redis.from_url` is lazy — it opens no connection until a command
        # is issued — so ping() to actually verify the server is reachable,
        # and close the probe connection either way.
        conn = Redis.from_url(args.redisurl)
        try:
            conn.ping()
        finally:
            conn.close()
    except ConnectionError: # pylint: disable=[redefined-builtin]
        print(traceback.format_exc(), file=sys.stderr)
        return None

    return args

def process_cli_arguments():
    """Parse and validate this worker's command-line arguments.

    Extends the shared QC argument parser with the worker-specific
    positionals, then runs the parsed namespace through `cli_args_valid`.
    """
    parser = cli_argument_parser()
    parser.prog = "worker"
    parser.add_argument("redisurl",
                        default="redis:///",
                        help="URL to the redis server")
    parser.add_argument("job_id",
                        help="The id of the job being processed")
    parsed = parser.parse_args()
    return cli_args_valid(parsed)

def stream_error(redis_conn, job_id, error):
    """Append `error` to the job's accumulated error tuple in redis."""
    # Missing/empty "errors" field means no errors recorded yet.
    raw = redis_conn.hget(job_id, key="errors")
    previous = jsonpickle.decode(raw) if raw else tuple()
    redis_conn.hset(
        job_id, key="errors", value=jsonpickle.encode(previous + (error,)))

def make_user_aborted(redis_conn, job_id):
    """Make function that checks whether the user aborted the process."""
    def __aborted__():
        # The "user_aborted" field is absent until something sets it;
        # treat a missing value as "not aborted".
        user_aborted = bool(int(
            redis_conn.hget(name=job_id, key="user_aborted") or "0"))
        if user_aborted:
            # Record the terminal state so observers of the job see it.
            redis_conn.hset(name=job_id, key="status", value="aborted")

        return user_aborted
    return __aborted__

def get_zipfile_size(filepath):
    """Compute the total uncompressed size (in bytes) of the given zipfile.

    Sums over every archive member rather than reading only the first
    entry: identical for the single-member archives this pipeline handles,
    but also correct for multi-member zips and safe (returns 0) for an
    empty archive instead of raising IndexError.
    """
    with ZipFile(filepath, "r") as zfile:
        return sum(info.file_size for info in zfile.infolist())

def main():
    """Entry point: validate arguments, stream QC errors to redis.

    Returns 0 on success, 1 when argument validation fails.
    """
    args = process_cli_arguments()
    if args is None:
        print("Quitting due to errors!", file=sys.stderr)
        return 1

    with Redis.from_url(args.redisurl) as redis_conn:
        # Progress denominator: the input's payload size (derived from the
        # zip contents for archives, plain file size otherwise).
        progress_calculator = make_progress_calculator(
            get_zipfile_size(args.filepath) if is_zipfile(args.filepath)
            else os.stat(args.filepath).st_size)
        progress_indicator = make_progress_indicator(
            redis_conn, args.job_id, progress_calculator)
        count = args.count  # maximum errors to report; <= 0 means unlimited
        filepath = args.filepath
        filetype = (
            FileType.AVERAGE if args.filetype == "average"
            else FileType.STANDARD_ERROR)
        strains = strain_names(args.strainsfile)

        redis_conn.hset(
            name=args.job_id, key="status", value="Processing")
        redis_conn.hset(
            name=args.job_id, key="message", value="Collecting errors")

        error_count = 0
        for error in collect_errors(
                filepath, filetype, strains, progress_indicator,
                make_user_aborted(redis_conn, args.job_id)):
            stream_error(redis_conn, args.job_id, error)

            # Stop early once the requested error limit is reached.
            if count > 0:
                error_count = error_count + 1
                if error_count >= count:
                    break

        # NOTE(review): this unconditionally overwrites any "aborted" status
        # set by the abort checker above — confirm whether an aborted job
        # should really end up marked "success".
        redis_conn.hset(name=args.job_id, key="status", value="success")

    return 0

if __name__ == "__main__":
    # Propagate main()'s 0/1 status to the shell; previously the return
    # value was computed but discarded, so the process always exited 0.
    sys.exit(main())