"""External worker script that checks file for correctness"""
import os
import sys
import traceback
from typing import Callable
from zipfile import ZipFile, is_zipfile

import jsonpickle
from redis import Redis
from redis.exceptions import ConnectionError # pylint: disable=[redefined-builtin]

from gn_libs.mysqldb import database_connection

from quality_control.utils import make_progress_calculator
from quality_control.parsing import FileType, strain_names, collect_errors

from uploader import jobs

from .cli_parser import init_cli_parser
from .qc import add_file_validation_arguments


def make_progress_indicator(redis_connection: Redis,
                            fqjobid: str,
                            progress_calc_fn: Callable) -> Callable:
    """Make function that will compute the progress and update redis"""
    def __indicator__(linenumber, linetext):
        progress = progress_calc_fn(linenumber, linetext)
        # Store every field of the progress record under the job's hash.
        redis_connection.hset(name=fqjobid, mapping=progress._asdict())
        return progress

    return __indicator__


def cli_args_valid(args):
    """Check that the command-line arguments are provided and correct.

    Returns `args` unchanged when valid, or `None` (with a message on
    stderr) when the file is missing or Redis is unreachable.
    """
    if not os.path.exists(args.filepath):
        print(f"The file '{args.filepath}' does not exist.", file=sys.stderr)
        return None

    try:
        # `Redis.from_url` is lazy and does not connect by itself, so
        # ping to actually exercise the connection; the context manager
        # ensures the probe connection is closed rather than leaked.
        with Redis.from_url(args.redisuri) as conn:
            conn.ping()
    except ConnectionError:
        print(traceback.format_exc(), file=sys.stderr)
        return None

    return args


def process_cli_arguments():
    """Setup command-line parser"""
    parser = init_cli_parser(
        "validate-file",
        ("Verify that the file with the expression data conforms to "
         "expectations."))
    parser.add_argument("speciesid",
                        type=int,
                        help="Species for which the data is to be processed.")
    parser = add_file_validation_arguments(parser)
    return cli_args_valid(parser.parse_args())


def stream_error(rconn: Redis, fqjobid: str, error):
    """Update redis with the most current error(s) found"""
    # Append `error` to the (jsonpickle-encoded) tuple of errors already
    # recorded for this job, defaulting to an empty tuple on first error.
    errors = jsonpickle.decode(
        rconn.hget(fqjobid, key="errors") or jsonpickle.encode(tuple()))
    rconn.hset(
        fqjobid, key="errors", value=jsonpickle.encode(errors + (error,)))


def make_user_aborted(rconn: Redis, fqjobid: str):
    """Make function that checks whether the user aborted the process"""
    def __aborted__():
        user_aborted = bool(int(
            rconn.hget(name=fqjobid, key="user_aborted") or "0"))
        if user_aborted:
            rconn.hset(name=fqjobid, key="status", value="aborted")
        return user_aborted

    return __aborted__


def get_zipfile_size(filepath):
    """Compute the uncompressed size of the first member of the zipfile."""
    with ZipFile(filepath, "r") as zfile:
        # NOTE(review): assumes the archive holds exactly one data file;
        # an empty archive would raise IndexError here.
        return zfile.infolist()[0].file_size


def main():
    """Entry point to the script.

    Returns a process exit code: 0 on success, 1 on invalid arguments.
    """
    args = process_cli_arguments()
    if args is None:
        print("Quitting due to errors!", file=sys.stderr)
        return 1

    with (Redis.from_url(args.redisuri) as rconn,
          database_connection(args.databaseuri) as dbconn):
        fqjobid = jobs.job_key(args.redisprefix, args.jobid)
        progress_calculator = make_progress_calculator(
            get_zipfile_size(args.filepath) if is_zipfile(args.filepath)
            else os.stat(args.filepath).st_size)
        progress_indicator = make_progress_indicator(
            rconn, fqjobid, progress_calculator)

        rconn.hset(fqjobid, key="status", value="Processing")
        rconn.hset(fqjobid, key="message", value="Collecting errors")

        error_count = 0
        for error in collect_errors(args.filepath,
                                    (FileType.AVERAGE
                                     if args.filetype == "average"
                                     else FileType.STANDARD_ERROR),
                                    strain_names(dbconn, args.speciesid),
                                    progress_indicator,
                                    make_user_aborted(rconn, fqjobid)):
            stream_error(rconn, fqjobid, error)

            # A non-zero `--count` caps how many errors are collected
            # before bailing out early.
            if args.count > 0:
                error_count = error_count + 1
                if error_count >= args.count:
                    break

        rconn.hset(name=fqjobid, key="status", value="success")

    return 0


if __name__ == "__main__":
    # Propagate main()'s return value as the process exit code; the
    # original discarded it, so failures exited with status 0.
    sys.exit(main())