aboutsummaryrefslogtreecommitdiff
path: root/gn3
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2022-05-16 12:59:27 +0300
committerFrederick Muriuki Muriithi2022-05-16 12:59:27 +0300
commit0b161341083fdaad9bd187ea74bf4e8b9631eef4 (patch)
tree76580a8d32d80c499747b4434ea55afda6c1963d /gn3
parenta657b502e6ed46ea0887b5febb89a7408f163820 (diff)
downloadgenenetwork3-0b161341083fdaad9bd187ea74bf4e8b9631eef4.tar.gz
Run computation in one-shot asynchronous process
After reworking the worker/runner to have a one-shot mode, add a function that queues up the task and then runs the worker in the one-shot mode to process the computation in the background.
Diffstat (limited to 'gn3')
-rw-r--r--gn3/api/correlation.py16
-rw-r--r--gn3/commands.py10
2 files changed, 19 insertions, 7 deletions
diff --git a/gn3/api/correlation.py b/gn3/api/correlation.py
index 44aaf56..e362b38 100644
--- a/gn3/api/correlation.py
+++ b/gn3/api/correlation.py
@@ -9,7 +9,7 @@ from flask import request
from flask import current_app
from gn3.settings import SQL_URI
-from gn3.commands import queue_cmd, compose_pcorrs_command
+from gn3.commands import queue_cmd, run_async_cmd, compose_pcorrs_command
from gn3.db_utils import database_connector
from gn3.responses.pcorrs_responses import build_response
from gn3.computations.correlations import map_shared_keys_to_values
@@ -125,10 +125,9 @@ def partial_correlation():
"error_type": "Client Error"})
if with_target_db:
- return build_response({
- "status": "queued",
- "results": queue_cmd(
- conn=redis.Redis(),
+ with redis.Redis() as conn:
+ queueing_results = run_async_cmd(
+ conn=conn,
cmd=compose_pcorrs_command(
trait_fullname(args["primary_trait"]),
tuple(
@@ -136,7 +135,12 @@ def partial_correlation():
args["method"], args["target_db"],
int(args.get("criteria", 500))),
job_queue=current_app.config.get("REDIS_JOB_QUEUE"),
- env = {"PYTHONPATH": ":".join(sys.path), "SQL_URI": SQL_URI})})
+ env = {"PYTHONPATH": ":".join(sys.path), "SQL_URI": SQL_URI})
+ return build_response({
+ "status": "success",
+ "results": queueing_results,
+ "queued": True
+ })
with database_connector() as conn:
results = partial_correlations_with_target_traits(
diff --git a/gn3/commands.py b/gn3/commands.py
index e622068..5770902 100644
--- a/gn3/commands.py
+++ b/gn3/commands.py
@@ -93,7 +93,6 @@ Returns the name of the specific redis hash for the specific task.
conn.hset(name=unique_id, key="env", value=json.dumps(env))
return unique_id
-
def run_cmd(cmd: str, success_codes: Tuple = (0,), env: str = None) -> Dict:
"""Run CMD and return the CMD's status code and output as a dict"""
parsed_cmd = json.loads(cmd)
@@ -105,3 +104,12 @@ def run_cmd(cmd: str, success_codes: Tuple = (0,), env: str = None) -> Dict:
if results.returncode not in success_codes: # Error!
out = str(results.stderr, 'utf-8')
return {"code": results.returncode, "output": out}
+
+def run_async_cmd(
+ conn: Redis, job_queue: str, cmd: Union[str, Sequence[str]],
+ email: Optional[str] = None, env: Optional[dict] = None) -> str:
+ """A utility function to call `gn3.commands.queue_cmd` function and run the
+ worker in the `one-shot` mode."""
+ cmd_id = queue_cmd(conn, job_queue, cmd, email, env)
+ subprocess.Popen(["python3", "sheepdog/worker.py"])
+ return cmd_id