From 2abe10ea8ac059d7cab83895bb5d2ec6d4a6ce1c Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Thu, 19 May 2022 15:26:15 +0300 Subject: Update Web-UI: use new error collection paradigm - README.org: document how to run scripts manually - manifest.scm: remove python-rq as a dependency - qc_app/jobs.py: rework job launching and processing - qc_app/parse.py: use reworked job processing - qc_app/templates/job_progress.html: display progress correctly - qc_app/templates/parse_results.html: display final results - scripts/worker.py: new worker script --- scripts/worker.py | 88 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100644 scripts/worker.py (limited to 'scripts') diff --git a/scripts/worker.py b/scripts/worker.py new file mode 100644 index 0000000..f4d5b6b --- /dev/null +++ b/scripts/worker.py @@ -0,0 +1,88 @@ +import os +import sys +from typing import Callable + +import jsonpickle +from redis import Redis +from redis.exceptions import ConnectionError + +from .qc import cli_argument_parser +from quality_control.utils import make_progress_calculator +from quality_control.parsing import ( + take, FileType, strain_names, collect_errors) + + +def make_progress_indicator( + redis_connection: Redis, job_id: str, progress_calc_fn: Callable) -> Callable: + def __indicator__(linenumber, linetext): + progress = progress_calc_fn(linenumber, linetext) + redis_connection.hset(name=job_id, mapping=progress._asdict()) + + return progress + + return __indicator__ + +def cli_args_valid(args): + if not os.path.exists(args.filepath): + print(f"The file '{args.filepath}' does not exist.", file=sys.stderr) + return None + + if not os.path.exists(args.strainsfile): + print(f"The file '{args.strainsfile}' does not exist.", file=sys.stderr) + return None + + try: + conn = Redis.from_url(args.redisurl) + except ConnectionError as ce: + print(traceback.format_exc(), file=sys.stderr) + return None + + return args + +def process_cli_arguments(): + parser = cli_argument_parser() + parser.prog = "worker" + parser.add_argument( + "redisurl", default="redis:///", help="URL to the redis server") + parser.add_argument("job_id", help="The id of the job being processed") + + return cli_args_valid(parser.parse_args()) + +def main(): + args = process_cli_arguments() + if args is None: + print("Quiting due to errors!", file=sys.stderr) + return 1 + + with Redis.from_url(args.redisurl) as redis_conn: + progress_calculator = make_progress_calculator( + os.stat(args.filepath).st_size) + progress_indicator = make_progress_indicator( + redis_conn, args.job_id, progress_calculator) + count = args.count + filepath = args.filepath + filetype = ( + FileType.AVERAGE if args.filetype == "average" + else FileType.STANDARD_ERROR) + strains = strain_names(args.strainsfile) + + redis_conn.hset( + name=args.job_id, key="status", value="Processing") + redis_conn.hset( + name=args.job_id, key="message", value="Collecting errors") + + if count > 0: + errors = take( + collect_errors(filepath, filetype, strains, progress_indicator), + count) + else: + errors = collect_errors(filepath, filetype, strains, progress_indicator) + + redis_conn.hset( + name=args.job_id, key="errors", value=jsonpickle.encode(errors)) + redis_conn.hset(name=args.job_id, key="status", value="success") + + return 0 + +if __name__ == "__main__": + main() -- cgit v1.2.3