diff options
Diffstat (limited to 'gn3/commands.py')
| -rw-r--r-- | gn3/commands.py | 62 |
1 files changed, 52 insertions, 10 deletions
diff --git a/gn3/commands.py b/gn3/commands.py index 9617663..3852c41 100644 --- a/gn3/commands.py +++ b/gn3/commands.py @@ -1,13 +1,16 @@ """Procedures used to work with the various bio-informatics cli commands""" +import os import sys import json +import shlex import pickle import logging import tempfile import subprocess from datetime import datetime +from typing import Any from typing import Dict from typing import List from typing import Optional @@ -16,7 +19,7 @@ from typing import Union from typing import Sequence from uuid import uuid4 -from flask import current_app +from flask import Flask, current_app from redis.client import Redis # Used only in type hinting from pymonad.either import Either, Left, Right @@ -25,6 +28,8 @@ from gn3.debug import __pk__ from gn3.chancy import random_string from gn3.exceptions import RedisConnectionError +logger = logging.getLogger(__name__) + def compose_gemma_cmd(gemma_wrapper_cmd: str = "gemma-wrapper", gemma_wrapper_kwargs: Optional[Dict] = None, @@ -44,12 +49,14 @@ def compose_gemma_cmd(gemma_wrapper_cmd: str = "gemma-wrapper", cmd += " ".join([f"{arg}" for arg in gemma_args]) return cmd + def compose_rqtl_cmd(rqtl_wrapper_cmd: str, rqtl_wrapper_kwargs: Dict, rqtl_wrapper_bool_kwargs: list) -> str: """Compose a valid R/qtl command given the correct input""" # Add kwargs with values - cmd = f"Rscript { rqtl_wrapper_cmd } " + " ".join( + rscript = os.environ.get("RSCRIPT", "Rscript") + cmd = f"{rscript} { rqtl_wrapper_cmd } " + " ".join( [f"--{key} {val}" for key, val in rqtl_wrapper_kwargs.items()]) # Add boolean kwargs (kwargs that are either on or off, like --interval) @@ -59,18 +66,22 @@ def compose_rqtl_cmd(rqtl_wrapper_cmd: str, return cmd + def compose_pcorrs_command_for_selected_traits( prefix_cmd: Tuple[str, ...], target_traits: Tuple[str, ...]) -> Tuple[ str, ...]: """Build command for partial correlations against selected traits.""" return prefix_cmd + ("against-traits", ",".join(target_traits)) + def compose_pcorrs_command_for_database( prefix_cmd: Tuple[str, ...], target_database: str, criteria: int = 500) -> Tuple[str, ...]: """Build command for partial correlations against an entire dataset.""" return prefix_cmd + ( - "against-db", f"{target_database}", f"--criteria={criteria}") + "against-db", f"{target_database}", "--criteria", str(criteria), + "--textdir", current_app.config["TEXTDIR"]) + def compose_pcorrs_command( primary_trait: str, control_traits: Tuple[str, ...], method: str, @@ -82,7 +93,9 @@ def compose_pcorrs_command( return "pearsons" if "spearmans" in mthd: return "spearmans" - raise Exception(f"Invalid method '{method}'") + # pylint: disable=[broad-exception-raised] + raise Exception( + f"Invalid method '{method}'") prefix_cmd = ( f"{sys.executable}", "-m", "scripts.partial_correlations", @@ -96,7 +109,10 @@ def compose_pcorrs_command( kwargs.get("target_database") is None and kwargs.get("target_traits") is not None): return compose_pcorrs_command_for_selected_traits(prefix_cmd, **kwargs) - raise Exception("Invalid state: I don't know what command to generate!") + # pylint: disable=[broad-exception-raised] + raise Exception( + "Invalid state: I don't know what command to generate!") + def queue_cmd(conn: Redis, job_queue: str, @@ -130,6 +146,7 @@ Returns the name of the specific redis hash for the specific task. conn.hset(name=unique_id, key="env", value=json.dumps(env)) return unique_id + def run_sample_corr_cmd(method, this_trait_data, target_dataset_data): "Run the sample correlations in an external process, returning the results." with tempfile.TemporaryDirectory() as tempdir: @@ -152,9 +169,14 @@ def run_sample_corr_cmd(method, this_trait_data, target_dataset_data): return correlation_results + def run_cmd(cmd: str, success_codes: Tuple = (0,), env: Optional[str] = None) -> Dict: """Run CMD and return the CMD's status code and output as a dict""" - parsed_cmd = json.loads(__pk__("Attempting to parse command", cmd)) + try: + parsed_cmd = json.loads(cmd) + except json.decoder.JSONDecodeError as _jderr: + parsed_cmd = shlex.split(cmd) + parsed_env = (json.loads(env) if env is not None else None) results = subprocess.run( @@ -163,17 +185,37 @@ def run_cmd(cmd: str, success_codes: Tuple = (0,), env: Optional[str] = None) -> out = str(results.stdout, 'utf-8') if results.returncode not in success_codes: # Error! out = str(results.stderr, 'utf-8') - (# We do not always run this within an app context - current_app.logger.debug if current_app else logging.debug)(out) + logger.debug("Command output: %s", out) return {"code": results.returncode, "output": out} + +def compute_job_queue(app: Flask) -> str: + """Use the app configurations to compute the job queue""" + app_env = app.config["APPLICATION_ENVIRONMENT"] + job_queue = app.config["REDIS_JOB_QUEUE"] + if bool(app_env): + return f"{app_env}::{job_queue}" + return job_queue + + def run_async_cmd( conn: Redis, job_queue: str, cmd: Union[str, Sequence[str]], - email: Optional[str] = None, env: Optional[dict] = None) -> str: + options: Optional[Dict[str, Any]] = None, + log_level: str = "info") -> str: """A utility function to call `gn3.commands.queue_cmd` function and run the worker in the `one-shot` mode.""" + email = options.get("email") if options else None + env = options.get("env") if options else None cmd_id = queue_cmd(conn, job_queue, cmd, email, env) - subprocess.Popen([f"{sys.executable}", "-m", "sheepdog.worker"]) # pylint: disable=[consider-using-with] + worker_command = [ + sys.executable, + "-m", "sheepdog.worker", + "--queue-name", job_queue, + "--log-level", log_level + ] + logging.debug("Launching the worker: %s", worker_command) + subprocess.Popen( # pylint: disable=[consider-using-with] + worker_command) return cmd_id |
