aboutsummaryrefslogtreecommitdiff
path: root/scripts/worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/worker.py')
-rw-r--r-- scripts/worker.py 88
1 files changed, 88 insertions, 0 deletions
diff --git a/scripts/worker.py b/scripts/worker.py
new file mode 100644
index 0000000..f4d5b6b
--- /dev/null
+++ b/scripts/worker.py
@@ -0,0 +1,88 @@
+import os
+import sys
+from typing import Callable
+
+import jsonpickle
+from redis import Redis
+from redis.exceptions import ConnectionError
+
+from .qc import cli_argument_parser
+from quality_control.utils import make_progress_calculator
+from quality_control.parsing import (
+ take, FileType, strain_names, collect_errors)
+
+
def make_progress_indicator(
        redis_connection: Redis, job_id: str, progress_calc_fn: Callable) -> Callable:
    """Build a callback that publishes progress for `job_id` to redis.

    The returned callable accepts `(linenumber, linetext)`, forwards both to
    `progress_calc_fn`, stores the fields of the result (obtained through its
    `_asdict()` method) in the redis hash named `job_id`, and finally hands
    the computed progress object back to the caller.
    """
    def __indicator__(linenumber, linetext):
        current = progress_calc_fn(linenumber, linetext)
        fields = current._asdict()
        # One hash per job: each progress field becomes a hash key.
        redis_connection.hset(name=job_id, mapping=fields)
        return current

    return __indicator__
+
def cli_args_valid(args):
    """Check that the parsed command-line arguments are usable.

    Verifies that `args.filepath` and `args.strainsfile` both exist on disk
    and that the redis server at `args.redisurl` can actually be reached.

    Returns `args` unchanged when every check passes. Otherwise the reason is
    printed to stderr and `None` is returned.
    """
    # Function-scope import: the module's import block does not provide
    # `traceback`, and the error path below needs it.
    import traceback

    if not os.path.exists(args.filepath):
        print(f"The file '{args.filepath}' does not exist.", file=sys.stderr)
        return None

    if not os.path.exists(args.strainsfile):
        print(f"The file '{args.strainsfile}' does not exist.", file=sys.stderr)
        return None

    try:
        # `Redis.from_url` only builds a lazy client; `ping()` forces a real
        # round-trip so an unreachable server is detected here. The context
        # manager releases the probe connection once the check is done.
        with Redis.from_url(args.redisurl) as conn:
            conn.ping()
    except (ConnectionError, ValueError):
        # ValueError covers a malformed or unsupported redis URL.
        print(traceback.format_exc(), file=sys.stderr)
        return None

    return args
+
def process_cli_arguments():
    """Parse and validate the worker's command-line arguments.

    Starts from the parser built by `cli_argument_parser()`, renames the
    program to "worker", and appends the two worker-specific positionals
    `redisurl` and `job_id`. The parsed namespace is passed through
    `cli_args_valid`, whose result is returned as-is.
    """
    parser = cli_argument_parser()
    parser.prog = "worker"

    # NOTE(review): `redisurl` is a plain positional (no `nargs="?"`), so
    # argparse still requires it and the `default` is not expected to apply.
    parser.add_argument(
        "redisurl", default="redis:///", help="URL to the redis server")
    parser.add_argument("job_id", help="The id of the job being processed")

    parsed = parser.parse_args()
    return cli_args_valid(parsed)
+
def main():
    """Entry point: collect errors from the input file and publish them to redis.

    Returns 1 when the command-line arguments fail validation, otherwise 0
    once the collected errors and the final status have been written to the
    redis hash named after `args.job_id`.
    """
    args = process_cli_arguments()
    if args is None:
        print("Quiting due to errors!", file=sys.stderr)
        return 1

    with Redis.from_url(args.redisurl) as redis_conn:
        # The progress calculator is seeded with the input file's size in bytes.
        report_progress = make_progress_indicator(
            redis_conn,
            args.job_id,
            make_progress_calculator(os.stat(args.filepath).st_size))

        limit = args.count
        if args.filetype == "average":
            filetype = FileType.AVERAGE
        else:
            filetype = FileType.STANDARD_ERROR
        strains = strain_names(args.strainsfile)

        redis_conn.hset(
            name=args.job_id, key="status", value="Processing")
        redis_conn.hset(
            name=args.job_id, key="message", value="Collecting errors")

        errors = collect_errors(
            args.filepath, filetype, strains, report_progress)
        if limit > 0:
            # A positive count caps the collected errors via `take`.
            errors = take(errors, limit)

        redis_conn.hset(
            name=args.job_id, key="errors", value=jsonpickle.encode(errors))
        redis_conn.hset(name=args.job_id, key="status", value="success")

    return 0
+
if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the process exit
    # status, so a validation failure (return value 1) is visible to the
    # caller instead of always exiting with 0.
    sys.exit(main())