aboutsummaryrefslogtreecommitdiff
path: root/scripts/validate_file.py
blob: 52e55ecb0bd5bf4c08e31f5c37e2d11a66e78ef9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
"""External worker script that checks file for correctness"""
import os
import sys
import traceback
from typing import Callable
from zipfile import ZipFile, is_zipfile

import jsonpickle
from redis import Redis
from redis.exceptions import ConnectionError # pylint: disable=[redefined-builtin]
from gn_libs.mysqldb import database_connection

from quality_control.utils import make_progress_calculator
from quality_control.parsing import FileType, strain_names, collect_errors

from uploader import jobs

from .cli_parser import init_cli_parser
from .qc import add_file_validation_arguments


def make_progress_indicator(redis_connection: Redis,
                            fqjobid: str,
                            progress_calc_fn: Callable) -> Callable:
    """Make function that will compute the progress and update redis"""
    def __indicator__(linenumber, linetext):
        progress = progress_calc_fn(linenumber, linetext)
        redis_connection.hset(name=fqjobid, mapping=progress._asdict())

        return progress

    return __indicator__

def cli_args_valid(args):
    "Check that the command-line arguments are provided and correct"
    if not os.path.exists(args.filepath):
        print(f"The file '{args.filepath}' does not exist.", file=sys.stderr)
        return None

    try:
        _conn = Redis.from_url(args.redisuri)
    except ConnectionError as _conn_err:
        print(traceback.format_exc(), file=sys.stderr)
        return None

    return args

def process_cli_arguments():
    """Setup command-line parser"""
    parser = init_cli_parser(
        "validate-file",
        ("Verify that the file with the expression data conforms to "
         "expectations."))
    parser.add_argument("speciesid",
                        type=int,
                        help="Species for which the data is to be processed.")
    parser = add_file_validation_arguments(parser)

    return cli_args_valid(parser.parse_args())

def stream_error(rconn: Redis, fqjobid: str, error):
    """Update redis with the most current error(s) found"""
    errors = jsonpickle.decode(
        rconn.hget(fqjobid, key="errors") or jsonpickle.encode(tuple()))
    rconn.hset(
        fqjobid, key="errors", value=jsonpickle.encode(errors + (error,)))

def make_user_aborted(rconn: Redis, fqjobid: str):
    """Mkae function that checks whether the user aborted the process"""
    def __aborted__():
        user_aborted = bool(int(
            rconn.hget(name=fqjobid, key="user_aborted") or "0"))
        if user_aborted:
            rconn.hset(name=fqjobid, key="status", value="aborted")

        return user_aborted
    return __aborted__

def get_zipfile_size(filepath):
    "Compute size of given zipfile"
    with ZipFile(filepath, "r") as zfile:
        return zfile.infolist()[0].file_size

def main():
    "entry point to the script"
    args = process_cli_arguments()
    if args is None:
        print("Quiting due to errors!", file=sys.stderr)
        return 1

    with (Redis.from_url(args.redisuri) as rconn,
          database_connection(args.databaseuri) as dbconn):
        fqjobid = jobs.job_key(args.redisprefix, args.jobid)
        progress_calculator = make_progress_calculator(
            get_zipfile_size(args.filepath) if is_zipfile(args.filepath)
            else os.stat(args.filepath).st_size)
        progress_indicator = make_progress_indicator(
            rconn, fqjobid, progress_calculator)

        rconn.hset(fqjobid, key="status", value="Processing")
        rconn.hset(fqjobid, key="message", value="Collecting errors")

        error_count = 0
        for error in collect_errors(args.filepath,
                                    (FileType.AVERAGE
                                     if args.filetype == "average"
                                     else FileType.STANDARD_ERROR),
                                    strain_names(dbconn, args.speciesid),
                                    progress_indicator,
                                    make_user_aborted(rconn, fqjobid)):
            stream_error(rconn, fqjobid, error)

            if args.count > 0:
                error_count = error_count + 1
                if error_count >= args.count:
                    break

        rconn.hset(name=fqjobid, key="status", value="success")

    return 0

if __name__ == "__main__":
    main()