about summary refs log tree commit diff
path: root/gn3/commands.py
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2022-02-23 14:51:47 +0300
committerFrederick Muriuki Muriithi2022-03-03 10:20:04 +0300
commit6d39c92fbc9a7b82cd8eef60c62cd5d83acb49a1 (patch)
tree7efab53cc8fc367f433ac01ece95b0fbecc858d9 /gn3/commands.py
parent8e0fcfa78fcdb5bdd5b49e2b1ac918ae9cc0fc53 (diff)
downloadgenenetwork3-6d39c92fbc9a7b82cd8eef60c62cd5d83acb49a1.tar.gz
Run partial correlations in an external process
Run the partial correlations code in an external python process decoupling it
from the server and making it asynchronous.

Summary of changes:
* gn3/api/correlation.py:
  - Remove response processing code
  - Queue partial corrs processing
  - Create new endpoint to get results
* gn3/commands.py
  - Compose the pcorrs command to be run in an external process
  - Enable running of subprocess commands with list args
* gn3/responses/__init__.py: new module indicator file
* gn3/responses/pcorrs_responses.py: Hold response processing code extracted
  from ~gn3.api.correlations.py~ file
* scripts/partial_correlations.py: CLI script to process the pcorrs
* sheepdog/worker.py:
  - Add the *genenetwork3* path at the beginning of the ~sys.path~ list to
    override any GN3 in the site-packages
  - Add any environment variables to be set for the command to be run
Diffstat (limited to 'gn3/commands.py')
-rw-r--r--gn3/commands.py32
1 files changed, 26 insertions, 6 deletions
diff --git a/gn3/commands.py b/gn3/commands.py
index 7d42ced..29e3df2 100644
--- a/gn3/commands.py
+++ b/gn3/commands.py
@@ -1,5 +1,8 @@
 """Procedures used to work with the various bio-informatics cli
 commands"""
+import os
+import sys
+import json
 import subprocess
 
 from datetime import datetime
@@ -7,6 +10,8 @@ from typing import Dict
 from typing import List
 from typing import Optional
 from typing import Tuple
+from typing import Union
+from typing import Sequence
 from uuid import uuid4
 from redis.client import Redis  # Used only in type hinting
 
@@ -46,10 +51,20 @@ def compose_rqtl_cmd(rqtl_wrapper_cmd: str,
 
     return cmd
 
+def compose_pcorrs_command(
+        primary_trait: str, control_traits: Tuple[str, ...], method: str,
+        target_database: str, criteria: int = 500):
+    rundir = os.path.abspath(".")
+    return (
+        f"{sys.executable}", f"{rundir}/scripts/partial_correlations.py",
+        primary_trait, ",".join(control_traits), f'"{method}"',
+        f"{target_database}", f"--criteria={criteria}")
+
 def queue_cmd(conn: Redis,
               job_queue: str,
-              cmd: str,
-              email: Optional[str] = None) -> str:
+              cmd: Union[str, Sequence[str]],
+              email: Optional[str] = None,
+              env: Optional[dict] = None) -> str:
     """Given a command CMD; (optional) EMAIL; and a redis connection CONN, queue
 it in Redis with an initial status of 'queued'.  The following status codes
 are supported:
@@ -68,17 +83,22 @@ Returns the name of the specific redis hash for the specific task.
                  f"{datetime.now().strftime('%Y-%m-%d%H-%M%S-%M%S-')}"
                  f"{str(uuid4())}")
     conn.rpush(job_queue, unique_id)
-    for key, value in {"cmd": cmd, "result": "", "status": "queued"}.items():
+    for key, value in {
+            "cmd": json.dumps(cmd), "result": "", "status": "queued",
+            "env": json.dumps(env)}.items():
         conn.hset(name=unique_id, key=key, value=value)
     if email:
         conn.hset(name=unique_id, key="email", value=email)
     return unique_id
 
 
-def run_cmd(cmd: str, success_codes: Tuple = (0,)) -> Dict:
+def run_cmd(cmd: str, success_codes: Tuple = (0,), env: str = None) -> Dict:
     """Run CMD and return the CMD's status code and output as a dict"""
-    results = subprocess.run(cmd, capture_output=True, shell=True,
-                             check=False)
+    parsed_cmd = json.loads(cmd)
+    parsed_env = (json.loads(env) if env is not None else None)
+    results = subprocess.run(
+        parsed_cmd, capture_output=True, shell=isinstance(parsed_cmd, str),
+        check=False, env=parsed_env)
     out = str(results.stdout, 'utf-8')
     if results.returncode not in success_codes:  # Error!
         out = str(results.stderr, 'utf-8')