about summary refs log tree commit diff
path: root/gn3/commands.py
diff options
context:
space:
mode:
Diffstat (limited to 'gn3/commands.py')
-rw-r--r-- gn3/commands.py | 62
1 files changed, 52 insertions, 10 deletions
diff --git a/gn3/commands.py b/gn3/commands.py
index 9617663..3852c41 100644
--- a/gn3/commands.py
+++ b/gn3/commands.py
@@ -1,13 +1,16 @@
 """Procedures used to work with the various bio-informatics cli
 commands"""
+import os
 import sys
 import json
+import shlex
 import pickle
 import logging
 import tempfile
 import subprocess
 
 from datetime import datetime
+from typing import Any
 from typing import Dict
 from typing import List
 from typing import Optional
@@ -16,7 +19,7 @@ from typing import Union
 from typing import Sequence
 from uuid import uuid4
 
-from flask import current_app
+from flask import Flask, current_app
 from redis.client import Redis  # Used only in type hinting
 
 from pymonad.either import Either, Left, Right
@@ -25,6 +28,8 @@ from gn3.debug import __pk__
 from gn3.chancy import random_string
 from gn3.exceptions import RedisConnectionError
 
+logger = logging.getLogger(__name__)
+
 
 def compose_gemma_cmd(gemma_wrapper_cmd: str = "gemma-wrapper",
                       gemma_wrapper_kwargs: Optional[Dict] = None,
@@ -44,12 +49,14 @@ def compose_gemma_cmd(gemma_wrapper_cmd: str = "gemma-wrapper",
         cmd += " ".join([f"{arg}" for arg in gemma_args])
     return cmd
 
+
 def compose_rqtl_cmd(rqtl_wrapper_cmd: str,
                      rqtl_wrapper_kwargs: Dict,
                      rqtl_wrapper_bool_kwargs: list) -> str:
     """Compose a valid R/qtl command given the correct input"""
     # Add kwargs with values
-    cmd = f"Rscript { rqtl_wrapper_cmd } " + " ".join(
+    rscript = os.environ.get("RSCRIPT", "Rscript")
+    cmd = f"{rscript} { rqtl_wrapper_cmd } " + " ".join(
         [f"--{key} {val}" for key, val in rqtl_wrapper_kwargs.items()])
 
     # Add boolean kwargs (kwargs that are either on or off, like --interval)
@@ -59,18 +66,22 @@ def compose_rqtl_cmd(rqtl_wrapper_cmd: str,
 
     return cmd
 
+
 def compose_pcorrs_command_for_selected_traits(
         prefix_cmd: Tuple[str, ...], target_traits: Tuple[str, ...]) -> Tuple[
             str, ...]:
     """Build command for partial correlations against selected traits."""
     return prefix_cmd + ("against-traits", ",".join(target_traits))
 
+
 def compose_pcorrs_command_for_database(
         prefix_cmd: Tuple[str, ...], target_database: str,
         criteria: int = 500) -> Tuple[str, ...]:
     """Build command for partial correlations against an entire dataset."""
     return prefix_cmd + (
-        "against-db", f"{target_database}", f"--criteria={criteria}")
+        "against-db", f"{target_database}", "--criteria", str(criteria),
+        "--textdir", current_app.config["TEXTDIR"])
+
 
 def compose_pcorrs_command(
         primary_trait: str, control_traits: Tuple[str, ...], method: str,
@@ -82,7 +93,9 @@ def compose_pcorrs_command(
             return "pearsons"
         if "spearmans" in mthd:
             return "spearmans"
-        raise Exception(f"Invalid method '{method}'")
+        # pylint: disable=[broad-exception-raised]
+        raise Exception(
+            f"Invalid method '{method}'")
 
     prefix_cmd = (
         f"{sys.executable}", "-m", "scripts.partial_correlations",
@@ -96,7 +109,10 @@ def compose_pcorrs_command(
             kwargs.get("target_database") is None
             and kwargs.get("target_traits") is not None):
         return compose_pcorrs_command_for_selected_traits(prefix_cmd, **kwargs)
-    raise Exception("Invalid state: I don't know what command to generate!")
+    # pylint: disable=[broad-exception-raised]
+    raise Exception(
+        "Invalid state: I don't know what command to generate!")
+
 
 def queue_cmd(conn: Redis,
               job_queue: str,
@@ -130,6 +146,7 @@ Returns the name of the specific redis hash for the specific task.
         conn.hset(name=unique_id, key="env", value=json.dumps(env))
     return unique_id
 
+
 def run_sample_corr_cmd(method, this_trait_data, target_dataset_data):
     "Run the sample correlations in an external process, returning the results."
     with tempfile.TemporaryDirectory() as tempdir:
@@ -152,9 +169,14 @@ def run_sample_corr_cmd(method, this_trait_data, target_dataset_data):
 
     return correlation_results
 
+
 def run_cmd(cmd: str, success_codes: Tuple = (0,), env: Optional[str] = None) -> Dict:
     """Run CMD and return the CMD's status code and output as a dict"""
-    parsed_cmd = json.loads(__pk__("Attempting to parse command", cmd))
+    try:
+        parsed_cmd = json.loads(cmd)
+    except json.decoder.JSONDecodeError as _jderr:
+        parsed_cmd = shlex.split(cmd)
+
     parsed_env = (json.loads(env) if env is not None else None)
 
     results = subprocess.run(
@@ -163,17 +185,37 @@ def run_cmd(cmd: str, success_codes: Tuple = (0,), env: Optional[str] = None) ->
     out = str(results.stdout, 'utf-8')
     if results.returncode not in success_codes:  # Error!
         out = str(results.stderr, 'utf-8')
-        (# We do not always run this within an app context
-            current_app.logger.debug if current_app else logging.debug)(out)
+        logger.debug("Command output: %s", out)
     return {"code": results.returncode, "output": out}
 
+
+def compute_job_queue(app: Flask) -> str:
+    """Use the app configurations to compute the job queue"""
+    app_env = app.config["APPLICATION_ENVIRONMENT"]
+    job_queue = app.config["REDIS_JOB_QUEUE"]
+    if bool(app_env):
+        return f"{app_env}::{job_queue}"
+    return job_queue
+
+
 def run_async_cmd(
         conn: Redis, job_queue: str, cmd: Union[str, Sequence[str]],
-        email: Optional[str] = None, env: Optional[dict] = None) -> str:
+        options: Optional[Dict[str, Any]] = None,
+        log_level: str = "info") -> str:
     """A utility function to call `gn3.commands.queue_cmd` function and run the
     worker in the `one-shot` mode."""
+    email = options.get("email") if options else None
+    env = options.get("env") if options else None
     cmd_id = queue_cmd(conn, job_queue, cmd, email, env)
-    subprocess.Popen([f"{sys.executable}", "-m", "sheepdog.worker"]) # pylint: disable=[consider-using-with]
+    worker_command = [
+        sys.executable,
+        "-m", "sheepdog.worker",
+        "--queue-name", job_queue,
+        "--log-level", log_level
+    ]
+    logging.debug("Launching the worker: %s", worker_command)
+    subprocess.Popen(  # pylint: disable=[consider-using-with]
+        worker_command)
     return cmd_id