diff options
Diffstat (limited to 'scripts/worker.py')
-rw-r--r-- | scripts/worker.py | 88 |
1 files changed, 88 insertions, 0 deletions
# Reconstructed from the diff hunk for scripts/worker.py
# (new file, mode 100644, index 0000000..f4d5b6b).
"""Collect quality-control errors for one job and report them via Redis.

The worker parses its command line, checks that the input files and the
Redis server are usable, then writes progress, status and the collected
errors into the Redis hash named after the job id.
"""
import os
import sys
import traceback
from typing import Callable

import jsonpickle
from redis import Redis
from redis.exceptions import ConnectionError

from .qc import cli_argument_parser
from quality_control.utils import make_progress_calculator
from quality_control.parsing import (
    take, FileType, strain_names, collect_errors)


def make_progress_indicator(
        redis_connection: Redis, job_id: str,
        progress_calc_fn: Callable) -> Callable:
    """Build a callback that records parsing progress in Redis.

    The returned function takes ``(linenumber, linetext)``, computes the
    progress with ``progress_calc_fn``, stores it in the Redis hash
    ``job_id`` and returns it.  The progress object must provide
    ``_asdict()`` (as namedtuples do), because that mapping is what gets
    written to the hash.
    """
    def __indicator__(linenumber, linetext):
        progress = progress_calc_fn(linenumber, linetext)
        redis_connection.hset(name=job_id, mapping=progress._asdict())

        return progress

    return __indicator__


def cli_args_valid(args):
    """Validate the parsed command-line arguments.

    Checks that ``args.filepath`` and ``args.strainsfile`` exist and that
    the Redis server at ``args.redisurl`` answers a ``PING``.

    Returns ``args`` unchanged when everything checks out; otherwise prints
    a diagnostic to stderr and returns ``None``.
    """
    # Checked in this order so the first missing file is the one reported.
    for path in (args.filepath, args.strainsfile):
        if not os.path.exists(path):
            print(f"The file '{path}' does not exist.", file=sys.stderr)
            return None

    try:
        # `Redis.from_url` only builds a lazy client; nothing is sent to the
        # server until a command is issued, so ping explicitly to find out
        # whether the server is reachable.  The `with` block closes the
        # probe connection again.
        with Redis.from_url(args.redisurl) as conn:
            conn.ping()
    except (ConnectionError, ValueError):
        # ConnectionError: server unreachable.
        # ValueError: `from_url` rejected the URL (e.g. unsupported scheme).
        print(traceback.format_exc(), file=sys.stderr)
        return None

    return args


def process_cli_arguments():
    """Parse and validate the worker's command-line arguments.

    Extends the parser from ``cli_argument_parser()`` with the ``redisurl``
    and ``job_id`` positionals.  Returns the validated namespace, or
    ``None`` when validation fails.
    """
    parser = cli_argument_parser()
    parser.prog = "worker"
    # NOTE(review): `redisurl` is a required positional (no `nargs="?"`), so
    # argparse never falls back to this default.  Left unchanged because
    # making it optional would alter the command-line interface.
    parser.add_argument(
        "redisurl", default="redis:///", help="URL to the redis server")
    parser.add_argument("job_id", help="The id of the job being processed")

    return cli_args_valid(parser.parse_args())


def main():
    """Run the error collection for one job.

    Returns the process exit status: ``1`` when the command-line arguments
    are invalid, ``0`` once the errors have been stored in Redis.
    """
    args = process_cli_arguments()
    if args is None:
        print("Quiting due to errors!", file=sys.stderr)
        return 1

    with Redis.from_url(args.redisurl) as redis_conn:
        # Progress is computed relative to the size of the input file.
        progress_calculator = make_progress_calculator(
            os.stat(args.filepath).st_size)
        progress_indicator = make_progress_indicator(
            redis_conn, args.job_id, progress_calculator)
        count = args.count
        filepath = args.filepath
        filetype = (
            FileType.AVERAGE if args.filetype == "average"
            else FileType.STANDARD_ERROR)
        strains = strain_names(args.strainsfile)

        redis_conn.hset(
            name=args.job_id, key="status", value="Processing")
        redis_conn.hset(
            name=args.job_id, key="message", value="Collecting errors")

        # A positive count caps how many errors are kept; otherwise the
        # result of `collect_errors` is stored as-is.
        if count > 0:
            errors = take(
                collect_errors(filepath, filetype, strains, progress_indicator),
                count)
        else:
            errors = collect_errors(
                filepath, filetype, strains, progress_indicator)

        redis_conn.hset(
            name=args.job_id, key="errors", value=jsonpickle.encode(errors))
        redis_conn.hset(name=args.job_id, key="status", value="success")

    return 0


if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    sys.exit(main())