From 772872452dc98b9919a861c017347e88a6536063 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Thu, 14 Jul 2022 14:20:51 +0300 Subject: Rename file validation script As preparation for building a new generic worker script, this commit renames the file validation script from 'worker.py' to 'validate_file.py' so as to ensure the name conforms better to what the script does. --- scripts/validate_file.py | 121 +++++++++++++++++++++++++++++++++++++++++++++++ scripts/worker.py | 121 ----------------------------------------------- 2 files changed, 121 insertions(+), 121 deletions(-) create mode 100644 scripts/validate_file.py delete mode 100644 scripts/worker.py (limited to 'scripts') diff --git a/scripts/validate_file.py b/scripts/validate_file.py new file mode 100644 index 0000000..9f0a561 --- /dev/null +++ b/scripts/validate_file.py @@ -0,0 +1,121 @@ +"""External worker script that checks file for correctness""" +import os +import sys +import traceback +from typing import Callable +from zipfile import ZipFile, is_zipfile + +import jsonpickle +from redis import Redis +from redis.exceptions import ConnectionError # pylint: disable=[redefined-builtin] + +from quality_control.utils import make_progress_calculator +from quality_control.parsing import FileType, strain_names, collect_errors +from .qc import cli_argument_parser + + +def make_progress_indicator( + redis_connection: Redis, job_id: str, + progress_calc_fn: Callable) -> Callable: + """Make function that will compute the progress and update redis""" + def __indicator__(linenumber, linetext): + progress = progress_calc_fn(linenumber, linetext) + redis_connection.hset(name=job_id, mapping=progress._asdict()) + + return progress + + return __indicator__ + +def cli_args_valid(args): + "Check that the command-line arguments are provided and correct" + if not os.path.exists(args.filepath): + print(f"The file '{args.filepath}' does not exist.", file=sys.stderr) + return None + + if not os.path.exists(args.strainsfile): + print(f"The file '{args.strainsfile}' does not exist.", file=sys.stderr) + return None + + try: + conn = Redis.from_url(args.redisurl) # pylint: disable=[unused-variable] + except ConnectionError as conn_err: # pylint: disable=[unused-variable] + print(traceback.format_exc(), file=sys.stderr) + return None + + return args + +def process_cli_arguments(): + """Setup command-line parser""" + parser = cli_argument_parser() + parser.prog = "worker" + parser.add_argument( + "redisurl", default="redis:///", help="URL to the redis server") + parser.add_argument("job_id", help="The id of the job being processed") + + return cli_args_valid(parser.parse_args()) + +def stream_error(redis_conn, job_id, error): + """Update redis with the most current error(s) found""" + errors = jsonpickle.decode( + redis_conn.hget(job_id, key="errors") or jsonpickle.encode(tuple())) + redis_conn.hset( + job_id, key="errors", value=jsonpickle.encode(errors + (error,))) + +def make_user_aborted(redis_conn, job_id): + """Mkae function that checks whether the user aborted the process""" + def __aborted__(): + user_aborted = bool(int( + redis_conn.hget(name=job_id, key="user_aborted") or "0")) + if user_aborted: + redis_conn.hset(name=job_id, key="status", value="aborted") + + return user_aborted + return __aborted__ + +def get_zipfile_size(filepath): + "Compute size of given zipfile" + with ZipFile(filepath, "r") as zfile: + return zfile.infolist()[0].file_size + +def main(): + "entry point to the script" + args = process_cli_arguments() + if args is None: + print("Quiting due to errors!", file=sys.stderr) + return 1 + + with Redis.from_url(args.redisurl) as redis_conn: + progress_calculator = make_progress_calculator( + get_zipfile_size(args.filepath) if is_zipfile(args.filepath) + else os.stat(args.filepath).st_size) + progress_indicator = make_progress_indicator( + redis_conn, args.job_id, progress_calculator) + count = args.count + filepath = args.filepath + filetype = ( + FileType.AVERAGE if args.filetype == "average" + else FileType.STANDARD_ERROR) + strains = strain_names(args.strainsfile) + + redis_conn.hset( + name=args.job_id, key="status", value="Processing") + redis_conn.hset( + name=args.job_id, key="message", value="Collecting errors") + + error_count = 0 + for error in collect_errors( + filepath, filetype, strains, progress_indicator, + make_user_aborted(redis_conn, args.job_id)): + stream_error(redis_conn, args.job_id, error) + + if count > 0: + error_count = error_count + 1 + if error_count >= count: + break + + redis_conn.hset(name=args.job_id, key="status", value="success") + + return 0 + +if __name__ == "__main__": + main() diff --git a/scripts/worker.py b/scripts/worker.py deleted file mode 100644 index c6e989f..0000000 --- a/scripts/worker.py +++ /dev/null @@ -1,121 +0,0 @@ -"""External worker script""" -import os -import sys -import traceback -from typing import Callable -from zipfile import ZipFile, is_zipfile - -import jsonpickle -from redis import Redis -from redis.exceptions import ConnectionError # pylint: disable=[redefined-builtin] - -from quality_control.utils import make_progress_calculator -from quality_control.parsing import FileType, strain_names, collect_errors -from .qc import cli_argument_parser - - -def make_progress_indicator( - redis_connection: Redis, job_id: str, - progress_calc_fn: Callable) -> Callable: - """Make function that will compute the progress and update redis""" - def __indicator__(linenumber, linetext): - progress = progress_calc_fn(linenumber, linetext) - redis_connection.hset(name=job_id, mapping=progress._asdict()) - - return progress - - return __indicator__ - -def cli_args_valid(args): - "Check that the command-line arguments are provided and correct" - if not os.path.exists(args.filepath): - print(f"The file '{args.filepath}' does not exist.", file=sys.stderr) - return None - - if not os.path.exists(args.strainsfile): - print(f"The file '{args.strainsfile}' does not exist.", file=sys.stderr) - return None - - try: - conn = Redis.from_url(args.redisurl) # pylint: disable=[unused-variable] - except ConnectionError as conn_err: # pylint: disable=[unused-variable] - print(traceback.format_exc(), file=sys.stderr) - return None - - return args - -def process_cli_arguments(): - """Setup command-line parser""" - parser = cli_argument_parser() - parser.prog = "worker" - parser.add_argument( - "redisurl", default="redis:///", help="URL to the redis server") - parser.add_argument("job_id", help="The id of the job being processed") - - return cli_args_valid(parser.parse_args()) - -def stream_error(redis_conn, job_id, error): - """Update redis with the most current error(s) found""" - errors = jsonpickle.decode( - redis_conn.hget(job_id, key="errors") or jsonpickle.encode(tuple())) - redis_conn.hset( - job_id, key="errors", value=jsonpickle.encode(errors + (error,))) - -def make_user_aborted(redis_conn, job_id): - """Mkae function that checks whether the user aborted the process""" - def __aborted__(): - user_aborted = bool(int( - redis_conn.hget(name=job_id, key="user_aborted") or "0")) - if user_aborted: - redis_conn.hset(name=job_id, key="status", value="aborted") - - return user_aborted - return __aborted__ - -def get_zipfile_size(filepath): - "Compute size of given zipfile" - with ZipFile(filepath, "r") as zfile: - return zfile.infolist()[0].file_size - -def main(): - "entry point to the script" - args = process_cli_arguments() - if args is None: - print("Quiting due to errors!", file=sys.stderr) - return 1 - - with Redis.from_url(args.redisurl) as redis_conn: - progress_calculator = make_progress_calculator( - get_zipfile_size(args.filepath) if is_zipfile(args.filepath) - else os.stat(args.filepath).st_size) - progress_indicator = make_progress_indicator( - redis_conn, args.job_id, progress_calculator) - count = args.count - filepath = args.filepath - filetype = ( - FileType.AVERAGE if args.filetype == "average" - else FileType.STANDARD_ERROR) - strains = strain_names(args.strainsfile) - - redis_conn.hset( - name=args.job_id, key="status", value="Processing") - redis_conn.hset( - name=args.job_id, key="message", value="Collecting errors") - - error_count = 0 - for error in collect_errors( - filepath, filetype, strains, progress_indicator, - make_user_aborted(redis_conn, args.job_id)): - stream_error(redis_conn, args.job_id, error) - - if count > 0: - error_count = error_count + 1 - if error_count >= count: - break - - redis_conn.hset(name=args.job_id, key="status", value="success") - - return 0 - -if __name__ == "__main__": - main() -- cgit v1.2.3