From 60e6fe7fbba0f83da5d793d7ab55ff3f873fe42a Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Wed, 24 Jan 2024 10:22:09 +0300 Subject: redis-prefix: Update file validation code Update the file validation script and routes to use the redis prefix for jobs. --- scripts/validate_file.py | 57 ++++++++++++++++++++++++------------------------ 1 file changed, 28 insertions(+), 29 deletions(-) (limited to 'scripts') diff --git a/scripts/validate_file.py b/scripts/validate_file.py index 4b4fc0c..0028795 100644 --- a/scripts/validate_file.py +++ b/scripts/validate_file.py @@ -12,6 +12,7 @@ from redis.exceptions import ConnectionError # pylint: disable=[redefined-builti from quality_control.utils import make_progress_calculator from quality_control.parsing import FileType, strain_names, collect_errors +from qc_app import jobs from qc_app.db_utils import database_connection from .cli_parser import init_cli_parser @@ -19,12 +20,12 @@ from .qc import add_file_validation_arguments def make_progress_indicator(redis_connection: Redis, - jobid: str, + fqjobid: str, progress_calc_fn: Callable) -> Callable: """Make function that will compute the progress and update redis""" def __indicator__(linenumber, linetext): progress = progress_calc_fn(linenumber, linetext) - redis_connection.hset(name=str(jobid), mapping=progress._asdict()) + redis_connection.hset(name=fqjobid, mapping=progress._asdict()) return progress @@ -57,20 +58,20 @@ def process_cli_arguments(): return cli_args_valid(parser.parse_args()) -def stream_error(redis_conn, jobid, error): +def stream_error(rconn: Redis, fqjobid: str, error): """Update redis with the most current error(s) found""" errors = jsonpickle.decode( - redis_conn.hget(str(jobid), key="errors") or jsonpickle.encode(tuple())) - redis_conn.hset( - str(jobid), key="errors", value=jsonpickle.encode(errors + (error,))) + rconn.hget(fqjobid, key="errors") or jsonpickle.encode(tuple())) + rconn.hset( + fqjobid, key="errors", value=jsonpickle.encode(errors + (error,))) -def make_user_aborted(redis_conn, jobid): +def make_user_aborted(rconn: Redis, fqjobid: str): """Mkae function that checks whether the user aborted the process""" def __aborted__(): user_aborted = bool(int( - redis_conn.hget(name=str(jobid), key="user_aborted") or "0")) + rconn.hget(name=fqjobid, key="user_aborted") or "0")) if user_aborted: - redis_conn.hset(name=str(jobid), key="status", value="aborted") + rconn.hset(name=fqjobid, key="status", value="aborted") return user_aborted return __aborted__ @@ -87,36 +88,34 @@ def main(): print("Quiting due to errors!", file=sys.stderr) return 1 - with (Redis.from_url(args.redisuri) as redis_conn, + with (Redis.from_url(args.redisuri) as rconn, database_connection(args.databaseuri) as dbconn): + fqjobid = jobs.job_key(args.redisprefix, args.jobid) progress_calculator = make_progress_calculator( get_zipfile_size(args.filepath) if is_zipfile(args.filepath) else os.stat(args.filepath).st_size) progress_indicator = make_progress_indicator( - redis_conn, args.jobid, progress_calculator) - count = args.count - filepath = args.filepath - filetype = ( - FileType.AVERAGE if args.filetype == "average" - else FileType.STANDARD_ERROR) - strains = strain_names(dbconn, args.speciesid) - - redis_conn.hset(name=str(args.jobid), key="status", value="Processing") - redis_conn.hset( - name=str(args.jobid), key="message", value="Collecting errors") + rconn, fqjobid, progress_calculator) - error_count = 0 - for error in collect_errors( - filepath, filetype, strains, progress_indicator, - make_user_aborted(redis_conn, args.jobid)): - stream_error(redis_conn, args.jobid, error) + rconn.hset(fqjobid, key="status", value="Processing") + rconn.hset(fqjobid, key="message", value="Collecting errors") - if count > 0: + error_count = 0 + for error in collect_errors(args.filepath, + (FileType.AVERAGE + if args.filetype == "average" + else FileType.STANDARD_ERROR), + strain_names(dbconn, args.speciesid), + progress_indicator, + make_user_aborted(rconn, fqjobid)): + stream_error(rconn, fqjobid, error) + + if args.count > 0: error_count = error_count + 1 - if error_count >= count: + if error_count >= args.count: break - redis_conn.hset(name=str(args.jobid), key="status", value="success") + rconn.hset(name=fqjobid, key="status", value="success") return 0 -- cgit v1.2.3