about summary refs log tree commit diff
path: root/sheepdog/worker.py
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2022-02-23 14:51:47 +0300
committerFrederick Muriuki Muriithi2022-03-03 10:20:04 +0300
commit6d39c92fbc9a7b82cd8eef60c62cd5d83acb49a1 (patch)
tree7efab53cc8fc367f433ac01ece95b0fbecc858d9 /sheepdog/worker.py
parent8e0fcfa78fcdb5bdd5b49e2b1ac918ae9cc0fc53 (diff)
downloadgenenetwork3-6d39c92fbc9a7b82cd8eef60c62cd5d83acb49a1.tar.gz
Run partial correlations in an external process
Run the partial correlations code in an external python process decoupling it
from the server and making it asynchronous.

Summary of changes:
* gn3/api/correlation.py:
  - Remove response processing code
  - Queue partial corrs processing
  - Create new endpoint to get results
* gn3/commands.py
  - Compose the pcorrs command to be run in an external process
  - Enable running of subprocess commands with list args
* gn3/responses/__init__.py: new module indicator file
* gn3/responses/pcorrs_responses.py: Hold response processing code extracted
  from ~gn3.api.correlations.py~ file
* scripts/partial_correlations.py: CLI script to process the pcorrs
* sheepdog/worker.py:
  - Add the *genenetwork3* path at the beginning of the ~sys.path~ list to
    override any GN3 in the site-packages
  - Add any environment variables to be set for the command to be run
Diffstat (limited to 'sheepdog/worker.py')
-rw-r--r--sheepdog/worker.py16
1 files changed, 10 insertions, 6 deletions
diff --git a/sheepdog/worker.py b/sheepdog/worker.py
index 4e3610e..4e7f9e7 100644
--- a/sheepdog/worker.py
+++ b/sheepdog/worker.py
@@ -5,9 +5,12 @@ import time
 import redis
 import redis.connection
 
-# Enable importing from one dir up since gn3 isn't installed as a globally
-sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
+# Enable importing from one dir up: put as first to override any other globally
+# accessible GN3
+sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
 
+def update_status(conn, cmd_id, status):
+    conn.hset(name=f"{cmd_id}", key="status", value=f"{status}")
 
 def run_jobs(conn):
     """Process the redis using a redis connection, CONN"""
@@ -17,13 +20,14 @@ def run_jobs(conn):
     if bool(cmd_id):
         cmd = conn.hget(name=cmd_id, key="cmd")
         if cmd and (conn.hget(cmd_id, "status") == b"queued"):
-            result = run_cmd(cmd.decode("utf-8"))
+            update_status(conn, cmd_id, "running")
+            result = run_cmd(
+                cmd.decode("utf-8"), env=conn.hget(name=cmd_id, key="env"))
             conn.hset(name=cmd_id, key="result", value=result.get("output"))
             if result.get("code") == 0:  # Success
-                conn.hset(name=cmd_id, key="status", value="success")
+                update_status(conn, cmd_id, "success")
             else:
-                conn.hset(name=cmd_id, key="status", value="error")
-
+                update_status(conn, cmd_id, "error")
 
 if __name__ == "__main__":
     redis_conn = redis.Redis()