about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2024-01-24 10:22:09 +0300
committerFrederick Muriuki Muriithi2024-01-24 10:22:09 +0300
commit60e6fe7fbba0f83da5d793d7ab55ff3f873fe42a (patch)
tree2cf672f021afc33d775e83da219e5854aae9728b
parent7976230ffcb1de4f744895ee252298dea9a15f4c (diff)
downloadgn-uploader-60e6fe7fbba0f83da5d793d7ab55ff3f873fe42a.tar.gz
redis-prefix: Update file validation code
Update the file validation script and routes to use the redis prefix
for jobs.
-rw-r--r--qc_app/jobs.py9
-rw-r--r--qc_app/parse.py12
-rw-r--r--scripts/validate_file.py57
3 files changed, 41 insertions, 37 deletions
diff --git a/qc_app/jobs.py b/qc_app/jobs.py
index cf4e4ef..1491015 100644
--- a/qc_app/jobs.py
+++ b/qc_app/jobs.py
@@ -62,12 +62,13 @@ def build_file_verification_job(#pylint: disable=[too-many-arguments]
     jobid = str(uuid4())
     command = [
         sys.executable, "-m", "scripts.validate_file",
-        dburi, redisuri, jobid,
+        dburi, redisuri, jobsnamespace(), jobid,
         "--redisexpiry", str(ttl_seconds),
         str(speciesid), filetype, filepath,
     ]
     return initialise_job(
-        redis_conn, jobid, command, "file-verification", ttl_seconds, {
+        redis_conn, jobsnamespace(), jobid, command, "file-verification",
+        ttl_seconds, {
             "filetype": filetype,
             "filename": os.path.basename(filepath), "percent": 0
         })
@@ -77,12 +78,14 @@ def data_insertion_job(# pylint: disable=[too-many-arguments]
         speciesid: int, platformid: int, datasetid: int, databaseuri: str,
         redisuri: str, ttl_seconds: int) -> dict:
     "Build a data insertion job"
+    jobid = str(uuid4())
     command = [
         sys.executable, "-m", "scripts.insert_data", filetype, filepath,
         speciesid, platformid, datasetid, databaseuri, redisuri
     ]
     return initialise_job(
-        redis_conn, str(uuid4()), command, "data-insertion", ttl_seconds, {
+        redis_conn, jobsnamespace(), jobid, command, "data-insertion",
+        ttl_seconds, {
             "filename": os.path.basename(filepath), "filetype": filetype,
             "totallines": totallines
         })
diff --git a/qc_app/parse.py b/qc_app/parse.py
index 40f7b44..d9be993 100644
--- a/qc_app/parse.py
+++ b/qc_app/parse.py
@@ -82,7 +82,7 @@ def parse():
 def parse_status(job_id: str):
     "Retrieve the status of the job"
     with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn:
-        job = jobs.job(rconn, job_id)
+        job = jobs.job(rconn, jobs.jobsnamespace(), job_id)
 
     if job:
         error_filename = jobs.error_filename(
@@ -122,7 +122,7 @@ def parse_status(job_id: str):
 def results(job_id: str):
     """Show results of parsing..."""
     with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn:
-        job = jobs.job(rconn, job_id)
+        job = jobs.job(rconn, jobs.jobsnamespace(), job_id)
 
     if job:
         filename = job["filename"]
@@ -143,7 +143,7 @@ def results(job_id: str):
 def fail(job_id: str):
     """Handle parsing failure"""
     with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn:
-        job = jobs.job(rconn, job_id)
+        job = jobs.job(rconn, jobs.jobsnamespace(), job_id)
 
     if job:
         error_filename = jobs.error_filename(
@@ -164,9 +164,11 @@ def abort():
     job_id = request.form["job_id"]
 
     with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn:
-        job = jobs.job(rconn, job_id)
+        job = jobs.job(rconn, jobs.jobsnamespace(), job_id)
 
         if job:
-            rconn.hset(name=job_id, key="user_aborted", value=int(True))
+            rconn.hset(name=jobs.job_key(jobs.jobsnamespace(), job_id),
+                       key="user_aborted",
+                       value=int(True))
 
     return redirect(url_for("parse.parse_status", job_id=job_id))
diff --git a/scripts/validate_file.py b/scripts/validate_file.py
index 4b4fc0c..0028795 100644
--- a/scripts/validate_file.py
+++ b/scripts/validate_file.py
@@ -12,6 +12,7 @@ from redis.exceptions import ConnectionError # pylint: disable=[redefined-builti
 from quality_control.utils import make_progress_calculator
 from quality_control.parsing import FileType, strain_names, collect_errors
 
+from qc_app import jobs
 from qc_app.db_utils import database_connection
 
 from .cli_parser import init_cli_parser
@@ -19,12 +20,12 @@ from .qc import add_file_validation_arguments
 
 
 def make_progress_indicator(redis_connection: Redis,
-                            jobid: str,
+                            fqjobid: str,
                             progress_calc_fn: Callable) -> Callable:
     """Make function that will compute the progress and update redis"""
     def __indicator__(linenumber, linetext):
         progress = progress_calc_fn(linenumber, linetext)
-        redis_connection.hset(name=str(jobid), mapping=progress._asdict())
+        redis_connection.hset(name=fqjobid, mapping=progress._asdict())
 
         return progress
 
@@ -57,20 +58,20 @@ def process_cli_arguments():
 
     return cli_args_valid(parser.parse_args())
 
-def stream_error(redis_conn, jobid, error):
+def stream_error(rconn: Redis, fqjobid: str, error):
     """Update redis with the most current error(s) found"""
     errors = jsonpickle.decode(
-        redis_conn.hget(str(jobid), key="errors") or jsonpickle.encode(tuple()))
-    redis_conn.hset(
-        str(jobid), key="errors", value=jsonpickle.encode(errors + (error,)))
+        rconn.hget(fqjobid, key="errors") or jsonpickle.encode(tuple()))
+    rconn.hset(
+        fqjobid, key="errors", value=jsonpickle.encode(errors + (error,)))
 
-def make_user_aborted(redis_conn, jobid):
+def make_user_aborted(rconn: Redis, fqjobid: str):
     """Mkae function that checks whether the user aborted the process"""
     def __aborted__():
         user_aborted = bool(int(
-            redis_conn.hget(name=str(jobid), key="user_aborted") or "0"))
+            rconn.hget(name=fqjobid, key="user_aborted") or "0"))
         if user_aborted:
-            redis_conn.hset(name=str(jobid), key="status", value="aborted")
+            rconn.hset(name=fqjobid, key="status", value="aborted")
 
         return user_aborted
     return __aborted__
@@ -87,36 +88,34 @@ def main():
         print("Quiting due to errors!", file=sys.stderr)
         return 1
 
-    with (Redis.from_url(args.redisuri) as redis_conn,
+    with (Redis.from_url(args.redisuri) as rconn,
           database_connection(args.databaseuri) as dbconn):
+        fqjobid = jobs.job_key(args.redisprefix, args.jobid)
         progress_calculator = make_progress_calculator(
             get_zipfile_size(args.filepath) if is_zipfile(args.filepath)
             else os.stat(args.filepath).st_size)
         progress_indicator = make_progress_indicator(
-            redis_conn, args.jobid, progress_calculator)
-        count = args.count
-        filepath = args.filepath
-        filetype = (
-            FileType.AVERAGE if args.filetype == "average"
-            else FileType.STANDARD_ERROR)
-        strains = strain_names(dbconn, args.speciesid)
-
-        redis_conn.hset(name=str(args.jobid), key="status", value="Processing")
-        redis_conn.hset(
-            name=str(args.jobid), key="message", value="Collecting errors")
+            rconn, fqjobid, progress_calculator)
 
-        error_count = 0
-        for error in collect_errors(
-                filepath, filetype, strains, progress_indicator,
-                make_user_aborted(redis_conn, args.jobid)):
-            stream_error(redis_conn, args.jobid, error)
+        rconn.hset(fqjobid, key="status", value="Processing")
+        rconn.hset(fqjobid, key="message", value="Collecting errors")
 
-            if count > 0:
+        error_count = 0
+        for error in collect_errors(args.filepath,
+                                    (FileType.AVERAGE
+                                     if args.filetype == "average"
+                                     else FileType.STANDARD_ERROR),
+                                    strain_names(dbconn, args.speciesid),
+                                    progress_indicator,
+                                    make_user_aborted(rconn, fqjobid)):
+            stream_error(rconn, fqjobid, error)
+
+            if args.count > 0:
                 error_count = error_count + 1
-                if error_count >= count:
+                if error_count >= args.count:
                     break
 
-        redis_conn.hset(name=str(args.jobid), key="status", value="success")
+        rconn.hset(name=fqjobid, key="status", value="success")
 
     return 0