1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
"""External worker script that checks file for correctness"""
import os
import sys
import traceback
from typing import Callable
from zipfile import ZipFile, is_zipfile
import jsonpickle
from redis import Redis
from redis.exceptions import ConnectionError # pylint: disable=[redefined-builtin]
from gn_libs.mysqldb import database_connection
from quality_control.utils import make_progress_calculator
from quality_control.parsing import FileType, strain_names, collect_errors
from uploader import jobs
from .cli_parser import init_cli_parser
from .qc import add_file_validation_arguments
def make_progress_indicator(redis_connection: Redis,
                            fqjobid: str,
                            progress_calc_fn: Callable) -> Callable:
    """Build a per-line callback that records processing progress in redis.

    Each invocation computes the current progress with `progress_calc_fn`
    and writes all of its fields into the job's redis hash `fqjobid`,
    then returns the computed progress object.
    """
    def __indicator__(linenumber, linetext):
        current = progress_calc_fn(linenumber, linetext)
        redis_connection.hset(name=fqjobid, mapping=current._asdict())
        return current
    return __indicator__
def cli_args_valid(args):
    """Check that the command-line arguments are provided and correct.

    Returns `args` unchanged when validation passes, or None when the
    input file does not exist or the redis server cannot be reached.
    """
    if not os.path.exists(args.filepath):
        print(f"The file '{args.filepath}' does not exist.", file=sys.stderr)
        return None
    try:
        # `Redis.from_url` is lazy and opens no network connection by
        # itself, so the original try/except could never catch an
        # unreachable server.  PING explicitly to surface connection
        # failures here; the context manager releases the probe connection.
        with Redis.from_url(args.redisuri) as _conn:
            _conn.ping()
    except ConnectionError:
        print(traceback.format_exc(), file=sys.stderr)
        return None
    return args
def process_cli_arguments():
    """Build the command-line parser and return the validated arguments."""
    description = ("Verify that the file with the expression data conforms to "
                   "expectations.")
    parser = init_cli_parser("validate-file", description)
    parser.add_argument(
        "speciesid",
        type=int,
        help="Species for which the data is to be processed.")
    parser = add_file_validation_arguments(parser)
    return cli_args_valid(parser.parse_args())
def stream_error(rconn: Redis, fqjobid: str, error):
    """Append `error` to the job's accumulated errors in redis."""
    # A missing or empty "errors" field falls back to an encoded empty tuple.
    raw = rconn.hget(fqjobid, key="errors") or jsonpickle.encode(tuple())
    accumulated = jsonpickle.decode(raw) + (error,)
    rconn.hset(fqjobid, key="errors", value=jsonpickle.encode(accumulated))
def make_user_aborted(rconn: Redis, fqjobid: str):
    """Make a function that checks whether the user aborted the process.

    The returned checker reads the job's "user_aborted" flag from redis;
    when the flag is set it also marks the job's status as "aborted".
    """
    def __aborted__():
        flag = rconn.hget(name=fqjobid, key="user_aborted") or "0"
        aborted = bool(int(flag))
        if aborted:
            rconn.hset(name=fqjobid, key="status", value="aborted")
        return aborted
    return __aborted__
def get_zipfile_size(filepath):
    """Return the uncompressed size of the first member of the given zipfile.

    NOTE(review): only the first archive member is inspected — this assumes
    the zip holds a single data file; confirm against the upload format.
    """
    with ZipFile(filepath, "r") as archive:
        first_member = archive.infolist()[0]
        return first_member.file_size
def main():
    """Entry point: validate the input file, streaming progress and errors
    to redis under the job's key.

    Returns 0 on completion (errors, if any, are recorded in redis) and 1
    when the command-line arguments are invalid.
    """
    args = process_cli_arguments()
    if args is None:
        # Fixed typo in the user-facing message ("Quiting" -> "Quitting").
        print("Quitting due to errors!", file=sys.stderr)
        return 1

    with (Redis.from_url(args.redisuri) as rconn,
          database_connection(args.databaseuri) as dbconn):
        fqjobid = jobs.job_key(args.redisprefix, args.jobid)
        # Progress is measured against the uncompressed size of the first
        # zip member for zip input, otherwise the raw file size.
        progress_calculator = make_progress_calculator(
            get_zipfile_size(args.filepath) if is_zipfile(args.filepath)
            else os.stat(args.filepath).st_size)
        progress_indicator = make_progress_indicator(
            rconn, fqjobid, progress_calculator)

        rconn.hset(fqjobid, key="status", value="Processing")
        rconn.hset(fqjobid, key="message", value="Collecting errors")

        error_count = 0
        for error in collect_errors(args.filepath,
                                    (FileType.AVERAGE
                                     if args.filetype == "average"
                                     else FileType.STANDARD_ERROR),
                                    strain_names(dbconn, args.speciesid),
                                    progress_indicator,
                                    make_user_aborted(rconn, fqjobid)):
            stream_error(rconn, fqjobid, error)

            # `--count` caps how many errors are collected; zero (the
            # falsy default) means "collect them all".
            if args.count > 0:
                error_count = error_count + 1
                if error_count >= args.count:
                    break

        # BUGFIX: previously the status was unconditionally set to
        # "success", clobbering the "aborted" status written by the
        # abort-checker when the user cancelled the job.
        if not bool(int(rconn.hget(name=fqjobid, key="user_aborted") or "0")):
            rconn.hset(name=fqjobid, key="status", value="success")
        return 0
if __name__ == "__main__":
    # Propagate main()'s 0/1 result as the process exit code; previously
    # the return value was discarded and the script always exited with 0,
    # even when argument validation failed.
    sys.exit(main())
|