aboutsummaryrefslogtreecommitdiff
path: root/gn3/commands.py
blob: 14bd295dcf5e546cbbc68849b3ab16f8c8bc3c08 (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
"""Procedures used to work with the various bio-informatics cli
commands"""
import subprocess

from datetime import datetime
from typing import Dict
from typing import List
from typing import Optional
from uuid import uuid4
from redis.client import Redis  # Used only in type hinting

from gn3.exceptions import RedisConnectionError


def compose_gemma_cmd(gemma_wrapper_cmd: str = "gemma-wrapper",
                      gemma_wrapper_kwargs: Optional[Dict] = None,
                      gemma_kwargs: Optional[Dict] = None,
                      gemma_args: Optional[List] = None) -> str:
    """Compose a valid GEMMA command given the correct values"""
    cmd = f"{gemma_wrapper_cmd} --json"
    if gemma_wrapper_kwargs:
        cmd += " "  # Add extra space between commands
        cmd += " ".join(
            [f"--{key} {val}" for key, val in gemma_wrapper_kwargs.items()])
    cmd += " -- "
    if gemma_kwargs:
        cmd += " ".join([f"-{key} {val}" for key, val in gemma_kwargs.items()])
    if gemma_args:
        cmd += " "
        cmd += " ".join([f"{arg}" for arg in gemma_args])
    return cmd

def compose_rqtl_cmd(rqtl_wrapper_cmd: str,
                     rqtl_wrapper_kwargs: Dict,
                     rqtl_wrapper_bool_kwargs: list) -> str:
    """Compose a valid R/qtl command given the correct input"""
    # Add kwargs with values
    cmd = f"Rscript { rqtl_wrapper_cmd } " + " ".join(
        [f"--{key} {val}" for key, val in rqtl_wrapper_kwargs.items()])

    # Add boolean kwargs (kwargs that are either on or off, like --interval)
    if rqtl_wrapper_bool_kwargs:
        cmd += " "
        cmd += " ".join([f"--{val}" for val in rqtl_wrapper_bool_kwargs])

    return cmd

def queue_cmd(conn: Redis,
              job_queue: str,
              cmd: str,
              email: Optional[str] = None) -> str:
    """Given a command CMD; (optional) EMAIL; and a redis connection CONN, queue
it in Redis with an initial status of 'queued'.  The following status codes
are supported:

    queued:  Unprocessed; Still in the queue
    running: Still running
    success: Successful completion
    error:   Erroneous completion

Returns the name of the specific redis hash for the specific task.

    """
    if not conn.ping():
        raise RedisConnectionError
    unique_id = ("cmd::"
                 f"{datetime.now().strftime('%Y-%m-%d%H-%M%S-%M%S-')}"
                 f"{str(uuid4())}")
    conn.rpush(job_queue, unique_id)
    for key, value in {"cmd": cmd, "result": "", "status": "queued"}.items():
        conn.hset(name=unique_id, key=key, value=value)
    if email:
        conn.hset(name=unique_id, key="email", value=email)
    return unique_id


def run_cmd(cmd: str) -> Dict:
    """Run CMD and return the CMD's status code and output as a dict"""
    results = subprocess.run(cmd, capture_output=True, shell=True, check=False)
    out = str(results.stdout, 'utf-8')
    if results.returncode > 0:  # Error!
        out = str(results.stderr, 'utf-8')
    return {"code": results.returncode, "output": out}