1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
"""Procedures used to work with the various bio-informatics cli
commands"""
import sys
import json
import subprocess
from uuid import uuid4
from datetime import datetime
from typing import Dict, Optional, Tuple, Union, Sequence
from redis.client import Redis
def queue_cmd(conn: Redis,
              job_queue: str,
              cmd: Union[str, Sequence[str]],
              email: Optional[str] = None,
              env: Optional[dict] = None) -> str:
    """Queue the command CMD in Redis with an initial status of 'queued'.

    A redis hash named after a freshly generated job id is filled with the
    fields `cmd` (JSON-encoded CMD), `result` (empty string) and `status`
    ('queued'); the job id is then appended to the redis list JOB_QUEUE.
    When EMAIL is truthy it is stored under the `email` field, and when ENV
    is truthy it is stored JSON-encoded under the `env` field.

    The following status codes are supported:

        queued:  Unprocessed; Still in the queue
        running: Still running
        success: Successful completion
        error:   Erroneous completion

    Raises `redis.exceptions.ConnectionError` if CONN does not answer a ping.

    Returns the name of the specific redis hash for the specific task.
    """
    # Imported here (and aliased) so that the exception raised below is
    # actually defined without touching the module-level imports.
    from redis.exceptions import ConnectionError as RedisConnectionError  # pylint: disable=[import-outside-toplevel]

    if not conn.ping():
        raise RedisConnectionError("Unable to reach the redis server.")

    # NOTE(review): the format repeats `%M%S`; kept as-is since the resulting
    # ids are opaque keys and uniqueness comes from the uuid4 suffix.
    unique_id = ("cmd::"
                 f"{datetime.now().strftime('%Y-%m-%d%H-%M%S-%M%S-')}"
                 f"{str(uuid4())}")

    fields = {"cmd": json.dumps(cmd), "result": "", "status": "queued"}
    if email:
        fields["email"] = email
    if env:
        fields["env"] = json.dumps(env)

    for key, value in fields.items():
        conn.hset(name=unique_id, key=key, value=value)

    # Publish the job id only after its hash is fully written, so a consumer
    # popping from JOB_QUEUE never sees a job with missing fields.
    conn.rpush(job_queue, unique_id)
    return unique_id
def run_cmd(cmd: str, success_codes: Tuple = (0,), env: Optional[str] = None) -> Dict:
    """Run CMD and return the CMD's status code and output as a dict.

    CMD is a JSON document: when it decodes to a string the command is run
    through the shell, otherwise the decoded value is handed to
    `subprocess.run` as the argument list. ENV, when not None, is a JSON
    document that is decoded and passed as the child process's environment.

    Returns a dict with the keys:
        code:   the process's return code
        output: the process's stdout when the return code is one of
                SUCCESS_CODES, otherwise its stderr
    """
    parsed_cmd = json.loads(cmd)
    parsed_env = json.loads(env) if env is not None else None
    results = subprocess.run(
        parsed_cmd, capture_output=True, shell=isinstance(parsed_cmd, str),
        check=False, env=parsed_env)

    # Only the stream that is actually reported gets decoded.
    if results.returncode in success_codes:
        raw_output = results.stdout
    else:  # Error!
        raw_output = results.stderr

    # errors="replace": a command emitting non-UTF-8 bytes must not turn
    # into a UnicodeDecodeError here; undecodable bytes become U+FFFD.
    return {"code": results.returncode,
            "output": raw_output.decode("utf-8", errors="replace")}
def run_async_cmd(
        conn: Redis, job_queue: str, cmd: Union[str, Sequence[str]],
        email: Optional[str] = None, env: Optional[dict] = None) -> str:
    """Queue CMD with `queue_cmd`, then launch the `sheepdog.worker` module
    in a new process using the current Python interpreter.

    The worker process is started and not waited on. Returns the job id
    produced by `queue_cmd`.
    """
    job_id = queue_cmd(conn, job_queue, cmd, email, env)
    worker_argv = [f"{sys.executable}", "-m", "sheepdog.worker"]
    # Deliberately not used as a context manager: leaving a `with` block
    # would wait for the worker to finish.
    subprocess.Popen(worker_argv)  # pylint: disable=[consider-using-with]
    return job_id
|