From 60e6fe7fbba0f83da5d793d7ab55ff3f873fe42a Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Wed, 24 Jan 2024 10:22:09 +0300 Subject: redis-prefix: Update file validation code Update the file validation script and routes to use the redis prefix for jobs. --- qc_app/jobs.py | 9 +++++--- qc_app/parse.py | 12 +++++----- scripts/validate_file.py | 57 ++++++++++++++++++++++++------------------------ 3 files changed, 41 insertions(+), 37 deletions(-) diff --git a/qc_app/jobs.py b/qc_app/jobs.py index cf4e4ef..1491015 100644 --- a/qc_app/jobs.py +++ b/qc_app/jobs.py @@ -62,12 +62,13 @@ def build_file_verification_job(#pylint: disable=[too-many-arguments] jobid = str(uuid4()) command = [ sys.executable, "-m", "scripts.validate_file", - dburi, redisuri, jobid, + dburi, redisuri, jobsnamespace(), jobid, "--redisexpiry", str(ttl_seconds), str(speciesid), filetype, filepath, ] return initialise_job( - redis_conn, jobid, command, "file-verification", ttl_seconds, { + redis_conn, jobsnamespace(), jobid, command, "file-verification", + ttl_seconds, { "filetype": filetype, "filename": os.path.basename(filepath), "percent": 0 }) @@ -77,12 +78,14 @@ def data_insertion_job(# pylint: disable=[too-many-arguments] speciesid: int, platformid: int, datasetid: int, databaseuri: str, redisuri: str, ttl_seconds: int) -> dict: "Build a data insertion job" + jobid = str(uuid4()) command = [ sys.executable, "-m", "scripts.insert_data", filetype, filepath, speciesid, platformid, datasetid, databaseuri, redisuri ] return initialise_job( - redis_conn, str(uuid4()), command, "data-insertion", ttl_seconds, { + redis_conn, jobsnamespace(), jobid, command, "data-insertion", + ttl_seconds, { "filename": os.path.basename(filepath), "filetype": filetype, "totallines": totallines }) diff --git a/qc_app/parse.py b/qc_app/parse.py index 40f7b44..d9be993 100644 --- a/qc_app/parse.py +++ b/qc_app/parse.py @@ -82,7 +82,7 @@ def parse(): def parse_status(job_id: str): "Retrieve the status of the job" with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn: - job = jobs.job(rconn, job_id) + job = jobs.job(rconn, jobs.jobsnamespace(), job_id) if job: error_filename = jobs.error_filename( @@ -122,7 +122,7 @@ def parse_status(job_id: str): def results(job_id: str): """Show results of parsing...""" with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn: - job = jobs.job(rconn, job_id) + job = jobs.job(rconn, jobs.jobsnamespace(), job_id) if job: filename = job["filename"] @@ -143,7 +143,7 @@ def results(job_id: str): def fail(job_id: str): """Handle parsing failure""" with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn: - job = jobs.job(rconn, job_id) + job = jobs.job(rconn, jobs.jobsnamespace(), job_id) if job: error_filename = jobs.error_filename( @@ -164,9 +164,11 @@ def abort(): job_id = request.form["job_id"] with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn: - job = jobs.job(rconn, job_id) + job = jobs.job(rconn, jobs.jobsnamespace(), job_id) if job: - rconn.hset(name=job_id, key="user_aborted", value=int(True)) + rconn.hset(name=jobs.job_key(jobs.jobsnamespace(), job_id), + key="user_aborted", + value=int(True)) return redirect(url_for("parse.parse_status", job_id=job_id)) diff --git a/scripts/validate_file.py b/scripts/validate_file.py index 4b4fc0c..0028795 100644 --- a/scripts/validate_file.py +++ b/scripts/validate_file.py @@ -12,6 +12,7 @@ from redis.exceptions import ConnectionError # pylint: disable=[redefined-builti from quality_control.utils import make_progress_calculator from quality_control.parsing import FileType, strain_names, collect_errors +from qc_app import jobs from qc_app.db_utils import database_connection from .cli_parser import init_cli_parser @@ -19,12 +20,12 @@ from .qc import add_file_validation_arguments def make_progress_indicator(redis_connection: Redis, - jobid: str, + fqjobid: str, progress_calc_fn: Callable) -> Callable: """Make function that will compute the progress and update redis""" def __indicator__(linenumber, linetext): progress = progress_calc_fn(linenumber, linetext) - redis_connection.hset(name=str(jobid), mapping=progress._asdict()) + redis_connection.hset(name=fqjobid, mapping=progress._asdict()) return progress @@ -57,20 +58,20 @@ def process_cli_arguments(): return cli_args_valid(parser.parse_args()) -def stream_error(redis_conn, jobid, error): +def stream_error(rconn: Redis, fqjobid: str, error): """Update redis with the most current error(s) found""" errors = jsonpickle.decode( - redis_conn.hget(str(jobid), key="errors") or jsonpickle.encode(tuple())) - redis_conn.hset( - str(jobid), key="errors", value=jsonpickle.encode(errors + (error,))) + rconn.hget(fqjobid, key="errors") or jsonpickle.encode(tuple())) + rconn.hset( + fqjobid, key="errors", value=jsonpickle.encode(errors + (error,))) -def make_user_aborted(redis_conn, jobid): +def make_user_aborted(rconn: Redis, fqjobid: str): """Mkae function that checks whether the user aborted the process""" def __aborted__(): user_aborted = bool(int( - redis_conn.hget(name=str(jobid), key="user_aborted") or "0")) + rconn.hget(name=fqjobid, key="user_aborted") or "0")) if user_aborted: - redis_conn.hset(name=str(jobid), key="status", value="aborted") + rconn.hset(name=fqjobid, key="status", value="aborted") return user_aborted return __aborted__ @@ -87,36 +88,34 @@ def main(): print("Quiting due to errors!", file=sys.stderr) return 1 - with (Redis.from_url(args.redisuri) as redis_conn, + with (Redis.from_url(args.redisuri) as rconn, database_connection(args.databaseuri) as dbconn): + fqjobid = jobs.job_key(args.redisprefix, args.jobid) progress_calculator = make_progress_calculator( get_zipfile_size(args.filepath) if is_zipfile(args.filepath) else os.stat(args.filepath).st_size) progress_indicator = make_progress_indicator( - redis_conn, args.jobid, progress_calculator) - count = args.count - filepath = args.filepath - filetype = ( - FileType.AVERAGE if args.filetype == "average" - else FileType.STANDARD_ERROR) - strains = strain_names(dbconn, args.speciesid) - - redis_conn.hset(name=str(args.jobid), key="status", value="Processing") - redis_conn.hset( - name=str(args.jobid), key="message", value="Collecting errors") + rconn, fqjobid, progress_calculator) - error_count = 0 - for error in collect_errors( - filepath, filetype, strains, progress_indicator, - make_user_aborted(redis_conn, args.jobid)): - stream_error(redis_conn, args.jobid, error) + rconn.hset(fqjobid, key="status", value="Processing") + rconn.hset(fqjobid, key="message", value="Collecting errors") - if count > 0: + error_count = 0 + for error in collect_errors(args.filepath, + (FileType.AVERAGE + if args.filetype == "average" + else FileType.STANDARD_ERROR), + strain_names(dbconn, args.speciesid), + progress_indicator, + make_user_aborted(rconn, fqjobid)): + stream_error(rconn, fqjobid, error) + + if args.count > 0: error_count = error_count + 1 - if error_count >= count: + if error_count >= args.count: break - redis_conn.hset(name=str(args.jobid), key="status", value="success") + rconn.hset(name=fqjobid, key="status", value="success") return 0 -- cgit v1.2.3