1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
|
"""module contains python code for wgcna"""
from unittest import TestCase
from unittest import mock
import pytest
import os.path
from gn3.computations.wgcna import dump_wgcna_data
from gn3.computations.wgcna import compose_rscript_cmd
from gn3.computations.wgcna import call_wgcna_script
from gn3.settings import R_SCRIPTS
class TestWgcna(TestCase):
"""test class for wgcna"""
@pytest.mark.unit_test
@mock.patch("gn3.computations.wgcna.process_image")
@mock.patch("gn3.computations.wgcna.run_cmd")
@mock.patch("gn3.computations.wgcna.compose_rscript_cmd")
@mock.patch("gn3.computations.wgcna.dump_wgcna_data")
def test_call_wgcna_script(self,
mock_dumping_data,
mock_compose_wgcna,
mock_run_cmd,
mock_img,
):
"""test for calling wgcna script"""
# pylint: disable = line-too-long
mock_dumping_data.return_value = "/tmp/QmQPeNsJPyVWPFDVHb77w8G42Fvo15z4bG2X8D2GhfbSXc-test.json"
mock_compose_wgcna.return_value = "Rscript/GUIX_PATH/scripts/r_file.R /tmp/QmQPeNsJPyVWPFDVHb77w8G42Fvo15z4bG2X8D2GhfbSXc-test.json"
request_data = {
"trait_names": ["1455537_at", "1425637_at", "1449593_at", "1421945_a_at", "1450423_s_at", "1423841_at", "1451144_at"],
"trait_sample_data": [
{
"129S1/SvImJ": 7.142,
"A/J": 7.31,
"AKR/J": 7.49,
"B6D2F1": 6.899,
"BALB/cByJ": 7.172,
"BALB/cJ": 7.396
},
{
"129S1/SvImJ": 7.071,
"A/J": 7.05,
"AKR/J": 7.313,
"B6D2F1": 6.999,
"BALB/cByJ": 7.293,
"BALB/cJ": 7.117
}]}
mock_run_cmd_results = {
"code": 0,
"output": "Flagging genes and samples with too many missing values...\n ..step 1\nAllowing parallel execution with up to 3 working processes.\npickSoftThreshold: will use block size 7.\n pickSoftThreshold: calculating connectivity for given powers...\n ..working on genes 1 through 7 of 7\n Flagging genes and samples with too many missing values...\n ..step 1\n ..Working on block 1 .\n TOM calculation: adjacency..\n ..will not use multithreading.\nclustering..\n ....detecting modules..\n ....calculating module eigengenes..\n ....checking kME in modules..\n ..merging modules that are too close..\n mergeCloseModules: Merging modules whose distance is less than 0.15\n mergeCloseModules: less than two proper modules.\n ..color levels are turquoise\n ..there is nothing to merge.\n Calculating new MEs...\n"
}
json_output = "{\"inputdata\":{\"trait_sample_data \":{},\"minModuleSize\":30,\"TOMtype\":\"unsigned\"},\"output\":{\"eigengenes\":[],\"imageLoc\":[],\"colors\":[]}}"
expected_output = {
"data": {
"inputdata": {
"trait_sample_data ": {},
"minModuleSize": 30,
"TOMtype": "unsigned"
},
"output": {
"eigengenes": [],
"imageLoc": [],
"colors": [],
"image_data": "AFDSFNBSDGJJHH"
}
},
**mock_run_cmd_results
}
with mock.patch("builtins.open", mock.mock_open(read_data=json_output)):
mock_run_cmd.return_value = mock_run_cmd_results
mock_img.return_value = b"AFDSFNBSDGJJHH"
results = call_wgcna_script(
"Rscript/GUIX_PATH/scripts/r_file.R", request_data)
mock_dumping_data.assert_called_once_with(request_data)
mock_compose_wgcna.assert_called_once_with(
"Rscript/GUIX_PATH/scripts/r_file.R",
"/tmp/QmQPeNsJPyVWPFDVHb77w8G42Fvo15z4bG2X8D2GhfbSXc-test.json")
mock_run_cmd.assert_called_once_with(
"Rscript/GUIX_PATH/scripts/r_file.R /tmp/QmQPeNsJPyVWPFDVHb77w8G42Fvo15z4bG2X8D2GhfbSXc-test.json")
self.assertEqual(results, expected_output)
@pytest.mark.unit_test
@mock.patch("gn3.computations.wgcna.run_cmd")
@mock.patch("gn3.computations.wgcna.compose_rscript_cmd")
@mock.patch("gn3.computations.wgcna.dump_wgcna_data")
def test_call_wgcna_script_fails(self, mock_dumping_data, mock_compose_wgcna, mock_run_cmd):
"""test for calling wgcna script\
fails and generates the expected error"""
# pylint: disable = line-too-long,
mock_dumping_data.return_value = "/tmp/QmQPeNsJPyVWPFDVHb77w8G42Fvo15z4bG2X8D2GhfbSXc-test.json"
mock_compose_wgcna.return_value = "Rscript/GUIX_PATH/scripts/r_file.R /tmp/QmQPeNsJPyVWPFDVHb77w8G42Fvo15z4bG2X8D2GhfbSXc-test.json"
expected_error = {
"code": 2,
"output": "could not read the json file"
}
with mock.patch("builtins.open", mock.mock_open(read_data="")):
mock_run_cmd.return_value = expected_error
self.assertEqual(call_wgcna_script(
"input_file.R", ""), expected_error)
@pytest.mark.unit_test
def test_compose_rscript_cmd(self):
"""test for composing wgcna cmd"""
expected_cmd = '"Rscript /home/scripts/test_script.R tmp/wgcna.json"'
cmd = compose_rscript_cmd(script_path="/home/scripts",
file_name="test_script.R",
temp_file_path="tmp/wgcna.json")
self.assertEqual(cmd, expected_cmd)
@ pytest.mark.unit_test
@ mock.patch("gn3.computations.wgcna.TMPDIR", "/tmp")
@ mock.patch("gn3.computations.wgcna.uuid.uuid4")
def test_create_json_file(self, file_name_generator):
"""test for writing the data to a csv file"""
# # All the traits we have data for (should not contain duplicates)
# All the strains we have data for (contains duplicates)
trait_sample_data = {"1425642_at": {"129S1/SvImJ": 7.142,
"A/J": 7.31, "AKR/J": 7.49,
"B6D2F1": 6.899, "BALB/cByJ": 7.172,
"BALB/cJ": 7.396},
"1457784_at": {"129S1/SvImJ": 7.071, "A/J": 7.05,
"AKR/J": 7.313,
"B6D2F1": 6.999, "BALB/cByJ": 7.293,
"BALB/cJ": 7.117},
"1444351_at": {"129S1/SvImJ": 7.221, "A/J": 7.246,
"AKR/J": 7.754,
"B6D2F1": 6.866, "BALB/cByJ": 6.752,
"BALB/cJ": 7.269}
}
expected_input = {
"trait_sample_data": trait_sample_data,
"TOMtype": "unsigned",
"minModuleSize": 30
}
with mock.patch("builtins.open", mock.mock_open()) as file_handler:
file_name_generator.return_value = "facb73ff-7eef-4053-b6ea-e91d3a22a00c"
results = dump_wgcna_data(
expected_input)
file_handler.assert_called_once_with(
"/tmp/facb73ff-7eef-4053-b6ea-e91d3a22a00c.json", 'w', encoding='utf-8')
self.assertEqual(
results, "/tmp/facb73ff-7eef-4053-b6ea-e91d3a22a00c.json")
def test_if_scripts_file_exists(self):
"""check if certain script files exists"""
files_data = [
("wgcna_analysis.R", True),
("ctl_analysis.R", True),
("control_experiment.R", False)
]
for file_name, file_exists in files_data:
file_path = os.path.join(R_SCRIPTS, file_name)
self.assertIs(os.path.exists(file_path), file_exists,
f"the file {file_path} doesn't exist in path")
|