aboutsummaryrefslogtreecommitdiff
path: root/scripts
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2024-01-24 10:22:09 +0300
committerFrederick Muriuki Muriithi2024-01-24 10:22:09 +0300
commit60e6fe7fbba0f83da5d793d7ab55ff3f873fe42a (patch)
tree2cf672f021afc33d775e83da219e5854aae9728b /scripts
parent7976230ffcb1de4f744895ee252298dea9a15f4c (diff)
downloadgn-uploader-60e6fe7fbba0f83da5d793d7ab55ff3f873fe42a.tar.gz
redis-prefix: Update file validation code
Update the file validation script and routes to use the redis prefix for jobs.
Diffstat (limited to 'scripts')
-rw-r--r--scripts/validate_file.py57
1 files changed, 28 insertions, 29 deletions
diff --git a/scripts/validate_file.py b/scripts/validate_file.py
index 4b4fc0c..0028795 100644
--- a/scripts/validate_file.py
+++ b/scripts/validate_file.py
@@ -12,6 +12,7 @@ from redis.exceptions import ConnectionError # pylint: disable=[redefined-builti
from quality_control.utils import make_progress_calculator
from quality_control.parsing import FileType, strain_names, collect_errors
+from qc_app import jobs
from qc_app.db_utils import database_connection
from .cli_parser import init_cli_parser
@@ -19,12 +20,12 @@ from .qc import add_file_validation_arguments
def make_progress_indicator(redis_connection: Redis,
- jobid: str,
+ fqjobid: str,
progress_calc_fn: Callable) -> Callable:
"""Make function that will compute the progress and update redis"""
def __indicator__(linenumber, linetext):
progress = progress_calc_fn(linenumber, linetext)
- redis_connection.hset(name=str(jobid), mapping=progress._asdict())
+ redis_connection.hset(name=fqjobid, mapping=progress._asdict())
return progress
@@ -57,20 +58,20 @@ def process_cli_arguments():
return cli_args_valid(parser.parse_args())
-def stream_error(redis_conn, jobid, error):
+def stream_error(rconn: Redis, fqjobid: str, error):
"""Update redis with the most current error(s) found"""
errors = jsonpickle.decode(
- redis_conn.hget(str(jobid), key="errors") or jsonpickle.encode(tuple()))
- redis_conn.hset(
- str(jobid), key="errors", value=jsonpickle.encode(errors + (error,)))
+ rconn.hget(fqjobid, key="errors") or jsonpickle.encode(tuple()))
+ rconn.hset(
+ fqjobid, key="errors", value=jsonpickle.encode(errors + (error,)))
-def make_user_aborted(redis_conn, jobid):
+def make_user_aborted(rconn: Redis, fqjobid: str):
"""Mkae function that checks whether the user aborted the process"""
def __aborted__():
user_aborted = bool(int(
- redis_conn.hget(name=str(jobid), key="user_aborted") or "0"))
+ rconn.hget(name=fqjobid, key="user_aborted") or "0"))
if user_aborted:
- redis_conn.hset(name=str(jobid), key="status", value="aborted")
+ rconn.hset(name=fqjobid, key="status", value="aborted")
return user_aborted
return __aborted__
@@ -87,36 +88,34 @@ def main():
print("Quiting due to errors!", file=sys.stderr)
return 1
- with (Redis.from_url(args.redisuri) as redis_conn,
+ with (Redis.from_url(args.redisuri) as rconn,
database_connection(args.databaseuri) as dbconn):
+ fqjobid = jobs.job_key(args.redisprefix, args.jobid)
progress_calculator = make_progress_calculator(
get_zipfile_size(args.filepath) if is_zipfile(args.filepath)
else os.stat(args.filepath).st_size)
progress_indicator = make_progress_indicator(
- redis_conn, args.jobid, progress_calculator)
- count = args.count
- filepath = args.filepath
- filetype = (
- FileType.AVERAGE if args.filetype == "average"
- else FileType.STANDARD_ERROR)
- strains = strain_names(dbconn, args.speciesid)
-
- redis_conn.hset(name=str(args.jobid), key="status", value="Processing")
- redis_conn.hset(
- name=str(args.jobid), key="message", value="Collecting errors")
+ rconn, fqjobid, progress_calculator)
- error_count = 0
- for error in collect_errors(
- filepath, filetype, strains, progress_indicator,
- make_user_aborted(redis_conn, args.jobid)):
- stream_error(redis_conn, args.jobid, error)
+ rconn.hset(fqjobid, key="status", value="Processing")
+ rconn.hset(fqjobid, key="message", value="Collecting errors")
- if count > 0:
+ error_count = 0
+ for error in collect_errors(args.filepath,
+ (FileType.AVERAGE
+ if args.filetype == "average"
+ else FileType.STANDARD_ERROR),
+ strain_names(dbconn, args.speciesid),
+ progress_indicator,
+ make_user_aborted(rconn, fqjobid)):
+ stream_error(rconn, fqjobid, error)
+
+ if args.count > 0:
error_count = error_count + 1
- if error_count >= count:
+ if error_count >= args.count:
break
- redis_conn.hset(name=str(args.jobid), key="status", value="success")
+ rconn.hset(name=fqjobid, key="status", value="success")
return 0