aboutsummaryrefslogtreecommitdiff
path: root/gn_auth/commands.py
blob: cf7904138ae6d581a7a09ea1511012f278393234 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
"""Procedures used to work with the various bio-informatics cli
commands"""
import sys
import json
import subprocess
from uuid import uuid4
from datetime import datetime
from typing import Dict, Optional, Tuple, Union, Sequence


from redis.client import Redis

from gn_auth.auth.db.redis import RedisConnectionError

def queue_cmd(conn: Redis,
              job_queue: str,
              cmd: Union[str, Sequence[str]],
              email: Optional[str] = None,
              env: Optional[dict] = None) -> str:
    """Queue the command CMD on the Redis list JOB_QUEUE, using the redis
connection CONN, with an initial status of 'queued'.

The job is stored as a redis hash with the fields:

    cmd:    CMD, JSON-encoded
    result: initially the empty string
    status: initially 'queued'
    email:  EMAIL; only set when EMAIL is truthy
    env:    ENV, JSON-encoded; only set when ENV is truthy

The following status codes are supported:

    queued:  Unprocessed; Still in the queue
    running: Still running
    success: Successful completion
    error:   Erroneous completion

The hash is written in full *before* its name is pushed onto JOB_QUEUE, so
that whatever pops the name off the queue never finds a missing or partially
written hash.

Raises `RedisConnectionError` if CONN does not respond to a ping.

Returns the name of the specific redis hash for the specific task.

    """
    if not conn.ping():
        raise RedisConnectionError
    # NOTE(review): the timestamp format repeats `%M%S`, which looks
    # unintentional.  It is kept as-is because changing it would change the
    # names of the keys; confirm nothing depends on the layout before fixing.
    unique_id = ("cmd::"
                 f"{datetime.now().strftime('%Y-%m-%d%H-%M%S-%M%S-')}"
                 f"{str(uuid4())}")
    # Serialise everything up-front: a value that cannot be JSON-encoded
    # should fail here, before anything has been written to redis.
    fields = {"cmd": json.dumps(cmd), "result": "", "status": "queued"}
    if email:
        fields["email"] = email
    if env:
        fields["env"] = json.dumps(env)
    for key, value in fields.items():
        conn.hset(name=unique_id, key=key, value=value)
    # Publish the job only once its hash is complete.
    conn.rpush(job_queue, unique_id)
    return unique_id

def run_cmd(cmd: str,
            success_codes: Tuple = (0,),
            env: Optional[str] = None) -> Dict:
    """Run the JSON-encoded command CMD and report how it went.

    CMD is decoded with `json.loads`; if the decoded value is a string it is
    run through the shell, otherwise it is handed to `subprocess.run` as-is.
    ENV, when not None, is a JSON-encoded value that is decoded and passed as
    the environment of the child process.

    Returns a dict with the keys:

        code:   the exit code of the process
        output: the process's stdout (decoded as UTF-8) when the exit code is
                one of SUCCESS_CODES, otherwise its stderr (decoded as UTF-8)
    """
    command = json.loads(cmd)
    environment = None if env is None else json.loads(env)
    completed = subprocess.run(
        command,
        shell=isinstance(command, str),
        env=environment,
        capture_output=True,
        check=False)
    # stdout is decoded unconditionally, even if it ends up unused.
    stdout_text = completed.stdout.decode("utf-8")
    if completed.returncode in success_codes:
        return {"code": completed.returncode, "output": stdout_text}
    # Failure: report what the process wrote to stderr instead.
    return {"code": completed.returncode,
            "output": completed.stderr.decode("utf-8")}

def run_async_cmd(
        conn: Redis, job_queue: str, cmd: Union[str, Sequence[str]],
        email: Optional[str] = None, env: Optional[dict] = None) -> str:
    """Queue CMD with `queue_cmd` and start a worker process for JOB_QUEUE.

    The arguments are passed on unchanged to `queue_cmd`.  After queueing, the
    `scripts.worker` module is launched with the current python interpreter
    and JOB_QUEUE as its only argument (presumably the worker's `one-shot`
    mode -- the worker itself is defined elsewhere).  This function does not
    wait for that process to finish.

    Returns the job identifier produced by `queue_cmd`.
    """
    job_id = queue_cmd(conn, job_queue, cmd, email, env)
    worker_argv = [str(sys.executable), "-m", "scripts.worker", job_queue]
    # The process handle is discarded on purpose: the worker is left running
    # in the background, hence no `with` block.
    subprocess.Popen(worker_argv)  # pylint: disable=[consider-using-with]
    return job_id