aboutsummaryrefslogtreecommitdiff
path: root/gn3/commands.py
diff options
context:
space:
mode:
Diffstat (limited to 'gn3/commands.py')
-rw-r--r--gn3/commands.py32
1 files changed, 26 insertions, 6 deletions
diff --git a/gn3/commands.py b/gn3/commands.py
index 7d42ced..29e3df2 100644
--- a/gn3/commands.py
+++ b/gn3/commands.py
@@ -1,5 +1,8 @@
"""Procedures used to work with the various bio-informatics cli
commands"""
+import os
+import sys
+import json
import subprocess
from datetime import datetime
@@ -7,6 +10,8 @@ from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
+from typing import Union
+from typing import Sequence
from uuid import uuid4
from redis.client import Redis # Used only in type hinting
@@ -46,10 +51,20 @@ def compose_rqtl_cmd(rqtl_wrapper_cmd: str,
return cmd
+def compose_pcorrs_command(
+ primary_trait: str, control_traits: Tuple[str, ...], method: str,
+ target_database: str, criteria: int = 500):
+ rundir = os.path.abspath(".")
+ return (
+ f"{sys.executable}", f"{rundir}/scripts/partial_correlations.py",
+ primary_trait, ",".join(control_traits), f'"{method}"',
+ f"{target_database}", f"--criteria={criteria}")
+
def queue_cmd(conn: Redis,
job_queue: str,
- cmd: str,
- email: Optional[str] = None) -> str:
+ cmd: Union[str, Sequence[str]],
+ email: Optional[str] = None,
+ env: Optional[dict] = None) -> str:
"""Given a command CMD; (optional) EMAIL; and a redis connection CONN, queue
it in Redis with an initial status of 'queued'. The following status codes
are supported:
@@ -68,17 +83,22 @@ Returns the name of the specific redis hash for the specific task.
f"{datetime.now().strftime('%Y-%m-%d%H-%M%S-%M%S-')}"
f"{str(uuid4())}")
conn.rpush(job_queue, unique_id)
- for key, value in {"cmd": cmd, "result": "", "status": "queued"}.items():
+ for key, value in {
+ "cmd": json.dumps(cmd), "result": "", "status": "queued",
+ "env": json.dumps(env)}.items():
conn.hset(name=unique_id, key=key, value=value)
if email:
conn.hset(name=unique_id, key="email", value=email)
return unique_id
-def run_cmd(cmd: str, success_codes: Tuple = (0,)) -> Dict:
+def run_cmd(cmd: str, success_codes: Tuple = (0,), env: str = None) -> Dict:
"""Run CMD and return the CMD's status code and output as a dict"""
- results = subprocess.run(cmd, capture_output=True, shell=True,
- check=False)
+ parsed_cmd = json.loads(cmd)
+ parsed_env = (json.loads(env) if env is not None else None)
+ results = subprocess.run(
+ parsed_cmd, capture_output=True, shell=isinstance(parsed_cmd, str),
+ check=False, env=parsed_env)
out = str(results.stdout, 'utf-8')
if results.returncode not in success_codes: # Error!
out = str(results.stderr, 'utf-8')