about summary refs log tree commit diff
path: root/gn3
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2022-05-16 12:59:27 +0300
committerFrederick Muriuki Muriithi2022-05-16 12:59:27 +0300
commit0b161341083fdaad9bd187ea74bf4e8b9631eef4 (patch)
tree76580a8d32d80c499747b4434ea55afda6c1963d /gn3
parenta657b502e6ed46ea0887b5febb89a7408f163820 (diff)
downloadgenenetwork3-0b161341083fdaad9bd187ea74bf4e8b9631eef4.tar.gz
Run computation in one-shot asynchronous process
After reworking the worker/runner to have a one-shot mode, add a function that
queues up the task and then runs the worker in the one-shot mode to process
the computation in the background.
Diffstat (limited to 'gn3')
-rw-r--r--gn3/api/correlation.py16
-rw-r--r--gn3/commands.py10
2 files changed, 19 insertions, 7 deletions
diff --git a/gn3/api/correlation.py b/gn3/api/correlation.py
index 44aaf56..e362b38 100644
--- a/gn3/api/correlation.py
+++ b/gn3/api/correlation.py
@@ -9,7 +9,7 @@ from flask import request
 from flask import current_app
 
 from gn3.settings import SQL_URI
-from gn3.commands import queue_cmd, compose_pcorrs_command
+from gn3.commands import queue_cmd, run_async_cmd, compose_pcorrs_command
 from gn3.db_utils import database_connector
 from gn3.responses.pcorrs_responses import build_response
 from gn3.computations.correlations import map_shared_keys_to_values
@@ -125,10 +125,9 @@ def partial_correlation():
             "error_type": "Client Error"})
 
     if with_target_db:
-        return build_response({
-            "status": "queued",
-            "results": queue_cmd(
-                conn=redis.Redis(),
+        with redis.Redis() as conn:
+            queueing_results = run_async_cmd(
+                conn=conn,
                 cmd=compose_pcorrs_command(
                     trait_fullname(args["primary_trait"]),
                     tuple(
@@ -136,7 +135,12 @@ def partial_correlation():
                     args["method"], args["target_db"],
                     int(args.get("criteria", 500))),
                 job_queue=current_app.config.get("REDIS_JOB_QUEUE"),
-                env = {"PYTHONPATH": ":".join(sys.path), "SQL_URI": SQL_URI})})
+                env = {"PYTHONPATH": ":".join(sys.path), "SQL_URI": SQL_URI})
+        return build_response({
+            "status": "success",
+            "results": queueing_results,
+            "queued": True
+        })
 
     with database_connector() as conn:
         results = partial_correlations_with_target_traits(
diff --git a/gn3/commands.py b/gn3/commands.py
index e622068..5770902 100644
--- a/gn3/commands.py
+++ b/gn3/commands.py
@@ -93,7 +93,6 @@ Returns the name of the specific redis hash for the specific task.
         conn.hset(name=unique_id, key="env", value=json.dumps(env))
     return unique_id
 
-
 def run_cmd(cmd: str, success_codes: Tuple = (0,), env: str = None) -> Dict:
     """Run CMD and return the CMD's status code and output as a dict"""
     parsed_cmd = json.loads(cmd)
@@ -105,3 +104,12 @@ def run_cmd(cmd: str, success_codes: Tuple = (0,), env: str = None) -> Dict:
     if results.returncode not in success_codes:  # Error!
         out = str(results.stderr, 'utf-8')
     return {"code": results.returncode, "output": out}
+
+def run_async_cmd(
+        conn: Redis, job_queue: str, cmd: Union[str, Sequence[str]],
+        email: Optional[str] = None, env: Optional[dict] = None) -> str:
+    """A utility function to call `gn3.commands.queue_cmd` function and run the
+    worker in the `one-shot` mode."""
+    cmd_id = queue_cmd(conn, job_queue, cmd, email, env)
+    subprocess.Popen(["python3", "sheepdog/worker.py"])
+    return cmd_id