about summary refs log tree commit diff
path: root/scripts/validate_file.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/validate_file.py')
-rw-r--r-- scripts/validate_file.py 121
1 files changed, 121 insertions, 0 deletions
diff --git a/scripts/validate_file.py b/scripts/validate_file.py
new file mode 100644
index 0000000..9f0a561
--- /dev/null
+++ b/scripts/validate_file.py
@@ -0,0 +1,121 @@
+"""External worker script that checks file for correctness"""
+import os
+import sys
+import traceback
+from typing import Callable
+from zipfile import ZipFile, is_zipfile
+
+import jsonpickle
+from redis import Redis
+from redis.exceptions import ConnectionError # pylint: disable=[redefined-builtin]
+
+from quality_control.utils import make_progress_calculator
+from quality_control.parsing import FileType, strain_names, collect_errors
+from .qc import cli_argument_parser
+
+
+def make_progress_indicator(
+        redis_connection: Redis, job_id: str,
+        progress_calc_fn: Callable) -> Callable:
+    """Build a per-line callback that computes progress and stores it in redis.
+
+    The returned function takes `(linenumber, linetext)`, hands both to
+    `progress_calc_fn`, writes the `_asdict()` mapping of the result into the
+    redis hash named `job_id`, and returns the progress value unchanged.
+    """
+    def __report_progress__(linenumber, linetext):
+        current = progress_calc_fn(linenumber, linetext)
+        # Each entry of the `_asdict()` mapping becomes a field of the
+        # job's hash.
+        fields = current._asdict()
+        redis_connection.hset(name=job_id, mapping=fields)
+        return current
+
+    return __report_progress__
+
+def cli_args_valid(args):
+    """Check that the command-line arguments are provided and correct.
+
+    Verifies that `args.filepath` and `args.strainsfile` exist on disk and
+    that the redis server at `args.redisurl` can actually be reached.
+
+    Returns `args` unchanged when every check passes; otherwise prints the
+    reason to stderr and returns `None`.
+    """
+    for path in (args.filepath, args.strainsfile):
+        if not os.path.exists(path):
+            print(f"The file '{path}' does not exist.", file=sys.stderr)
+            return None
+
+    try:
+        # `Redis.from_url` only builds a lazy client; nothing is sent to the
+        # server until a command runs, so issue a PING to really test the
+        # connection. The `with` block releases the connection afterwards.
+        with Redis.from_url(args.redisurl) as conn:
+            conn.ping()
+    except (ConnectionError, ValueError):
+        # ValueError: `from_url` rejects URLs with an unsupported scheme.
+        print(traceback.format_exc(), file=sys.stderr)
+        return None
+
+    return args
+
+def process_cli_arguments():
+    """Build the worker's command-line parser, then parse and validate args.
+
+    Extends the parser returned by `cli_argument_parser()` with the
+    positional `redisurl` and `job_id` arguments, and returns whatever
+    `cli_args_valid` returns for the parsed arguments.
+    """
+    worker_parser = cli_argument_parser()
+    worker_parser.prog = "worker"
+    # NOTE(review): `redisurl` is a plain positional (no `nargs`), so argparse
+    # still requires it and the `default` looks unused -- confirm intent.
+    worker_parser.add_argument(
+        "redisurl", default="redis:///", help="URL to the redis server")
+    worker_parser.add_argument(
+        "job_id", help="The id of the job being processed")
+
+    parsed = worker_parser.parse_args()
+    return cli_args_valid(parsed)
+
+def stream_error(redis_conn, job_id, error):
+    """Append `error` to the jsonpickle-encoded error tuple kept in redis.
+
+    Reads the "errors" field of the `job_id` hash (falling back to an encoded
+    empty tuple when the field is unset), adds `error` at the end and writes
+    the re-encoded result back to the same field.
+    """
+    stored = redis_conn.hget(job_id, key="errors")
+    if not stored:
+        stored = jsonpickle.encode(tuple())
+    previous = jsonpickle.decode(stored)
+    updated = previous + (error,)
+    redis_conn.hset(job_id, key="errors", value=jsonpickle.encode(updated))
+
+def make_user_aborted(redis_conn, job_id):
+    """Make a function that checks whether the user aborted the process.
+
+    The returned zero-argument function reads the "user_aborted" field of the
+    `job_id` hash (a missing or empty value counts as "0"). When the flag is
+    set it also writes "aborted" to the hash's "status" field. The flag is
+    returned as a bool.
+    """
+    def __aborted__():
+        raw_flag = redis_conn.hget(name=job_id, key="user_aborted")
+        if not bool(int(raw_flag or "0")):
+            return False
+
+        redis_conn.hset(name=job_id, key="status", value="aborted")
+        return True
+    return __aborted__
+
+def get_zipfile_size(filepath):
+    """Return the uncompressed size of the first member of a zip archive.
+
+    Only the first entry listed in the archive is considered. An archive
+    with no entries raises `IndexError`.
+    """
+    with ZipFile(filepath, "r") as archive:
+        first_member = archive.infolist()[0]
+    return first_member.file_size
+
+def main():
+    """Entry point: check the file and stream any errors found to redis.
+
+    Parses the command-line arguments, then, for the job `args.job_id`:
+    sets the job's "status" to "Processing", stores each error yielded by
+    `collect_errors` via `stream_error`, stops early once `args.count`
+    errors have been seen (when `args.count` is positive), and finally sets
+    "status" to "success" unless the user aborted the job.
+
+    Returns 1 when the command-line arguments are invalid, 0 otherwise.
+    """
+    args = process_cli_arguments()
+    if args is None:
+        print("Quiting due to errors!", file=sys.stderr)
+        return 1
+
+    with Redis.from_url(args.redisurl) as redis_conn:
+        # For zip uploads the size comes from the archive's first member;
+        # otherwise it is the on-disk size of the file.
+        if is_zipfile(args.filepath):
+            filesize = get_zipfile_size(args.filepath)
+        else:
+            filesize = os.stat(args.filepath).st_size
+        progress_indicator = make_progress_indicator(
+            redis_conn, args.job_id, make_progress_calculator(filesize))
+        user_aborted = make_user_aborted(redis_conn, args.job_id)
+        count = args.count
+        filetype = (
+            FileType.AVERAGE if args.filetype == "average"
+            else FileType.STANDARD_ERROR)
+        strains = strain_names(args.strainsfile)
+
+        redis_conn.hset(
+            name=args.job_id, key="status", value="Processing")
+        redis_conn.hset(
+            name=args.job_id, key="message", value="Collecting errors")
+
+        error_count = 0
+        for error in collect_errors(
+                args.filepath, filetype, strains, progress_indicator,
+                user_aborted):
+            stream_error(redis_conn, args.job_id, error)
+
+            # A non-positive count means "no limit on reported errors".
+            if count > 0:
+                error_count += 1
+                if error_count >= count:
+                    break
+
+        # The abort check itself writes status "aborted"; do not overwrite
+        # that with "success" when the user cancelled the job.
+        if not user_aborted():
+            redis_conn.hset(name=args.job_id, key="status", value="success")
+
+    return 0
+
+if __name__ == "__main__":
+    # Propagate main()'s return value as the process exit status so callers
+    # of the script can detect the failure case (main() returns 1).
+    sys.exit(main())