diff options
Diffstat (limited to 'gn2/tests/unit/wqflask/marker_regression')
9 files changed, 788 insertions, 0 deletions
diff --git a/gn2/tests/unit/wqflask/marker_regression/__init__.py b/gn2/tests/unit/wqflask/marker_regression/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/gn2/tests/unit/wqflask/marker_regression/__init__.py diff --git a/gn2/tests/unit/wqflask/marker_regression/genotype/bimbam/file_geno.txt b/gn2/tests/unit/wqflask/marker_regression/genotype/bimbam/file_geno.txt new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/gn2/tests/unit/wqflask/marker_regression/genotype/bimbam/file_geno.txt diff --git a/gn2/tests/unit/wqflask/marker_regression/genotype/bimbam/file_snps.txt b/gn2/tests/unit/wqflask/marker_regression/genotype/bimbam/file_snps.txt new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/gn2/tests/unit/wqflask/marker_regression/genotype/bimbam/file_snps.txt diff --git a/gn2/tests/unit/wqflask/marker_regression/test_display_mapping_results.py b/gn2/tests/unit/wqflask/marker_regression/test_display_mapping_results.py new file mode 100644 index 00000000..580d79cf --- /dev/null +++ b/gn2/tests/unit/wqflask/marker_regression/test_display_mapping_results.py @@ -0,0 +1,159 @@ +import unittest + +import htmlgen as HT +from gn2.wqflask.marker_regression.display_mapping_results import ( + DisplayMappingResults, + HtmlGenWrapper +) + + +class TestDisplayMappingResults(unittest.TestCase): + """Basic Methods to test Mapping Results""" + + def test_pil_colors(self): + """Test that colors use PILLOW color format""" + self.assertEqual(DisplayMappingResults.CLICKABLE_WEBQTL_REGION_COLOR, + (245, 211, 211)) + + +class TestHtmlGenWrapper(unittest.TestCase): + """Test Wrapper around HTMLGen""" + + def test_create_image(self): + """Test HT.Image method""" + self.assertEqual( + str(HtmlGenWrapper.create_image_tag(src="test.png", + alt="random", + border="0", + width="10", + height="13", + usemap="#webqtlmap")), + ("""<img alt="random" border="0" height="13" """ + """src="test.png" usemap="#webqtlmap" """ + """width="10"/>""") + ) + + def test_create_form(self): + """Test HT.Form method""" + test_form = HtmlGenWrapper.create_form_tag( + cgi="/testing/", + enctype='multipart/form-data', + name="formName", + submit=HtmlGenWrapper.create_input_tag( + type_='hidden', name='Default_Name') + ) + test_image = HtmlGenWrapper.create_image_tag( + src="test.png", + alt="random", + border="0", + width="10", + height="13", + usemap="#webqtlmap" + ) + self.assertEqual( + str(test_form).replace("\n", ""), + ("""<form action="/testing/" enctype="multipart/form-data" """ + """method="POST" """ + """name="formName"><input name="Default_Name" """ + """type="hidden"/></form>""")) + hddn = { + 'FormID': 'showDatabase', + 'ProbeSetID': '_', + 'database': "TestGeno", + 'CellID': '_', + 'RISet': "Test", + 'incparentsf1': 'ON' + } + for key in hddn.keys(): + test_form.append( + HtmlGenWrapper.create_input_tag( + name=key, + value=hddn[key], + type_='hidden')) + test_form.append(test_image) + + self.assertEqual(str(test_form).replace("\n", ""), ( + """<form action="/testing/" enctype="multipart/form-data" """ + """method="POST" name="formName">""" + """<input name="Default_Name" type="hidden"/>""" + """<input name="FormID" type="hidden" value="showDatabase"/>""" + """<input name="ProbeSetID" type="hidden" value="_"/>""" + """<input name="database" type="hidden" value="TestGeno"/>""" + """<input name="CellID" type="hidden" value="_"/>""" + """<input name="RISet" type="hidden" value="Test"/>""" + """<input name="incparentsf1" type="hidden" value="ON"/>""" + """<img alt="random" border="0" height="13" src="test.png" """ + """usemap="#webqtlmap" width="10"/>""" + """</form>""")) + + def test_create_paragraph(self): + """Test HT.Paragraph method""" + test_p_element = HtmlGenWrapper.create_p_tag(id="smallSize") + par_text = ( + "Mapping using genotype data as " + "a trait will result in infinity LRS at one locus. " + "In order to display the result properly, all LRSs " + "higher than 100 are capped at 100." + ) + self.assertEqual( + str(test_p_element), + """<p id="smallSize"></p>""" + ) + test_p_element.append(HtmlGenWrapper.create_br_tag()) + test_p_element.append(par_text) + self.assertEqual( + str(test_p_element), + """<p id="smallSize"><br/>{}</p>""".format(par_text) + ) + + def test_create_br_tag(self): + """Test HT.BR() method""" + self.assertEqual(str(HtmlGenWrapper.create_br_tag()), + "<br/>") + + def test_create_input_tag(self): + """Test HT.Input method""" + self.assertEqual( + str(HtmlGenWrapper.create_input_tag( + type_="hidden", + name="name", + value="key", + Class="trait trait_")).replace("\n", ""), + ("""<input class="trait trait_" name="name" """ + """type="hidden" value="key"/>""")) + + def test_create_map_tag(self): + """Test HT.Map method""" + self.assertEqual(str(HtmlGenWrapper.create_map_tag( + name="WebqTLImageMap")).replace("\n", ""), + """<map name="WebqTLImageMap"></map>""") + gifmap = HtmlGenWrapper.create_map_tag(name="test") + gifmap.append(HtmlGenWrapper.create_area_tag(shape="rect", + coords='1 2 3', href='#area1')) + gifmap.append(HtmlGenWrapper.create_area_tag(shape="rect", + coords='1 2 3', href='#area2')) + self.assertEqual( + str(gifmap).replace("\n", ""), + ("""<map name="test">""" + """<area coords="1 2 3" """ + """href="#area1" shape="rect"/>""" + """<area coords="1 2 3" href="#area2" shape="rect"/>""" + """</map>""")) + + def test_create_area_tag(self): + """Test HT.Area method""" + self.assertEqual( + str(HtmlGenWrapper.create_area_tag( + shape="rect", + coords="1 2", + href="http://test.com", + title="Some Title")).replace("\n", ""), + ("""<area coords="1 2" href="http://test.com" """ + """shape="rect" title="Some Title"/>""")) + + def test_create_link_tag(self): + """Test HT.HREF method""" + self.assertEqual( + str(HtmlGenWrapper.create_link_tag( + "www.test.com", "test", target="_blank")).replace("\n", ""), + """<a href="www.test.com" target="_blank">test</a>""") diff --git a/gn2/tests/unit/wqflask/marker_regression/test_gemma_mapping.py b/gn2/tests/unit/wqflask/marker_regression/test_gemma_mapping.py new file mode 100644 index 00000000..26f0c50a --- /dev/null +++ b/gn2/tests/unit/wqflask/marker_regression/test_gemma_mapping.py @@ -0,0 +1,188 @@ +# test for wqflask/marker_regression/gemma_mapping.py +import os +import unittest +import random +from unittest import mock +from gn2.wqflask.marker_regression.gemma_mapping import run_gemma +from gn2.wqflask.marker_regression.gemma_mapping import gen_pheno_txt_file +from gn2.wqflask.marker_regression.gemma_mapping import gen_covariates_file +from gn2.wqflask.marker_regression.gemma_mapping import parse_loco_output + + +class AttributeSetter: + def __init__(self, obj): + for key, val in obj.items(): + setattr(self, key, val) + + +class MockGroup(AttributeSetter): + def get_samplelist(self, redis_conn): + return None + + +class TestGemmaMapping(unittest.TestCase): + + @mock.patch("wqflask.marker_regression.gemma_mapping.parse_loco_output") + def test_run_gemma_firstrun_set_false(self, mock_parse_loco): + """add tests for gemma function where first run is set to false""" + dataset = AttributeSetter( + {"group": AttributeSetter({"genofile": "genofile.geno"})}) + + output_file = "file1" + mock_parse_loco.return_value = [] + this_trait = AttributeSetter({"name": "t1"}) + + result = run_gemma(this_trait=this_trait, this_dataset=dataset, samples=[], vals=[ + ], covariates="", use_loco=True, first_run=False, output_files=output_file) + + expected_results = ([], "file1") + self.assertEqual(expected_results, result) + + @mock.patch("wqflask.marker_regression.gemma_mapping.webqtlConfig.GENERATED_IMAGE_DIR", "/home/user/img") + @mock.patch("wqflask.marker_regression.gemma_mapping.GEMMAOPTS", "-debug") + @mock.patch("wqflask.marker_regression.gemma_mapping.GEMMA_WRAPPER_COMMAND", "ghc") + @mock.patch("wqflask.marker_regression.gemma_mapping.TEMPDIR", + os.path.join(os.path.dirname(__file__), "user/data")) + @mock.patch("wqflask.marker_regression.gemma_mapping.parse_loco_output") + @mock.patch("wqflask.marker_regression.gemma_mapping.flat_files") + @mock.patch("wqflask.marker_regression.gemma_mapping.gen_covariates_file") + @mock.patch("wqflask.marker_regression.run_mapping.random.choice") + @mock.patch("wqflask.marker_regression.gemma_mapping.os") + @mock.patch("wqflask.marker_regression.gemma_mapping.gen_pheno_txt_file") + def test_run_gemma_firstrun_set_true(self, mock_gen_pheno_txt, mock_os, mock_choice, mock_gen_covar, mock_flat_files, mock_parse_loco): + """add tests for run_gemma where first run is set to true""" + this_chromosomes = {} + for i in range(1, 5): + this_chromosomes[f'CH{i}'] = (AttributeSetter({"name": f"CH{i}"})) + chromosomes = AttributeSetter({"chromosomes": lambda cursor: this_chromosomes}) + + dataset_group = MockGroup( + {"name": "GP1", "genofile": "file_geno"}) + dataset = AttributeSetter({"group": dataset_group, "name": "dataset1_name", + "species": AttributeSetter({"chromosomes": chromosomes})}) + trait = AttributeSetter({"name": "trait1"}) + samples = [] + mock_gen_pheno_txt.return_value = None + mock_os.path.isfile.return_value = True + mock_gen_covar.return_value = None + mock_choice.return_value = "R" + mock_flat_files.return_value = os.path.join( + os.path.dirname(__file__), "genotype/bimbam") + mock_parse_loco.return_value = [] + results = run_gemma(this_trait=trait, this_dataset=dataset, samples=[ + ], vals=[], covariates="", use_loco=True) + mock_gen_pheno_txt.assert_called_once() + mock_parse_loco.assert_called_once_with( + dataset, "GP1_GWA_RRRRRR", True) + mock_os.path.isfile.assert_called_once_with( + ('/home/user/imgfile_output.assoc.txt')) + self.assertEqual(results, ([], "GP1_GWA_RRRRRR")) + + @mock.patch("wqflask.marker_regression.gemma_mapping.TEMPDIR", "/home/user/data") + def test_gen_pheno_txt_file(self): + """add tests for generating pheno txt file""" + with mock.patch("builtins.open", mock.mock_open())as mock_open: + gen_pheno_txt_file( + this_dataset=AttributeSetter({"name": "A"}), + genofile_name="", vals=[ + "x", "w", "q", "we", "R"]) + mock_open.assert_called_once_with( + '/home/user/data/gn2/PHENO_KiAEKlCvM6iGTM9Kh_TAlQ.txt', 'w') + filehandler = mock_open() + values = ["x", "w", "q", "we", "R"] + write_calls = [mock.call('NA\n'), mock.call('w\n'), mock.call( + 'q\n'), mock.call('we\n'), mock.call('R\n')] + + filehandler.write.assert_has_calls(write_calls) + + @mock.patch("wqflask.marker_regression.gemma_mapping.flat_files") + @mock.patch("wqflask.marker_regression.gemma_mapping.create_trait") + @mock.patch("wqflask.marker_regression.gemma_mapping.create_dataset") + def test_gen_covariates_file(self, create_dataset, create_trait, flat_files): + """add tests for generating covariates files""" + covariates = "X1:X2,Y1:Y2,M1:M3,V1:V2" + samplelist = ["X1", "X2", "X3", "X4"] + create_dataset_side_effect = [] + create_trait_side_effect = [] + + for i in range(4): + create_dataset_side_effect.append( + AttributeSetter({"name": f'name_{i}'})) + create_trait_side_effect.append( + AttributeSetter({"data": [f'data_{i}']})) + + create_dataset.side_effect = create_trait_side_effect + create_trait.side_effect = create_trait_side_effect + + group = MockGroup({"name": "group_X", "samplelist": samplelist}) + this_dataset = AttributeSetter({"group": group, "name": "dataset1_name"}) + flat_files.return_value = "Home/Genenetwork" + + with mock.patch("builtins.open", mock.mock_open())as mock_open: + gen_covariates_file(this_dataset=this_dataset, covariates=covariates, + samples=["x1", "x2", "X3"]) + + create_dataset.assert_has_calls( + [mock.call('X2'), mock.call('Y2'), mock.call('M3'), mock.call('V2')]) + mock_calls = [] + trait_names = ["X1", "Y1", "M1", "V1"] + + for i, trait in enumerate(create_trait_side_effect): + mock_calls.append( + mock.call(dataset=trait, name=trait_names[i], cellid=None)) + + create_trait.assert_has_calls(mock_calls) + + flat_files.assert_called_once_with('mapping') + mock_open.assert_called_once_with( + 'Home/Genenetwork/COVAR_npKxIOnq3azWdgYixtd9IQ.txt', 'w') + filehandler = mock_open() + filehandler.write.assert_has_calls([mock.call( + '-9\t'), mock.call('-9\t'), mock.call('-9\t'), mock.call('-9\t'), mock.call('\n')]) + + @mock.patch("wqflask.marker_regression.gemma_mapping.TEMPDIR", "/home/tmp") + @mock.patch("wqflask.marker_regression.gemma_mapping.os") + @mock.patch("wqflask.marker_regression.gemma_mapping.json") + def test_parse_loco_outputfile_found(self, mock_json, mock_os): + """add tests for parse loco output file found""" + mock_json.load.return_value = { + "files": [["file_name", "user", "~/file1"], + ["file_name", "user", "~/file2"]] + } + return_file = """X/Y\tM1\t28.457155\tQ\tE\tA\tMMB\t23.3\tW\t0.9\t0.85\t +chr4\tM2\t12\tQ\tE\tMMB\tR\t24\tW\t0.87\t0.5 +Y\tM4\t12\tQ\tE\tMMB\tR\t11.6\tW\t0.21\t0.7 +X\tM5\t12\tQ\tE\tMMB\tR\t21.1\tW\t0.65\t0.6""" + + return_file_2 = """chr\tother\t21322\tQ\tE\tA\tP\tMMB\tCDE\t0.5\t0.4""" + mock_os.path.isfile.return_value = True + file_to_write = """{"files":["file_1","file_2"]}""" + with mock.patch("builtins.open") as mock_open: + + handles = (mock.mock_open(read_data="gwas").return_value, mock.mock_open( + read_data=return_file).return_value, mock.mock_open(read_data=return_file_2).return_value) + mock_open.side_effect = handles + results = parse_loco_output( + this_dataset={}, gwa_output_filename=".xw/") + expected_results = [ + {'name': 'M1', 'chr': 'X/Y', 'Mb': 2.8457155e-05, 'p_value': 0.85, + 'additive': -11.65, 'lod_score': 0.07058107428570727}, + {'name': 'M2', 'chr': 4, 'Mb': 1.2e-05, 'p_value': 0.5, + 'additive': -12.0, 'lod_score': 0.3010299956639812}, + {'name': 'M4', 'chr': 'Y', 'Mb': 1.2e-05, 'p_value': 0.7, + 'additive': -5.8, 'lod_score': 0.1549019599857432}, + {'name': 'M5', 'chr': 'X', 'Mb': 1.2e-05, 'p_value': 0.6, 'additive': -10.55, 'lod_score': 0.22184874961635637}] + self.assertEqual(expected_results, results) + + @mock.patch("wqflask.marker_regression.gemma_mapping.TEMPDIR", "/home/tmp") + @mock.patch("wqflask.marker_regression.gemma_mapping.os") + def test_parse_loco_outputfile_not_found(self, mock_os): + """add tests for parse loco output where output file not found""" + + mock_os.path.isfile.return_value = False + file_to_write = """{"files":["file_1","file_2"]}""" + + with mock.patch("builtins.open", mock.mock_open(read_data=file_to_write)) as mock_open: + results = parse_loco_output( + this_dataset={}, gwa_output_filename=".xw/") + self.assertEqual(results, []) diff --git a/gn2/tests/unit/wqflask/marker_regression/test_plink_mapping.py b/gn2/tests/unit/wqflask/marker_regression/test_plink_mapping.py new file mode 100644 index 00000000..4c13b907 --- /dev/null +++ b/gn2/tests/unit/wqflask/marker_regression/test_plink_mapping.py @@ -0,0 +1,86 @@ +# test for wqflask/marker_regression/plink_mapping.py +import unittest +from unittest import mock +from gn2.wqflask.marker_regression.plink_mapping import build_line_list +from gn2.wqflask.marker_regression.plink_mapping import get_samples_from_ped_file +from gn2.wqflask.marker_regression.plink_mapping import flat_files +from gn2.wqflask.marker_regression.plink_mapping import gen_pheno_txt_file_plink +from gn2.wqflask.marker_regression.plink_mapping import parse_plink_output + + +class AttributeSetter: + def __init__(self, obj): + for key, val in obj.items(): + setattr(self, key, val) + + +class TestPlinkMapping(unittest.TestCase): + + def test_build_line_list(self): + """test for building line list""" + line_1 = "this is line one test" + irregular_line = " this is an, irregular line " + exp_line1 = ["this", "is", "line", "one", "test"] + + results = build_line_list(irregular_line) + self.assertEqual(exp_line1, build_line_list(line_1)) + self.assertEqual([], build_line_list()) + self.assertEqual(["this", "is", "an,", "irregular", "line"], results) + + @mock.patch("wqflask.marker_regression.plink_mapping.flat_files") + def test_get_samples_from_ped_file(self, mock_flat_files): + """test for getting samples from ped file""" + dataset = AttributeSetter({"group": AttributeSetter({"name": "n_1"})}) + file_sample = """Expected_1\tline test +Expected_2\there + Expected_3\tthree""" + mock_flat_files.return_value = "/home/user/" + with mock.patch("builtins.open", mock.mock_open(read_data=file_sample)) as mock_open: + results = get_samples_from_ped_file(dataset) + mock_flat_files.assert_called_once_with("mapping") + mock_open.assert_called_once_with("/home/user/n_1.ped", "r") + self.assertEqual( + ["Expected_1", "Expected_2", "Expected_3"], results) + + @mock.patch("wqflask.marker_regression.plink_mapping.TMPDIR", "/home/user/data/") + @mock.patch("wqflask.marker_regression.plink_mapping.get_samples_from_ped_file") + def test_gen_pheno_txt_file_plink(self, mock_samples): + """test for getting gen_pheno txt file""" + mock_samples.return_value = ["Expected_1", "Expected_2", "Expected_3"] + + trait = AttributeSetter({"name": "TX"}) + dataset = AttributeSetter({"group": AttributeSetter({"name": "n_1"})}) + vals = ["value=K1", "value=K2", "value=K3"] + with mock.patch("builtins.open", mock.mock_open()) as mock_open: + results = gen_pheno_txt_file_plink(this_trait=trait, dataset=dataset, + vals=vals, pheno_filename="ph_file") + mock_open.assert_called_once_with( + "/home/user/data/ph_file.txt", "wb") + filehandler = mock_open() + calls_expected = [mock.call('FID\tIID\tTX\n'), + mock.call('Expected_1\tExpected_1\tK1\nExpected_2\tExpected_2\tK2\nExpected_3\tExpected_3\tK3\n')] + + filehandler.write.assert_has_calls(calls_expected) + + filehandler.close.assert_called_once() + + @mock.patch("wqflask.marker_regression.plink_mapping.TMPDIR", "/home/user/data/") + @mock.patch("wqflask.marker_regression.plink_mapping.build_line_list") + def test_parse_plink_output(self, mock_line_list): + """test for parsing plink output""" + chromosomes = [0, 34, 110, 89, 123, 23, 2] + species = AttributeSetter( + {"name": "S1", "chromosomes": AttributeSetter({"chromosomes": chromosomes})}) + + fake_file = """0 AACCAT T98.6 0.89\n2 AATA B45 0.3\n121 ACG B56.4 NA""" + + mock_line_list.side_effect = [["0", "AACCAT", "T98.6", "0.89"], [ + "2", "AATA", "B45", "0.3"], ["121", "ACG", "B56.4", "NA"]] + with mock.patch("builtins.open", mock.mock_open(read_data=fake_file)) as mock_open: + parse_results = parse_plink_output( + output_filename="P1_file", species=species) + mock_open.assert_called_once_with( + "/home/user/data/P1_file.qassoc", "rb") + expected = (2, {'AACCAT': 0.89, 'AATA': 0.3}) + + self.assertEqual(parse_results, expected) diff --git a/gn2/tests/unit/wqflask/marker_regression/test_qtlreaper_mapping.py b/gn2/tests/unit/wqflask/marker_regression/test_qtlreaper_mapping.py new file mode 100644 index 00000000..2268f4cb --- /dev/null +++ b/gn2/tests/unit/wqflask/marker_regression/test_qtlreaper_mapping.py @@ -0,0 +1,24 @@ +import unittest +from unittest import mock +from gn2.wqflask.marker_regression.qtlreaper_mapping import gen_pheno_txt_file + +# issues some methods in genofile object are not defined +# modify samples should equal to vals + + +class TestQtlReaperMapping(unittest.TestCase): + @mock.patch("wqflask.marker_regression.qtlreaper_mapping.TEMPDIR", "/home/user/data") + def test_gen_pheno_txt_file(self): + vals = ["V1", "x", "V4", "V3", "x"] + samples = ["S1", "S2", "S3", "S4", "S5"] + trait_filename = "trait_file" + with mock.patch("builtins.open", mock.mock_open())as mock_open: + gen_pheno_txt_file(samples=samples, vals=vals, + trait_filename=trait_filename) + mock_open.assert_called_once_with( + "/home/user/data/gn2/trait_file.txt", "w") + filehandler = mock_open() + write_calls = [mock.call('Trait\t'), mock.call( + 'S1\tS3\tS4\n'), mock.call('T1\t'), mock.call('V1\tV4\tV3')] + + filehandler.write.assert_has_calls(write_calls) diff --git a/gn2/tests/unit/wqflask/marker_regression/test_rqtl_mapping.py b/gn2/tests/unit/wqflask/marker_regression/test_rqtl_mapping.py new file mode 100644 index 00000000..5cfce28c --- /dev/null +++ b/gn2/tests/unit/wqflask/marker_regression/test_rqtl_mapping.py @@ -0,0 +1,43 @@ +import unittest +from unittest import mock +from dataclasses import dataclass + +from gn2.wqflask.marker_regression.rqtl_mapping import run_rqtl + +@dataclass +class MockGroup: + name: str + genofile: str + +@dataclass +class MockDataset: + group: MockGroup + +class TestRqtlMapping(unittest.TestCase): + """Tests for functions in rqtl_mapping.py""" + @mock.patch("wqflask.marker_regression.rqtl_mapping.requests.post") + @mock.patch("wqflask.marker_regression.rqtl_mapping.locate") + @mock.patch("wqflask.marker_regression.rqtl_mapping.write_phenotype_file") + def test_run_rqtl_with_perm(self, mock_write_pheno_file, mock_locate, mock_post): + """Test for run_rqtl with permutations > 0""" + dataset_group = MockGroup("GP1", "file_geno") + dataset = MockDataset(dataset_group) + + mock_write_pheno_file.return_value = "pheno_filename" + mock_locate.return_value = "geno_filename" + mock_post.return_value = mock.Mock(ok=True) + mock_post.return_value.json.return_value = {"perm_results": [], + "suggestive": 3, + "significant": 4, + "results" : []} + + results = run_rqtl(trait_name="the_trait", vals=[], samples=[], + dataset=dataset, pair_scan=False, mapping_scale="cM", model="normal", method="hk", + num_perm=5, perm_strata_list=[], do_control="false", control_marker="", + manhattan_plot=True, cofactors="") + + mock_write_pheno_file.assert_called_once() + mock_locate.assert_called_once() + mock_post.assert_called_once() + + self.assertEqual(results, ([], 3, 4, [])) diff --git a/gn2/tests/unit/wqflask/marker_regression/test_run_mapping.py b/gn2/tests/unit/wqflask/marker_regression/test_run_mapping.py new file mode 100644 index 00000000..dd67ce49 --- /dev/null +++ b/gn2/tests/unit/wqflask/marker_regression/test_run_mapping.py @@ -0,0 +1,288 @@ +import unittest +import datetime +from unittest import mock + +from gn2.wqflask.marker_regression.run_mapping import get_genofile_samplelist +from gn2.wqflask.marker_regression.run_mapping import geno_db_exists +from gn2.wqflask.marker_regression.run_mapping import write_input_for_browser +from gn2.wqflask.marker_regression.run_mapping import export_mapping_results +from gn2.wqflask.marker_regression.run_mapping import trim_markers_for_figure +from gn2.wqflask.marker_regression.run_mapping import get_perm_strata +from gn2.wqflask.marker_regression.run_mapping import get_chr_lengths + + +class AttributeSetter: + def __init__(self, obj): + for k, v in obj.items(): + setattr(self, k, v) + + +class MockGroup(AttributeSetter): + + def get_genofiles(self): + return [{"location": "~/genofiles/g1_file", "sample_list": ["S1", "S2", "S3", "S4"]}] + + +class TestRunMapping(unittest.TestCase): + def setUp(self): + + self.group = MockGroup( + {"genofile": "~/genofiles/g1_file", "name": "GP1_", "species": "Human"}) + chromosomes = { + "3": AttributeSetter({ + "name": "C1", + "length": "0.04" + }), + "4": AttributeSetter({ + "name": "C2", + "length": "0.03" + }), + "5": AttributeSetter({ + "name": "C4", + "length": "0.01" + }) + } + self.dataset = AttributeSetter( + {"fullname": "dataset_1", "group": self.group, "type": "ProbeSet"}) + + self.chromosomes = AttributeSetter({"chromosomes": lambda cur: chromosomes}) + self.trait = AttributeSetter( + {"symbol": "IGFI", "chr": "X1", "mb": 123313, "display_name": "Test Name"}) + + def tearDown(self): + self.dataset = AttributeSetter( + {"group": {"location": "~/genofiles/g1_file"}}) + + def test_get_genofile_samplelist(self): + + results_1 = get_genofile_samplelist(self.dataset) + self.assertEqual(results_1, ["S1", "S2", "S3", "S4"]) + self.group.genofile = "~/genofiles/g2_file" + result_2 = get_genofile_samplelist(self.dataset) + self.assertEqual(result_2, []) + + @mock.patch("wqflask.marker_regression.run_mapping.data_set") + def test_if_geno_db_exists(self, mock_data_set): + mock_data_set.create_dataset.side_effect = [ + AttributeSetter({}), Exception()] + results_no_error = geno_db_exists(self.dataset) + results_with_error = geno_db_exists(self.dataset) + + self.assertEqual(mock_data_set.create_dataset.call_count, 2) + self.assertEqual(results_with_error, "False") + self.assertEqual(results_no_error, "True") + + def test_trim_markers_for_figure(self): + + markers = [{ + "name": "MK1", + "chr": "C1", + "cM": "1", + "Mb": "12000", + "genotypes": [], + "dominance":"TT", + "additive":"VA", + "lod_score":0.5 + }, + { + "name": "MK2", + "chr": "C2", + "cM": "15", + "Mb": "10000", + "genotypes": [], + "lod_score":0.7 + }, + { + "name": "MK1", + "chr": "C3", + "cM": "45", + "Mb": "1", + "genotypes": [], + "dominance":"Tt", + "additive":"VE", + "lod_score":1 + }] + + marker_2 = [{ + "name": "MK1", + "chr": "C1", + "cM": "1", + "Mb": "12000", + "genotypes": [], + "dominance":"TT", + "additive":"VA", + "p_wald":4.6 + }] + results = trim_markers_for_figure(markers) + result_2 = trim_markers_for_figure(marker_2) + expected = [ + { + "name": "MK1", + "chr": "C1", + "cM": "1", + "Mb": "12000", + "genotypes": [], + "dominance":"TT", + "additive":"VA", + "lod_score":0.5 + }, + { + "name": "MK1", + "chr": "C3", + "cM": "45", + "Mb": "1", + "genotypes": [], + "dominance":"Tt", + "additive":"VE", + "lod_score":1 + } + + ] + self.assertEqual(results, expected) + self.assertEqual(result_2, marker_2) + + def test_export_mapping_results(self): + """test for exporting mapping results""" + datetime_mock = mock.Mock(wraps=datetime.datetime) + datetime_mock.now.return_value = datetime.datetime( + 2019, 9, 1, 10, 12, 12) + + markers = [{ + "name": "MK1", + "chr": "C1", + "cM": "1", + "Mb": "12000", + "genotypes": [], + "dominance":"TT", + "additive":"VA", + "lod_score":3 + }, + { + "name": "MK2", + "chr": "C2", + "cM": "15", + "Mb": "10000", + "genotypes": [], + "lod_score":7 + }, + { + "name": "MK1", + "chr": "C3", + "cM": "45", + "Mb": "1", + "genotypes": [], + "dominance":"Tt", + "additive":"VE", + "lod_score":7 + }] + + with mock.patch("builtins.open", mock.mock_open()) as mock_open: + + with mock.patch("wqflask.marker_regression.run_mapping.datetime.datetime", new=datetime_mock): + export_mapping_results(dataset=self.dataset, trait=self.trait, markers=markers, + results_path="~/results", mapping_method="gemma", mapping_scale="physic", + score_type="-logP", transform="qnorm", + covariates="Dataset1:Trait1,Dataset2:Trait2", + n_samples="100", vals_hash="") + + write_calls = [ + mock.call('Time/Date: 09/01/19 / 10:12:12\n'), + mock.call('Population: Human GP1_\n'), mock.call( + 'Data Set: dataset_1\n'), + mock.call('Trait: Test Name\n'), + mock.call('Trait Hash: \n'), + mock.call('N Samples: 100\n'), + mock.call('Mapping Tool: gemma\n'), + mock.call('Transform - Quantile Normalized\n'), + mock.call('Gene Symbol: IGFI\n'), mock.call( + 'Location: X1 @ 123313 Mb\n'), + mock.call('Cofactors (dataset - trait):\n'), + mock.call('Trait1 - Dataset1\n'), + mock.call('Trait2 - Dataset2\n'), + mock.call('\n'), mock.call('Name,Chr,'), + mock.call('Mb,-logP'), + mock.call(',Additive'), mock.call(',Dominance'), + mock.call('\n'), mock.call('MK1,C1,'), + mock.call('12000,'), mock.call('3'), + mock.call(',VA'), mock.call(',TT'), + mock.call('\n'), mock.call('MK2,C2,'), + mock.call('10000,'), mock.call('7'), + mock.call('\n'), mock.call('MK1,C3,'), + mock.call('1,'), mock.call('7'), + mock.call(',VE'), mock.call(',Tt') + ] + mock_open.assert_called_once_with("~/results", "w+") + filehandler = mock_open() + filehandler.write.assert_has_calls(write_calls) + + @mock.patch("wqflask.marker_regression.run_mapping.random.choice") + def test_write_input_for_browser(self, mock_choice): + """test for writing input for browser""" + mock_choice.side_effect = ["F", "i", "l", "e", "s", "x"] + with mock.patch("builtins.open", mock.mock_open()) as mock_open: + expected = ['GP1__Filesx_GWAS', 'GP1__Filesx_ANNOT'] + + results = write_input_for_browser( + this_dataset=self.dataset, gwas_results={}, annotations={}) + self.assertEqual(results, expected) + + def test_get_perm_strata(self): + categorical_vars = ["C1", "C2", "W1"] + used_samples = ["S1", "S2"] + sample_list = AttributeSetter({"sample_attribute_values": { + "S1": { + "c1": "c1_value", + "c2": "c2_value", + "w1": "w1_value" + }, + "S2": { + "w1": "w2_value", + "w2": "w2_value" + }, + "S3": { + + "c1": "c1_value", + "c2": "c2_value" + }, + }}) + results = get_perm_strata(this_trait={}, sample_list=sample_list, + categorical_vars=categorical_vars, used_samples=used_samples) + self.assertEqual(results, [1, 1]) + + def test_get_chr_length(self): + """test for getting chromosome length""" + cursor = mock.MagicMock() + chromosomes = AttributeSetter({"chromosomes": self.chromosomes}) + dataset = AttributeSetter({"species": chromosomes}) + results = get_chr_lengths( + mapping_scale="physic", mapping_method="reaper", dataset=dataset, qtl_results=[]) + chr_lengths = [] + for key, chromo in self.chromosomes.chromosomes(cursor).items(): + chr_lengths.append({"chr": chromo.name, "size": chromo.length}) + + self.assertEqual(chr_lengths, results) + + qtl_results = [{ + "chr": "16", + "cM": "0.2" + }, + { + "chr": "12", + "cM": "0.5" + }, + { + "chr": "18", + "cM": "0.1" + }, + { + "chr": "22", + "cM": "0.4" + }, + ] + + result_with_other_mapping_scale = get_chr_lengths( + mapping_scale="other", mapping_method="reaper", dataset=dataset, qtl_results=qtl_results) + expected_value = [{'chr': '1', 'size': '0'}, { + 'chr': '16', 'size': '500000.0'}, {'chr': '18', 'size': '400000.0'}] + + self.assertEqual(result_with_other_mapping_scale, expected_value) |