"""Worker script: collect errors from a file and report them through Redis."""
import os
import sys
import traceback
from typing import Callable

import jsonpickle
from redis import Redis
from redis.exceptions import ConnectionError

from .qc import cli_argument_parser

from quality_control.utils import make_progress_calculator
from quality_control.parsing import (
    take, FileType, strain_names, collect_errors)


def make_progress_indicator(
        redis_connection: Redis, job_id: str,
        progress_calc_fn: Callable) -> Callable:
    """Build a callback that publishes progress to the job's Redis hash.

    The returned function takes ``(linenumber, linetext)``, computes the
    progress with ``progress_calc_fn``, writes the fields of the result
    (obtained via ``_asdict()``) into the Redis hash named ``job_id`` and
    returns the progress object.
    """
    def __indicator__(linenumber, linetext):
        progress = progress_calc_fn(linenumber, linetext)
        redis_connection.hset(name=job_id, mapping=progress._asdict())
        return progress

    return __indicator__


def cli_args_valid(args):
    """Validate the parsed command-line arguments.

    Checks that ``args.filepath`` and ``args.strainsfile`` exist and that the
    Redis server at ``args.redisurl`` can be reached.

    Returns ``args`` unchanged when every check passes; otherwise prints a
    diagnostic to stderr and returns ``None``.
    """
    if not os.path.exists(args.filepath):
        print(f"The file '{args.filepath}' does not exist.", file=sys.stderr)
        return None

    if not os.path.exists(args.strainsfile):
        print(f"The file '{args.strainsfile}' does not exist.", file=sys.stderr)
        return None

    try:
        # `Redis.from_url` only builds a client; no connection is opened
        # until a command is sent, so issue a PING to actually verify that
        # the server is reachable. The `with` block releases the client.
        with Redis.from_url(args.redisurl) as conn:
            conn.ping()
    except ConnectionError:
        print(traceback.format_exc(), file=sys.stderr)
        return None

    return args


def process_cli_arguments():
    """Parse and validate the worker's command-line arguments.

    Extends the parser from ``cli_argument_parser()`` with the ``redisurl``
    and ``job_id`` positional arguments.

    Returns the validated arguments, or ``None`` if validation failed.
    """
    parser = cli_argument_parser()
    parser.prog = "worker"
    # NOTE(review): `redisurl` is a positional argument without `nargs`, so
    # the default below is presumably never applied -- confirm the intent.
    parser.add_argument(
        "redisurl", default="redis:///", help="URL to the redis server")
    parser.add_argument("job_id", help="The id of the job being processed")

    return cli_args_valid(parser.parse_args())


def main():
    """Run the job described by the command-line arguments.

    Records the job status, progress and the collected errors in the Redis
    hash named after the job id.

    Returns 0 on success and 1 when the command-line arguments are invalid.
    """
    args = process_cli_arguments()
    if args is None:
        print("Quiting due to errors!", file=sys.stderr)
        return 1

    with Redis.from_url(args.redisurl) as redis_conn:
        progress_calculator = make_progress_calculator(
            os.stat(args.filepath).st_size)
        progress_indicator = make_progress_indicator(
            redis_conn, args.job_id, progress_calculator)
        count = args.count
        filepath = args.filepath
        filetype = (
            FileType.AVERAGE if args.filetype == "average"
            else FileType.STANDARD_ERROR)
        strains = strain_names(args.strainsfile)

        redis_conn.hset(
            name=args.job_id, key="status", value="Processing")
        redis_conn.hset(
            name=args.job_id, key="message", value="Collecting errors")

        if count > 0:
            # Only keep the first `count` errors.
            errors = take(
                collect_errors(filepath, filetype, strains, progress_indicator),
                count)
        else:
            # NOTE(review): if `collect_errors` returns a lazy generator it
            # may need materialising before encoding -- confirm.
            errors = collect_errors(
                filepath, filetype, strains, progress_indicator)

        redis_conn.hset(
            name=args.job_id, key="errors", value=jsonpickle.encode(errors))
        redis_conn.hset(name=args.job_id, key="status", value="success")

    return 0


if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    sys.exit(main())