1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
"""Procedures used to work with the various bio-informatics cli
commands"""
import subprocess
from datetime import datetime
from typing import Dict
from typing import List
from typing import Optional
from uuid import uuid4
from redis.client import Redis # Used only in type hinting
from gn3.exceptions import RedisConnectionError
from gn3.file_utils import lookup_file
from gn3.file_utils import jsonfile_to_dict
# pylint: disable=locally-disabled, too-many-arguments
def compose_gemma_cmd(
token: str,
metadata_filename: str,
gemma_wrapper_cmd: str = "gemma-wrapper",
gemma_wrapper_kwargs: Optional[Dict] = None,
gemma_kwargs: Optional[Dict] = None,
gemma_args: Optional[List] = None) -> str:
"""Compose a valid GEMMA command given the correct values"""
cmd = f"{gemma_wrapper_cmd} --json"
if gemma_wrapper_kwargs:
cmd += (" " # Add extra space between commands
" ".join([f" --{key} {val}" for key, val
in gemma_wrapper_kwargs.items()]))
data = jsonfile_to_dict(lookup_file("TMPDIR",
token,
metadata_filename))
geno_file = lookup_file(environ_var="TMPDIR",
root_dir="genotype",
file_name=data.get("geno", ""))
pheno_file = lookup_file(environ_var="TMPDIR",
root_dir=token,
file_name=data.get("geno", ""))
cmd += f" -- -g {geno_file} -p {pheno_file}"
if gemma_kwargs:
cmd += (" "
" ".join([f" -{key} {val}"
for key, val in gemma_kwargs.items()]))
if gemma_args:
cmd += (" "
" ".join([f" {arg}" for arg in gemma_args]))
return cmd
def queue_cmd(conn: Redis, cmd: str, email: Optional[str] = None) -> str:
"""Given a command CMD; (optional) EMAIL; and a redis connection CONN, queue
it in Redis with an initial status of 'queued'. The following status codes
are supported:
queued: Unprocessed; Still in the queue
running: Still running
success: Successful completion
error: Erroneous completion
"""
if not conn.ping():
raise RedisConnectionError
unique_id = ("cmd::"
f"{datetime.now().strftime('%Y-%m-%d%H-%M%S-%M%S-')}"
f"{str(uuid4())}")
for key, value in {"cmd": cmd,
"result": "",
"status": "queued"}.items():
conn.hset(key, value, unique_id)
conn.rpush("GN2::job-queue",
unique_id)
if email:
conn.hset("email",
email,
unique_id)
return unique_id
def run_cmd(cmd: str) -> Dict:
"""Run CMD and return the CMD's status code and output as a dict"""
results = subprocess.run(cmd, capture_output=True,
shell=True, check=False)
out = str(results.stdout, 'utf-8')
if results.returncode > 0: # Error!
out = str(results.stderr, 'utf-8')
return {"code": results.returncode,
"output": out}
|