1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
"""Integration code for the correlation computation implemented in Rust; see
https://github.com/Alexanderlacuna/correlation_rust
"""
import subprocess
import json
import os
from gn3.computations.qtlreaper import create_output_directory
from gn3.random import random_string
from gn3.settings import CORRELATION_COMMAND
from gn3.settings import TMPDIR
def generate_input_files(dataset: list[str],
                         output_dir: str = TMPDIR) -> tuple[str, str]:
    """Write ``dataset`` to a randomly named text file for the correlation run.

    The file is placed in ``<output_dir>/correlation`` and contains the
    items of ``dataset`` joined by newlines (no trailing newline).

    Returns:
        A ``(directory, file_path)`` tuple: the correlation working
        directory and the path of the file that was written.
    """
    correlation_dir = f"{output_dir}/correlation"
    # presumably creates the directory when it is missing -- confirm against
    # gn3.computations.qtlreaper.create_output_directory
    create_output_directory(correlation_dir)

    dataset_path = os.path.join(correlation_dir, f"{random_string(10)}.txt")
    with open(dataset_path, "w", encoding="utf-8") as handle:
        handle.write("\n".join(dataset))

    return (correlation_dir, dataset_path)
def generate_json_file(tmp_dir, tmp_file, method, delimiter,
                       x_vals) -> tuple[str, str]:
    """Write the JSON argument file consumed by the correlation command.

    Two randomly named paths are created inside ``tmp_dir``: the JSON
    argument file itself (written here) and the text file the argument
    file names as ``output_file`` (only its path is generated here).

    Args:
        tmp_dir: directory in which both paths are generated.
        tmp_file: path of the dataset file, stored as ``file_path``.
        method: correlation method name, stored as ``method``.
        delimiter: field delimiter of ``tmp_file``, stored as
            ``file_delimiter``.
        x_vals: values stored as ``x_vals``; must be JSON serialisable.

    Returns:
        A ``(output_file, json_file)`` tuple of paths.  The previous
        ``-> str`` annotation was wrong: a 2-tuple has always been returned.
    """
    # Keep this generation order (JSON path first, then output path) so the
    # sequence of random_string() calls matches the original behaviour.
    tmp_json_file = os.path.join(tmp_dir, f"{random_string(10)}.json")
    output_file = os.path.join(tmp_dir, f"{random_string(10)}.txt")

    correlation_args = {
        "method": method,
        "file_path": tmp_file,
        "x_vals": x_vals,
        "sample_values": "bxd1",
        "output_file": output_file,
        "file_delimiter": delimiter,
    }

    with open(tmp_json_file, "w", encoding="utf-8") as outputfile:
        json.dump(correlation_args, outputfile)

    return (output_file, tmp_json_file)
def run_correlation(dataset, trait_vals: list[str], method: str,
                    delimiter: str):
    """Run the external correlation command and return its parsed results.

    Steps:
      1. write ``dataset`` to an input file (``generate_input_files``);
      2. write the JSON argument file (``generate_json_file``);
      3. invoke ``CORRELATION_COMMAND <json_file> <TMPDIR>``;
      4. parse at most 500 rows of the output file.

    Raises:
        subprocess.CalledProcessError: if the command exits with a non-zero
            status (``check=True``).
    """
    work_dir, dataset_file = generate_input_files(dataset)
    result_path, config_path = generate_json_file(
        tmp_dir=work_dir,
        tmp_file=dataset_file,
        method=method,
        delimiter=delimiter,
        x_vals=trait_vals,
    )

    subprocess.run([CORRELATION_COMMAND, config_path, TMPDIR], check=True)

    return parse_correlation_output(result_path, 500)
def parse_correlation_output(result_file: str, top_n: int = 500) -> list[dict]:
    """Parse up to ``top_n`` rows of a correlation result file.

    Every non-blank line must have exactly three comma-separated fields:
    ``trait_name,corr_coefficient,p_value``.

    Args:
        result_file: path of the UTF-8 text file to read.
        top_n: maximum number of rows to return.  Files with fewer rows are
            read to the end (previously this raised ``StopIteration``);
            a value of zero or less yields an empty list.

    Returns:
        A list of single-key dicts ``{trait_name: {...}}`` in file order.
        ``corr_coefficient`` and ``p_value`` are kept as the strings read
        from the file.

    Raises:
        ValueError: if a non-blank line does not have exactly three fields.
    """
    # Function-scope import so the block does not depend on file-level edits.
    from itertools import islice

    corr_results = []
    with open(result_file, "r", encoding="utf-8") as file_reader:
        stripped = (line.rstrip() for line in file_reader)
        # Blank lines (e.g. a trailing newline) are skipped and do not count
        # towards top_n.
        rows = (row for row in stripped if row)
        # islice stops quietly at end-of-file; max() preserves the old
        # "negative top_n -> no rows" behaviour (islice rejects negatives).
        for row in islice(rows, max(top_n, 0)):
            (trait_name, corr_coeff, p_val) = row.split(",")
            corr_data = {
                "num_overlap": 0,  # placeholder, to be fixed later
                "corr_coefficient": corr_coeff,
                "p_value": p_val,
            }
            corr_results.append({trait_name: corr_data})
    return corr_results
def get_samples(all_samples: dict[str, str],
                base_samples: list[str],
                excluded: list[str]):
    """Return the usable samples as a ``{name: float_value}`` mapping.

    A sample is dropped when its name is in ``excluded`` or when its value,
    ignoring surrounding whitespace and case, is the marker ``"x"``.

    When ``base_samples`` is non-empty, only the names listed there (and
    present in ``all_samples``) are considered, in ``base_samples`` order.
    Otherwise every entry of ``all_samples`` is considered.
    """
    if not base_samples:
        # No base list supplied: filter the whole mapping.
        return {
            name: float(raw)
            for name, raw in all_samples.items()
            if name not in excluded and raw.lower().strip() != "x"
        }

    selected = {}
    for name in base_samples:
        if name in excluded or name not in all_samples:
            continue
        raw = all_samples[name].strip()
        if raw.lower() == "x":
            continue
        selected[name] = float(raw)
    return selected
|