aboutsummaryrefslogtreecommitdiff
path: root/gn3/computations/gemma.py
blob: 0b22d3c87063922beece1d3822e2e7ec51261b48 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
"""Procedures related gemma computations"""
import os

from base64 import b64encode
from hashlib import md5
from typing import Dict
from typing import List
from typing import ValuesView
from gn3.commands import compose_gemma_cmd
from gn3.fs_helpers import get_hash_of_files


def generate_hash_of_string(unhashed_str: str) -> str:
    """Given an UNHASHED_STR, generate its md5 hash as a short string that is
safe to embed in a file name.

    The string is UTF-8 encoded and hashed with md5; the 16-byte digest is
    base64-encoded and the trailing '==' padding is removed, which leaves 22
    characters.  The '/' of the standard base64 alphabet is emitted as '_'
    instead: in this module the result is spliced into file names, where a
    '/' would be read as a directory separator.
    """
    digest = md5(unhashed_str.encode("utf-8")).digest()
    # altchars replaces the last two symbols of the alphabet ('+' and '/').
    # '+' is kept and only '/' is swapped, so any hash that did not contain
    # a '/' comes out exactly as before.
    encoded = b64encode(digest, altchars=b"+_").decode("utf-8")
    return encoded.replace("==", "")


def generate_pheno_txt_file(trait_filename: str,
                            values: List,
                            tmpdir: str = "/tmp") -> str:
    """Given VALUES, and TMPDIR, generate a valid traits file and return its
path.

    The file lives in TMPDIR/gn2/.  Its name is TRAIT_FILENAME with a hash of
    VALUES inserted before the extension (everything after the first '.'),
    so identical VALUES map to the same file; if that file already exists it
    is reused instead of being rewritten.  Every value is written on its own
    line, with the marker "x" written as "NA".
    """
    out_dir = f"{tmpdir}/gn2"
    # makedirs(exist_ok=True) also creates a missing TMPDIR and does not fail
    # when another process creates the directory between a check and the
    # call, which a separate isdir()/mkdir() pair would.
    os.makedirs(out_dir, exist_ok=True)

    stem, _, ext = trait_filename.partition(".")
    if ext:
        suffix = f".{ext}"
    else:
        # No extension (this includes a name that merely ends in '.'):
        # keep the name untouched and append the hash at the end.
        stem, suffix = trait_filename, ""

    # str() lets non-string values be hashed the same way they are written
    # below.  A '/' in the hash would be taken as a directory separator and
    # point the path into a directory that does not exist, so it is replaced.
    values_hash = generate_hash_of_string(
        "".join(str(value) for value in values)).replace("/", "_")
    pheno_path = f"{out_dir}/{stem}_{values_hash}{suffix}"

    # Reuse the file when these exact values were already written.
    if not os.path.isfile(pheno_path):
        with open(pheno_path, "w", encoding="utf-8") as _file:
            _file.writelines(
                "NA\n" if value == "x" else f"{value}\n" for value in values)
    return pheno_path


def do_paths_exist(paths: ValuesView) -> bool:
    """Return True only when every entry in PATHS is an existing file, as
judged by os.path.isfile.

    Checking stops at the first entry that fails.  An empty PATHS yields
    True.
    """
    return all(os.path.isfile(candidate) for candidate in paths)


# pylint: disable=R0913
def generate_gemma_cmd(gemma_cmd: str,
                       output_dir: str,
                       token: str,
                       gemma_kwargs: Dict,
                       gemma_wrapper_kwargs: Dict = None,
                       chromosomes: str = None) -> Dict:
    """Build a gemma command together with the name of its output file.

    The output file name starts with the hash that get_hash_of_files returns
    for the values of the "g", "p", "a" and "c" entries of GEMMA_KWARGS.
    When CHROMOSOMES is given, a 6-character hash of it is appended to that
    name and the wrapper kwargs are set to {"loco": CHROMOSOMES}.

    Returns a dict with two keys: "output_file", the generated file name, and
    "gemma_cmd", the result of compose_gemma_cmd called with "-gk", ">" and
    OUTPUT_DIR/TOKEN/<output_file> as its gemma args.
    """
    hashed_options = ("g", "p", "a", "c")
    input_files = [
        value for key, value in gemma_kwargs.items() if key in hashed_options
    ]
    file_hash = get_hash_of_files(input_files)

    if chromosomes:  # Only reached when calculating k-values
        # NOTE: any GEMMA_WRAPPER_KWARGS passed by the caller is replaced
        # here, not merged.
        gemma_wrapper_kwargs = {"loco": f"{chromosomes}"}
        file_hash += f"-{generate_hash_of_string(chromosomes)[:6]}"

    output_filename = f"{file_hash}-output.json"
    redirect_target = f"{output_dir}/{token}/{output_filename}"
    composed_cmd = compose_gemma_cmd(
        gemma_wrapper_cmd=gemma_cmd,
        gemma_wrapper_kwargs=gemma_wrapper_kwargs,
        gemma_kwargs=gemma_kwargs,
        gemma_args=["-gk", ">", redirect_target])
    return {
        "output_file": output_filename,
        "gemma_cmd": composed_cmd,
    }