aboutsummaryrefslogtreecommitdiff
path: root/scripts/worker.py
blob: f4d5b6b8526795807a3c472e1e767e6beac1f06d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import os
import sys
import traceback
from typing import Callable

import jsonpickle
from redis import Redis
from redis.exceptions import ConnectionError

from .qc import cli_argument_parser
from quality_control.utils import make_progress_calculator
from quality_control.parsing import (
    take, FileType, strain_names, collect_errors)


def make_progress_indicator(
        redis_connection: Redis, job_id: str, progress_calc_fn: Callable) -> Callable:
    """Build a callback that computes progress and publishes it to redis.

    Args:
        redis_connection: Client whose `hset` is used to store the progress.
        job_id: Name of the redis hash that receives the progress fields.
        progress_calc_fn: Callable taking `(linenumber, linetext)` and
            returning an object that provides `_asdict()`.

    Returns:
        A callable taking `(linenumber, linetext)`. Each call writes the
        computed progress fields into the `job_id` hash and returns the
        progress object unchanged.
    """
    def _publish_progress(linenumber, linetext):
        current = progress_calc_fn(linenumber, linetext)
        # Store every field of the progress record under the job's hash.
        redis_connection.hset(name=job_id, mapping=current._asdict())
        return current

    return _publish_progress

def cli_args_valid(args):
    """Validate the parsed command-line arguments.

    Checks, in order, that `args.filepath` exists, that `args.strainsfile`
    exists, and that the redis server at `args.redisurl` answers a ping.
    The first failed check is reported on stderr.

    Args:
        args: Parsed arguments providing the `filepath`, `strainsfile` and
            `redisurl` attributes.

    Returns:
        `args` unchanged when every check passes, otherwise `None`.
    """
    for path in (args.filepath, args.strainsfile):
        if not os.path.exists(path):
            print(f"The file '{path}' does not exist.", file=sys.stderr)
            return None

    try:
        # `Redis.from_url` only builds a client; nothing is sent to the
        # server until a command runs, so ping to really test reachability.
        # The context manager releases the probe connection afterwards.
        with Redis.from_url(args.redisurl) as conn:
            conn.ping()
    except (ConnectionError, ValueError):
        # ValueError covers a URL whose scheme `from_url` does not accept.
        print(traceback.format_exc(), file=sys.stderr)
        return None

    return args

def process_cli_arguments():
    """Parse and validate the worker's command-line arguments.

    Extends the parser returned by `cli_argument_parser()` with the
    `redisurl` and `job_id` positionals, parses the command line and hands
    the result to `cli_args_valid`.

    Returns:
        Whatever `cli_args_valid` returns for the parsed arguments.
    """
    parser = cli_argument_parser()
    parser.prog = "worker"

    # NOTE(review): `redisurl` is a plain positional, so argparse treats it
    # as required and its `default` is presumably never used -- confirm.
    extra_arguments = (
        ("redisurl", {"default": "redis:///", "help": "URL to the redis server"}),
        ("job_id", {"help": "The id of the job being processed"}),
    )
    for name, options in extra_arguments:
        parser.add_argument(name, **options)

    parsed = parser.parse_args()
    return cli_args_valid(parsed)

def main():
    """Collect the errors for one file and record the outcome in redis.

    Progress, status, message and the encoded errors are all written to the
    redis hash named by the `job_id` argument.

    Returns:
        1 when the command-line arguments fail validation, 0 otherwise.
    """
    args = process_cli_arguments()
    if args is None:
        print("Quiting due to errors!", file=sys.stderr)
        return 1

    with Redis.from_url(args.redisurl) as redis_conn:
        job_id = args.job_id

        # The progress calculator is built from the input file's size.
        file_size = os.stat(args.filepath).st_size
        progress_indicator = make_progress_indicator(
            redis_conn, job_id, make_progress_calculator(file_size))

        if args.filetype == "average":
            filetype = FileType.AVERAGE
        else:
            filetype = FileType.STANDARD_ERROR
        strains = strain_names(args.strainsfile)

        for field, text in (
                ("status", "Processing"),
                ("message", "Collecting errors")):
            redis_conn.hset(name=job_id, key=field, value=text)

        found = collect_errors(
            args.filepath, filetype, strains, progress_indicator)
        if args.count > 0:
            # A positive count limits the result via `take`.
            found = take(found, args.count)

        redis_conn.hset(
            name=job_id, key="errors", value=jsonpickle.encode(found))
        redis_conn.hset(name=job_id, key="status", value="success")

    return 0

if __name__ == "__main__":
    # Use main()'s return value as the process exit status; calling main()
    # bare discards it and the script exits 0 even when main() returns 1.
    sys.exit(main())