From 0b161341083fdaad9bd187ea74bf4e8b9631eef4 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Mon, 16 May 2022 12:59:27 +0300 Subject: Run computation in one-shot asynchronous process After reworking the worker/runner to have a one-shot mode, add a function that queues up the task and then runs the worker in the one-shot mode to process the computation in the background. --- gn3/api/correlation.py | 16 ++++++++++------ gn3/commands.py | 10 +++++++++- 2 files changed, 19 insertions(+), 7 deletions(-) (limited to 'gn3') diff --git a/gn3/api/correlation.py b/gn3/api/correlation.py index 44aaf56..e362b38 100644 --- a/gn3/api/correlation.py +++ b/gn3/api/correlation.py @@ -9,7 +9,7 @@ from flask import request from flask import current_app from gn3.settings import SQL_URI -from gn3.commands import queue_cmd, compose_pcorrs_command +from gn3.commands import queue_cmd, run_async_cmd, compose_pcorrs_command from gn3.db_utils import database_connector from gn3.responses.pcorrs_responses import build_response from gn3.computations.correlations import map_shared_keys_to_values @@ -125,10 +125,9 @@ def partial_correlation(): "error_type": "Client Error"}) if with_target_db: - return build_response({ - "status": "queued", - "results": queue_cmd( - conn=redis.Redis(), + with redis.Redis() as conn: + queueing_results = run_async_cmd( + conn=conn, cmd=compose_pcorrs_command( trait_fullname(args["primary_trait"]), tuple( @@ -136,7 +135,12 @@ def partial_correlation(): args["method"], args["target_db"], int(args.get("criteria", 500))), job_queue=current_app.config.get("REDIS_JOB_QUEUE"), - env = {"PYTHONPATH": ":".join(sys.path), "SQL_URI": SQL_URI})}) + env = {"PYTHONPATH": ":".join(sys.path), "SQL_URI": SQL_URI}) + return build_response({ + "status": "success", + "results": queueing_results, + "queued": True + }) with database_connector() as conn: results = partial_correlations_with_target_traits( diff --git a/gn3/commands.py b/gn3/commands.py index e622068..5770902 100644 --- a/gn3/commands.py +++ b/gn3/commands.py @@ -93,7 +93,6 @@ Returns the name of the specific redis hash for the specific task. conn.hset(name=unique_id, key="env", value=json.dumps(env)) return unique_id - def run_cmd(cmd: str, success_codes: Tuple = (0,), env: str = None) -> Dict: """Run CMD and return the CMD's status code and output as a dict""" parsed_cmd = json.loads(cmd) @@ -105,3 +104,12 @@ def run_cmd(cmd: str, success_codes: Tuple = (0,), env: str = None) -> Dict: if results.returncode not in success_codes: # Error! out = str(results.stderr, 'utf-8') return {"code": results.returncode, "output": out} + +def run_async_cmd( + conn: Redis, job_queue: str, cmd: Union[str, Sequence[str]], + email: Optional[str] = None, env: Optional[dict] = None) -> str: + """A utility function to call `gn3.commands.queue_cmd` function and run the + worker in the `one-shot` mode.""" + cmd_id = queue_cmd(conn, job_queue, cmd, email, env) + subprocess.Popen(["python3", "sheepdog/worker.py"]) + return cmd_id -- cgit v1.2.3