diff options
8 files changed, 683 insertions, 36 deletions
diff --git a/wqflask/tests/wqflask/marker_regression/test_gemma_mapping.py b/wqflask/tests/wqflask/marker_regression/test_gemma_mapping.py new file mode 100644 index 00000000..4fafd95a --- /dev/null +++ b/wqflask/tests/wqflask/marker_regression/test_gemma_mapping.py @@ -0,0 +1,205 @@ +# test for wqflask/marker_regression/gemma_mapping.py +import unittest +import random +from unittest import mock +from wqflask.marker_regression.gemma_mapping import run_gemma +from wqflask.marker_regression.gemma_mapping import gen_pheno_txt_file +from wqflask.marker_regression.gemma_mapping import gen_covariates_file +from wqflask.marker_regression.gemma_mapping import parse_gemma_output +from wqflask.marker_regression.gemma_mapping import parse_loco_output + + +class AttributeSetter: + def __init__(self, obj): + for key, val in obj.items(): + setattr(self, key, val) + + +class MockDatasetGroup(AttributeSetter): + def get_samplelist(self): + return None + + +class TestGemmaMapping(unittest.TestCase): + + @mock.patch("wqflask.marker_regression.gemma_mapping.parse_loco_output") + def test_run_gemma_firstrun_set_false(self, mock_parse_loco): + """add tests for gemma function where first run is set to false""" + dataset = AttributeSetter( + {"group": AttributeSetter({"genofile": "genofile.geno"})}) + + output_file = "file1" + mock_parse_loco.return_value = [] + this_trait = AttributeSetter({"name": "t1"}) + + result = run_gemma(this_trait=this_trait, this_dataset=dataset, samples=[], vals=[ + ], covariates="", use_loco=True, first_run=False, output_files=output_file) + + expected_results = ([], "file1") + self.assertEqual(expected_results, result) + + @mock.patch("wqflask.marker_regression.gemma_mapping.webqtlConfig.GENERATED_IMAGE_DIR", "/home/user/img") + @mock.patch("wqflask.marker_regression.gemma_mapping.GEMMAOPTS", "-debug") + @mock.patch("wqflask.marker_regression.gemma_mapping.GEMMA_WRAPPER_COMMAND", "ghc") + @mock.patch("wqflask.marker_regression.gemma_mapping.TEMPDIR", "/home/user/data/") + @mock.patch("wqflask.marker_regression.gemma_mapping.parse_loco_output") + @mock.patch("wqflask.marker_regression.gemma_mapping.logger") + @mock.patch("wqflask.marker_regression.gemma_mapping.flat_files") + @mock.patch("wqflask.marker_regression.gemma_mapping.gen_covariates_file") + @mock.patch("wqflask.marker_regression.run_mapping.random.choice") + @mock.patch("wqflask.marker_regression.gemma_mapping.os") + @mock.patch("wqflask.marker_regression.gemma_mapping.gen_pheno_txt_file") + def test_run_gemma_firstrun_set_true(self, mock_gen_pheno_txt, mock_os, mock_choice, mock_gen_covar, mock_flat_files, mock_logger, mock_parse_loco): + """add tests for run_gemma where first run is set to true""" + chromosomes = [] + for i in range(1, 5): + chromosomes.append(AttributeSetter({"name": f"CH{i}"})) + chromo = AttributeSetter({"chromosomes": chromosomes}) + dataset_group = MockDatasetGroup( + {"name": "GP1", "genofile": "file_geno"}) + dataset = AttributeSetter({"group": dataset_group, "name": "dataset1_name", + "species": AttributeSetter({"chromosomes": chromo})}) + trait = AttributeSetter({"name": "trait1"}) + samples = [] + mock_gen_pheno_txt.return_value = None + mock_os.path.isfile.return_value = True + mock_gen_covar.return_value = None + mock_choice.return_value = "R" + mock_flat_files.return_value = "/home/genotype/bimbam" + mock_parse_loco.return_value = [] + results = run_gemma(this_trait=trait, this_dataset=dataset, samples=[ + ], vals=[], covariates="", use_loco=True) + system_calls = [mock.call('ghc --json -- -debug -g /home/genotype/bimbam/file_geno.txt -p /home/user/data//gn2/trait1_dataset1_name_pheno.txt -a /home/genotype/bimbam/file_snps.txt -gk > /home/user/data//gn2/GP1_K_RRRRRR.json'), + mock.call('ghc --json --input /home/user/data//gn2/GP1_K_RRRRRR.json -- -debug -a /home/genotype/bimbam/file_snps.txt -lmm 2 -g /home/genotype/bimbam/file_geno.txt -p /home/user/data//gn2/trait1_dataset1_name_pheno.txt > /home/user/data//gn2/GP1_GWA_RRRRRR.json')] + mock_os.system.assert_has_calls(system_calls) + mock_gen_pheno_txt.assert_called_once() + mock_parse_loco.assert_called_once_with(dataset, "GP1_GWA_RRRRRR") + mock_os.path.isfile.assert_called_once_with( + ('/home/user/imgfile_output.assoc.txt')) + self.assertEqual(mock_logger.debug.call_count, 2) + self.assertEqual(mock_flat_files.call_count, 4) + self.assertEqual(results, ([], "GP1_GWA_RRRRRR")) + + @mock.patch("wqflask.marker_regression.gemma_mapping.TEMPDIR", "/home/user/data") + def test_gen_pheno_txt_file(self): + """add tests for generating pheno txt file""" + with mock.patch("builtins.open", mock.mock_open())as mock_open: + gen_pheno_txt_file(this_dataset={}, genofile_name="", vals=[ + "x", "w", "q", "we", "R"], trait_filename="fitr.re") + mock_open.assert_called_once_with( + '/home/user/data/gn2/fitr.re.txt', 'w') + filehandler = mock_open() + values = ["x", "w", "q", "we", "R"] + write_calls = [mock.call('NA\n'), mock.call('w\n'), mock.call( + 'q\n'), mock.call('we\n'), mock.call('R\n')] + + filehandler.write.assert_has_calls(write_calls) + + @mock.patch("wqflask.marker_regression.gemma_mapping.flat_files") + @mock.patch("wqflask.marker_regression.gemma_mapping.create_trait") + @mock.patch("wqflask.marker_regression.gemma_mapping.create_dataset") + def test_gen_covariates_file(self, create_dataset, create_trait, flat_files): + """add tests for generating covariates files""" + covariates = "X1:X2,Y1:Y2,M1:M3,V1:V2" + samplelist = ["X1", "X2", "X3", "X4"] + create_dataset_side_effect = [] + create_trait_side_effect = [] + + for i in range(4): + create_dataset_side_effect.append(AttributeSetter({"name": f'name_{i}'})) + create_trait_side_effect.append( + AttributeSetter({"data": [f'data_{i}']})) + + create_dataset.side_effect = create_trait_side_effect + create_trait.side_effect = create_trait_side_effect + + group = MockDatasetGroup({"name": "group_X", "samplelist": samplelist}) + this_dataset = AttributeSetter({"group": group}) + flat_files.return_value = "Home/Genenetwork" + + with mock.patch("builtins.open", mock.mock_open())as mock_open: + gen_covariates_file(this_dataset=this_dataset, covariates=covariates, + samples=["x1", "x2", "X3"]) + + create_dataset.assert_has_calls( + [mock.call('X2'), mock.call('Y2'), mock.call('M3'), mock.call('V2')]) + mock_calls = [] + trait_names = ["X1", "Y1", "M1", "V1"] + + for i, trait in enumerate(create_trait_side_effect): + mock_calls.append( + mock.call(dataset=trait, name=trait_names[i], cellid=None)) + + create_trait.assert_has_calls(mock_calls) + + flat_files.assert_called_once_with('mapping') + mock_open.assert_called_once_with( + 'Home/Genenetwork/group_X_covariates.txt', 'w') + filehandler = mock_open() + filehandler.write.assert_has_calls([mock.call( + '-9\t'), mock.call('-9\t'), mock.call('-9\t'), mock.call('-9\t'), mock.call('\n')]) + + @mock.patch("wqflask.marker_regression.gemma_mapping.webqtlConfig.GENERATED_IMAGE_DIR", "/home/user/img/") + def test_parse_gemma_output(self): + """add test for generating gemma output with obj returned""" + file = """X/Y\t gn2\t21\tQ\tE\tA\tP\tMMB\tCDE\t0.5 +X/Y\tgn2\t21322\tQ\tE\tA\tP\tMMB\tCDE\t0.5 +chr\tgn1\t12312\tQ\tE\tA\tP\tMMB\tCDE\t0.7 +X\tgn7\t2324424\tQ\tE\tA\tP\tMMB\tCDE\t0.4 +125\tgn9\t433575\tQ\tE\tA\tP\tMMB\tCDE\t0.67 +""" + with mock.patch("builtins.open", mock.mock_open(read_data=file)) as mock_open: + results = parse_gemma_output(genofile_name="gema_file") + expected = [{'name': ' gn2', 'chr': 'X/Y', 'Mb': 2.1e-05, 'p_value': 0.5, 'lod_score': 0.3010299956639812}, {'name': 'gn2', 'chr': 'X/Y', 'Mb': 0.021322, 'p_value': 0.5, 'lod_score': 0.3010299956639812}, + {'name': 'gn7', 'chr': 'X', 'Mb': 2.324424, 'p_value': 0.4, 'lod_score': 0.3979400086720376}, {'name': 'gn9', 'chr': 125, 'Mb': 0.433575, 'p_value': 0.67, 'lod_score': 0.17392519729917352}] + mock_open.assert_called_once_with( + "/home/user/img/gema_file_output.assoc.txt") + self.assertEqual(results, expected) + + @mock.patch("wqflask.marker_regression.gemma_mapping.webqtlConfig.GENERATED_IMAGE_DIR", "/home/user/img") + def test_parse_gemma_output_with_empty_return(self): + """add tests for parse gemma output where nothing returned""" + output_file_results = """chr\t today""" + with mock.patch("builtins.open", mock.mock_open(read_data=output_file_results)) as mock_open: + results = parse_gemma_output(genofile_name="gema_file") + self.assertEqual(results, []) + + @mock.patch("wqflask.marker_regression.gemma_mapping.TEMPDIR", "/home/tmp") + @mock.patch("wqflask.marker_regression.gemma_mapping.os") + @mock.patch("wqflask.marker_regression.gemma_mapping.json") + def test_parse_loco_outputfile_found(self, mock_json, mock_os): + """add tests for parse loco output file found""" + mock_json.load.return_value = { + "files": [["file_name", "user", "~/file1"], + ["file_name", "user", "~/file2"]] + } + return_file_1 = """X/Y\t L1\t21\tQ\tE\tA\tP\tMMB\tCDE\t0.5 +X/Y\tL2\t21322\tQ\tE\tA\tP\tMMB\tCDE\t0.5 +chr\tL3\t12312\tQ\tE\tA\tP\tMMB\tCDE\t0.7""" + return_file_2 = """chr\tother\t21322\tQ\tE\tA\tP\tMMB\tCDE\t0.5""" + mock_os.path.isfile.return_value = True + file_to_write = """{"files":["file_1","file_2"]}""" + with mock.patch("builtins.open") as mock_open: + + handles = (mock.mock_open(read_data="gwas").return_value, mock.mock_open( + read_data=return_file_1).return_value, mock.mock_open(read_data=return_file_2).return_value) + mock_open.side_effect = handles + results = parse_loco_output( + this_dataset={}, gwa_output_filename=".xw/") + expected_results = [{'name': ' L1', 'chr': 'X/Y', 'Mb': 2.1e-05, 'p_value': 0.5, 'lod_score': 0.3010299956639812}, { + 'name': 'L2', 'chr': 'X/Y', 'Mb': 0.021322, 'p_value': 0.5, 'lod_score': 0.3010299956639812}] + + self.assertEqual(expected_results, results) + + @mock.patch("wqflask.marker_regression.gemma_mapping.TEMPDIR", "/home/tmp") + @mock.patch("wqflask.marker_regression.gemma_mapping.os") + def test_parse_loco_outputfile_not_found(self, mock_os): + """add tests for parse loco output where output file not found""" + + mock_os.path.isfile.return_value = False + file_to_write = """{"files":["file_1","file_2"]}""" + + with mock.patch("builtins.open", mock.mock_open(read_data=file_to_write)) as mock_open: + results = parse_loco_output( + this_dataset={}, gwa_output_filename=".xw/") + self.assertEqual(results, []) diff --git a/wqflask/tests/wqflask/marker_regression/test_plink_mapping.py b/wqflask/tests/wqflask/marker_regression/test_plink_mapping.py new file mode 100644 index 00000000..428f45b9 --- /dev/null +++ b/wqflask/tests/wqflask/marker_regression/test_plink_mapping.py @@ -0,0 +1,84 @@ +# test for wqflask/marker_regression/plink_mapping.py +import unittest +from unittest import mock +from wqflask.marker_regression.plink_mapping import build_line_list +from wqflask.marker_regression.plink_mapping import get_samples_from_ped_file +from wqflask.marker_regression.plink_mapping import flat_files +from wqflask.marker_regression.plink_mapping import gen_pheno_txt_file_plink +from wqflask.marker_regression.plink_mapping import parse_plink_output + + +class AttributeSetter: + def __init__(self, obj): + for key, val in obj.items(): + setattr(self, key, val) +class TestPlinkMapping(unittest.TestCase): + + def test_build_line_list(self): + """testing for building line list""" + line_1 = "this is line one test" + irregular_line = " this is an, irregular line " + exp_line1 = ["this", "is", "line", "one", "test"] + + results = build_line_list(irregular_line) + self.assertEqual(exp_line1, build_line_list(line_1)) + self.assertEqual([], build_line_list()) + self.assertEqual(["this", "is", "an,", "irregular", "line"], results) + + @mock.patch("wqflask.marker_regression.plink_mapping.flat_files") + def test_get_samples_from_ped_file(self, mock_flat_files): + """test for getting samples from ped file""" + dataset = AttributeSetter({"group": AttributeSetter({"name": "n_1"})}) + file_sample = """Expected_1\tline test +Expected_2\there + Expected_3\tthree""" + mock_flat_files.return_value = "/home/user/" + with mock.patch("builtins.open", mock.mock_open(read_data=file_sample)) as mock_open: + results = get_samples_from_ped_file(dataset) + mock_flat_files.assert_called_once_with("mapping") + mock_open.assert_called_once_with("/home/user/n_1.ped", "r") + self.assertEqual( + ["Expected_1", "Expected_2", "Expected_3"], results) + + @mock.patch("wqflask.marker_regression.plink_mapping.TMPDIR", "/home/user/data/") + @mock.patch("wqflask.marker_regression.plink_mapping.get_samples_from_ped_file") + def test_gen_pheno_txt_file_plink(self, mock_samples): + """test for getting gen_pheno txt file""" + mock_samples.return_value = ["Expected_1", "Expected_2", "Expected_3"] + + trait = AttributeSetter({"name": "TX"}) + dataset = AttributeSetter({"group": AttributeSetter({"name": "n_1"})}) + vals = ["value=K1", "value=K2", "value=K3"] + with mock.patch("builtins.open", mock.mock_open()) as mock_open: + results = gen_pheno_txt_file_plink(this_trait=trait, dataset=dataset, + vals=vals, pheno_filename="ph_file") + mock_open.assert_called_once_with( + "/home/user/data/ph_file.txt", "wb") + filehandler = mock_open() + calls_expected = [mock.call('FID\tIID\tTX\n'), + mock.call('Expected_1\tExpected_1\tK1\nExpected_2\tExpected_2\tK2\nExpected_3\tExpected_3\tK3\n')] + + filehandler.write.assert_has_calls(calls_expected) + + filehandler.close.assert_called_once() + + @mock.patch("wqflask.marker_regression.plink_mapping.TMPDIR", "/home/user/data/") + @mock.patch("wqflask.marker_regression.plink_mapping.build_line_list") + def test_parse_plink_output(self, mock_line_list): + """test for parsing plink output""" + chromosomes = [0, 34, 110, 89, 123, 23, 2] + species = AttributeSetter( + {"name": "S1", "chromosomes": AttributeSetter({"chromosomes": chromosomes})}) + + fake_file = """0 AACCAT T98.6 0.89\n2 AATA B45 0.3\n121 ACG B56.4 NA""" + + mock_line_list.side_effect = [["0", "AACCAT", "T98.6", "0.89"], [ + "2", "AATA", "B45", "0.3"], ["121", "ACG", "B56.4", "NA"]] + with mock.patch("builtins.open", mock.mock_open(read_data=fake_file)) as mock_open: + parse_results = parse_plink_output( + output_filename="P1_file", species=species) + mock_open.assert_called_once_with( + "/home/user/data/P1_file.qassoc", "rb") + expected = (2, {'AACCAT': 0.89, 'AATA': 0.3}) + + self.assertEqual(parse_results, expected) diff --git a/wqflask/tests/wqflask/marker_regression/test_qtlreaper_mapping.py b/wqflask/tests/wqflask/marker_regression/test_qtlreaper_mapping.py new file mode 100644 index 00000000..b47f877a --- /dev/null +++ b/wqflask/tests/wqflask/marker_regression/test_qtlreaper_mapping.py @@ -0,0 +1,21 @@ +import unittest +from unittest import mock +from wqflask.marker_regression.qtlreaper_mapping import gen_pheno_txt_file + +#issues some methods in genofile object are not defined +#modify samples should equal to vals +class TestQtlReaperMapping(unittest.TestCase): + @mock.patch("wqflask.marker_regression.qtlreaper_mapping.TEMPDIR", "/home/user/data") + def test_gen_pheno_txt_file(self): + vals=["V1","x","V4","V3","x"] + samples=["S1","S2","S3","S4","S5"] + trait_filename="trait_file" + with mock.patch("builtins.open", mock.mock_open())as mock_open: + gen_pheno_txt_file(samples=samples,vals=vals,trait_filename=trait_filename) + mock_open.assert_called_once_with("/home/user/data/gn2/trait_file.txt","w") + filehandler=mock_open() + write_calls= [mock.call('Trait\t'),mock.call('S1\tS3\tS4\n'),mock.call('T1\t'),mock.call('V1\tV4\tV3')] + + filehandler.write.assert_has_calls(write_calls) + + diff --git a/wqflask/tests/wqflask/marker_regression/test_rqtl_mapping.py b/wqflask/tests/wqflask/marker_regression/test_rqtl_mapping.py new file mode 100644 index 00000000..69f53721 --- /dev/null +++ b/wqflask/tests/wqflask/marker_regression/test_rqtl_mapping.py @@ -0,0 +1,48 @@ +import unittest +from unittest import mock +from wqflask import app +from wqflask.marker_regression.rqtl_mapping import get_trait_data_type +from wqflask.marker_regression.rqtl_mapping import sanitize_rqtl_phenotype +from wqflask.marker_regression.rqtl_mapping import sanitize_rqtl_names + +class TestRqtlMapping(unittest.TestCase): + + def setUp(self): + self.app_context=app.app_context() + self.app_context.push() + + def tearDown(self): + self.app_context.pop() + + + @mock.patch("wqflask.marker_regression.rqtl_mapping.g") + @mock.patch("wqflask.marker_regression.rqtl_mapping.logger") + def test_get_trait_data(self,mock_logger,mock_db): + """test for getting trait data_type return True""" + caller_value="""SELECT value FROM TraitMetadata WHERE type='trait_data_type'""" + mock_db.db.execute.return_value.fetchone.return_value=["""{"type":"trait_data_type","name":"T1","traid_id":"fer434f"}"""] + results=get_trait_data_type("traid_id") + mock_db.db.execute.assert_called_with(caller_value) + self.assertEqual(results,"fer434f") + + def test_sanitize_rqtl_phenotype(self): + """test for sanitizing rqtl phenotype""" + vals=['f',"x","r","x","x"] + results=sanitize_rqtl_phenotype(vals) + expected_phenotype_string='c(f,NA,r,NA,NA)' + + self.assertEqual(results,expected_phenotype_string) + + def test_sanitize_rqtl_names(self): + """test for sanitzing rqtl names""" + vals=['f',"x","r","x","x"] + expected_sanitized_name="c('f',NA,'r',NA,NA)" + results=sanitize_rqtl_names(vals) + self.assertEqual(expected_sanitized_name,results) + + + + + + + diff --git a/wqflask/tests/wqflask/marker_regression/test_run_mapping.py b/wqflask/tests/wqflask/marker_regression/test_run_mapping.py new file mode 100644 index 00000000..a134f551 --- /dev/null +++ b/wqflask/tests/wqflask/marker_regression/test_run_mapping.py @@ -0,0 +1,284 @@ +import unittest +import datetime +from unittest import mock + +from wqflask.marker_regression.run_mapping import get_genofile_samplelist +from wqflask.marker_regression.run_mapping import geno_db_exists +from wqflask.marker_regression.run_mapping import write_input_for_browser +from wqflask.marker_regression.run_mapping import export_mapping_results +from wqflask.marker_regression.run_mapping import trim_markers_for_figure +from wqflask.marker_regression.run_mapping import get_perm_strata +from wqflask.marker_regression.run_mapping import get_chr_lengths + + +class AttributeSetter: + def __init__(self, obj): + for k, v in obj.items(): + setattr(self, k, v) + + +class MockDataSetGroup(AttributeSetter): + + def get_genofiles(self): + return [{"location": "~/genofiles/g1_file", "sample_list": ["S1", "S2", "S3", "S4"]}] + + +class TestRunMapping(unittest.TestCase): + def setUp(self): + + self.group = MockDataSetGroup( + {"genofile": "~/genofiles/g1_file", "name": "GP1_", "species": "Human"}) + chromosomes = { + "3": AttributeSetter({ + "name": "C1", + "length": "0.04" + }), + "4": AttributeSetter({ + "name": "C2", + "length": "0.03" + }), + "5": AttributeSetter({ + "name": "C4", + "length": "0.01" + }) + } + self.dataset = AttributeSetter( + {"fullname": "dataser_1", "group": self.group, "type": "ProbeSet"}) + + self.chromosomes = AttributeSetter({"chromosomes": chromosomes}) + self.trait = AttributeSetter( + {"symbol": "IGFI", "chr": "X1", "mb": 123313}) + + def tearDown(self): + self.dataset = AttributeSetter( + {"group": {"location": "~/genofiles/g1_file"}}) + + def test_get_genofile_samplelist(self): + + results_1 = get_genofile_samplelist(self.dataset) + self.assertEqual(results_1, ["S1", "S2", "S3", "S4"]) + self.group.genofile = "~/genofiles/g2_file" + result_2 = get_genofile_samplelist(self.dataset) + self.assertEqual(result_2, []) + + @mock.patch("wqflask.marker_regression.run_mapping.data_set") + def test_if_geno_db_exists(self, mock_data_set): + mock_data_set.create_dataset.side_effect = [ + AttributeSetter({}), Exception()] + results_no_error = geno_db_exists(self.dataset) + results_with_error = geno_db_exists(self.dataset) + + self.assertEqual(mock_data_set.create_dataset.call_count, 2) + self.assertEqual(results_with_error, "False") + self.assertEqual(results_no_error, "True") + + def test_trim_markers_for_figure(self): + + markers = [{ + "name": "MK1", + "chr": "C1", + "cM": "1", + "Mb": "12000", + "genotypes": [], + "dominance":"TT", + "additive":"VA", + "lod_score":0.5 + }, + { + "name": "MK2", + "chr": "C2", + "cM": "15", + "Mb": "10000", + "genotypes": [], + "lod_score":0.7 + }, + { + "name": "MK1", + "chr": "C3", + "cM": "45", + "Mb": "1", + "genotypes": [], + "dominance":"Tt", + "additive":"VE", + "lod_score":1 + }] + + marker_2 = [{ + "name": "MK1", + "chr": "C1", + "cM": "1", + "Mb": "12000", + "genotypes": [], + "dominance":"TT", + "additive":"VA", + "p_wald":4.6 + }] + results = trim_markers_for_figure(markers) + result_2 = trim_markers_for_figure(marker_2) + expected = [ + { + "name": "MK1", + "chr": "C1", + "cM": "1", + "Mb": "12000", + "genotypes": [], + "dominance":"TT", + "additive":"VA", + "lod_score":0.5 + }, + { + "name": "MK1", + "chr": "C3", + "cM": "45", + "Mb": "1", + "genotypes": [], + "dominance":"Tt", + "additive":"VE", + "lod_score":1 + } + + ] + self.assertEqual(results, expected) + self.assertEqual(result_2, marker_2) + + def test_export_mapping_results(self): + """test for exporting mapping results""" + datetime_mock = mock.Mock(wraps=datetime.datetime) + datetime_mock.now.return_value = datetime.datetime( + 2019, 9, 1, 10, 12, 12) + + markers = [{ + "name": "MK1", + "chr": "C1", + "cM": "1", + "Mb": "12000", + "genotypes": [], + "dominance":"TT", + "additive":"VA", + "lod_score":3 + }, + { + "name": "MK2", + "chr": "C2", + "cM": "15", + "Mb": "10000", + "genotypes": [], + "lod_score":7 + }, + { + "name": "MK1", + "chr": "C3", + "cM": "45", + "Mb": "1", + "genotypes": [], + "dominance":"Tt", + "additive":"VE", + "lod_score":7 + }] + + with mock.patch("builtins.open", mock.mock_open()) as mock_open: + + with mock.patch("wqflask.marker_regression.run_mapping.datetime.datetime", new=datetime_mock): + export_mapping_results(dataset=self.dataset, trait=self.trait, markers=markers, + results_path="~/results", mapping_scale="physic", score_type="-log(p)") + + write_calls = [ + mock.call('Time/Date: 09/01/19 / 10:12:12\n'), + mock.call('Population: Human GP1_\n'), mock.call( + 'Data Set: dataser_1\n'), + mock.call('Gene Symbol: IGFI\n'), mock.call( + 'Location: X1 @ 123313 Mb\n'), + mock.call('\n'), mock.call('Name,Chr,'), + mock.call('Mb,-log(p)'), mock.call('Cm,-log(p)'), + mock.call(',Additive'), mock.call(',Dominance'), + mock.call('\n'), mock.call('MK1,C1,'), + mock.call('12000,'), mock.call('1,'), + mock.call('3'), mock.call(',VA'), + mock.call(',TT'), mock.call('\n'), + mock.call('MK2,C2,'), mock.call('10000,'), + mock.call('15,'), mock.call('7'), + mock.call('\n'), mock.call('MK1,C3,'), + mock.call('1,'), mock.call('45,'), + mock.call('7'), mock.call(',VE'), + mock.call(',Tt') + + ] + mock_open.assert_called_once_with("~/results", "w+") + filehandler = mock_open() + filehandler.write.assert_has_calls(write_calls) + + @mock.patch("wqflask.marker_regression.run_mapping.random.choice") + def test_write_input_for_browser(self, mock_choice): + """test for writing input for browser""" + mock_choice.side_effect = ["F", "i", "l", "e", "s", "x"] + with mock.patch("builtins.open", mock.mock_open()) as mock_open: + expected = ['GP1__Filesx_GWAS', 'GP1__Filesx_ANNOT'] + + results = write_input_for_browser( + this_dataset=self.dataset, gwas_results={}, annotations={}) + self.assertEqual(results, expected) + + def test_get_perm_strata(self): + categorical_vars = ["C1", "C2", "W1"] + used_samples = ["S1", "S2"] + sample_list = AttributeSetter({"sample_attribute_values": { + "S1": { + "C1": "c1_value", + "C2": "c2_value", + "W1": "w1_value" + + }, + "S2": { + "W1": "w2_value", + "W2": "w2_value" + + }, + "S3": { + + "C1": "c1_value", + "C2": "c2_value" + + }, + + }}) + + results = get_perm_strata(this_trait={}, sample_list=sample_list, + categorical_vars=categorical_vars, used_samples=used_samples) + self.assertEqual(results, [2, 1]) + + def test_get_chr_length(self): + """test for getting chromosome length""" + chromosomes = AttributeSetter({"chromosomes": self.chromosomes}) + dataset = AttributeSetter({"species": chromosomes}) + results = get_chr_lengths( + mapping_scale="physic", mapping_method="reaper", dataset=dataset, qtl_results=[]) + chr_lengths = [] + for key, chromo in self.chromosomes.chromosomes.items(): + chr_lengths.append({"chr": chromo.name, "size": chromo.length}) + + self.assertEqual(chr_lengths, results) + + qtl_results = [{ + "chr": "16", + "cM": "0.2" + }, + { + "chr": "12", + "cM": "0.5" + }, + { + "chr": "18", + "cM": "0.1" + }, + { + "chr": "22", + "cM": "0.4" + }, + ] + + result_with_other_mapping_scale = get_chr_lengths( + mapping_scale="other", mapping_method="reaper", dataset=dataset, qtl_results=qtl_results) + expected_value = [{'chr': '1', 'size': '0'}, { + 'chr': '16', 'size': '500000.0'}, {'chr': '18', 'size': '400000.0'}] + + self.assertEqual(result_with_other_mapping_scale, expected_value) diff --git a/wqflask/wqflask/marker_regression/gemma_mapping.py b/wqflask/wqflask/marker_regression/gemma_mapping.py index 68a8d5ba..02f91a32 100644 --- a/wqflask/wqflask/marker_regression/gemma_mapping.py +++ b/wqflask/wqflask/marker_regression/gemma_mapping.py @@ -31,16 +31,11 @@ def run_gemma(this_trait, this_dataset, samples, vals, covariates, use_loco, maf gwa_output_filename = this_dataset.group.name + "_GWA_" + ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(6)) this_chromosomes = this_dataset.species.chromosomes.chromosomes - chr_list_string = "" - for i in range(len(this_chromosomes)): - if i < (len(this_chromosomes) - 1): - chr_list_string += this_chromosomes[i+1].name + "," - else: - chr_list_string += this_chromosomes[i+1].name + this_chromosomes_name=[chromosome.name for chromosome in this_chromosomes] + chr_list_string=",".join(this_chromosomes_name) if covariates != "": gen_covariates_file(this_dataset, covariates, samples) - if use_loco == "True": generate_k_command = GEMMA_WRAPPER_COMMAND + ' --json --loco ' + chr_list_string + ' -- ' + GEMMAOPTS + ' -g %s/%s_geno.txt -p %s/gn2/%s.txt -a %s/%s_snps.txt -gk > %s/gn2/%s.json' % (flat_files('genotype/bimbam'), genofile_name, diff --git a/wqflask/wqflask/marker_regression/plink_mapping.py b/wqflask/wqflask/marker_regression/plink_mapping.py index fd91b6ca..5d675c38 100644 --- a/wqflask/wqflask/marker_regression/plink_mapping.py +++ b/wqflask/wqflask/marker_regression/plink_mapping.py @@ -9,11 +9,11 @@ import utility.logger logger = utility.logger.getLogger(__name__ ) def run_plink(this_trait, dataset, species, vals, maf): - plink_output_filename = webqtlUtil.genRandStr("%s_%s_"%(dataset.group.name, this_trait.name)) + plink_output_filename = webqtlUtil.genRandStr(f"{dataset.group.name}_{this_trait.name}_") gen_pheno_txt_file(dataset, vals) - plink_command = PLINK_COMMAND + ' --noweb --bfile %s/%s --no-pheno --no-fid --no-parents --no-sex --maf %s --out %s%s --assoc ' % ( - flat_files('mapping'), dataset.group.name, maf, TMPDIR, plink_output_filename) + + plink_command = f"{PLINK_COMMAND} --noweb --bfile {flat_files('mapping')}/{dataset.group.name} --no-pheno --no-fid --no-parents --no-sex --maf {maf} --out { TMPDIR}{plink_output_filename} --assoc " logger.debug("plink_command:", plink_command) os.system(plink_command) @@ -29,12 +29,12 @@ def gen_pheno_txt_file(this_dataset, vals): """Generates phenotype file for GEMMA/PLINK""" current_file_data = [] - with open("{}/{}.fam".format(flat_files('mapping'), this_dataset.group.name), "r") as outfile: + with open(f"{flat_files('mapping')}/{this_dataset.group.name}.fam", "r") as outfile: for i, line in enumerate(outfile): split_line = line.split() current_file_data.append(split_line) - with open("{}/{}.fam".format(flat_files('mapping'), this_dataset.group.name), "w") as outfile: + with open(f"{flat_files('mapping')}/{this_dataset.group.name}.fam","w") as outfile: for i, line in enumerate(current_file_data): if vals[i] == "x": this_val = -9 @@ -44,8 +44,8 @@ def gen_pheno_txt_file(this_dataset, vals): def gen_pheno_txt_file_plink(this_trait, dataset, vals, pheno_filename = ''): ped_sample_list = get_samples_from_ped_file(dataset) - output_file = open("%s%s.txt" % (TMPDIR, pheno_filename), "wb") - header = 'FID\tIID\t%s\n' % this_trait.name + output_file = open(f"{TMPDIR}{pheno_filename}.txt", "wb") + header = f"FID\tIID\t{this_trait.name}\n" output_file.write(header) new_value_list = [] @@ -65,7 +65,7 @@ def gen_pheno_txt_file_plink(this_trait, dataset, vals, pheno_filename = ''): for i, sample in enumerate(ped_sample_list): j = i+1 value = new_value_list[i] - new_line += '%s\t%s\t%s\n'%(sample, sample, value) + new_line += f"{sample}\t{sample}\t{value}\n" if j%1000 == 0: output_file.write(newLine) @@ -78,7 +78,7 @@ def gen_pheno_txt_file_plink(this_trait, dataset, vals, pheno_filename = ''): # get strain name from ped file in order def get_samples_from_ped_file(dataset): - ped_file= open("{}{}.ped".format(flat_files('mapping'), dataset.group.name), "r") + ped_file= open(f"{flat_files('mapping')}{dataset.group.name}.ped","r") line = ped_file.readline() sample_list=[] @@ -98,7 +98,7 @@ def parse_plink_output(output_filename, species): threshold_p_value = 1 - result_fp = open("%s%s.qassoc"% (TMPDIR, output_filename), "rb") + result_fp = open(f"{TMPDIR}{output_filename}.qassoc","rb") line = result_fp.readline() @@ -154,7 +154,7 @@ def parse_plink_output(output_filename, species): # function: convert line from str to list; # output: lineList list ####################################################### -def build_line_list(line=None): +def build_line_list(line=""): line_list = line.strip().split(' ')# irregular number of whitespaces between columns line_list = [item for item in line_list if item !=''] line_list = [item.strip() for item in line_list] diff --git a/wqflask/wqflask/marker_regression/qtlreaper_mapping.py b/wqflask/wqflask/marker_regression/qtlreaper_mapping.py index 78b1f7b0..505ae295 100644 --- a/wqflask/wqflask/marker_regression/qtlreaper_mapping.py +++ b/wqflask/wqflask/marker_regression/qtlreaper_mapping.py @@ -17,22 +17,29 @@ def run_reaper(this_trait, this_dataset, samples, vals, json_data, num_perm, boo else: genofile_name = this_dataset.group.name - trait_filename = str(this_trait.name) + "_" + str(this_dataset.name) + "_pheno" + trait_filename =f"{str(this_trait.name)}_{str(this_dataset.name)}_pheno" gen_pheno_txt_file(samples, vals, trait_filename) - output_filename = this_dataset.group.name + "_GWA_" + ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(6)) + output_filename = (f"{this_dataset.group.name}_GWA_"+ + ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(6)) + ) bootstrap_filename = None permu_filename = None opt_list = [] if boot_check and num_bootstrap > 0: - bootstrap_filename = this_dataset.group.name + "_BOOTSTRAP_" + ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(6)) + bootstrap_filename = (f"{this_dataset.group.name}_BOOTSTRAP_" + + ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(6)) + ) opt_list.append("-b") - opt_list.append("--n_bootstrap " + str(num_bootstrap)) - opt_list.append("--bootstrap_output " + webqtlConfig.GENERATED_IMAGE_DIR + bootstrap_filename + ".txt") + opt_list.append(f"--n_bootstrap{str(num_bootstrap)}") + opt_list.append(f"--bootstrap_output{webqtlConfig.GENERATED_IMAGE_DIR}{bootstrap_filename}.txt") if num_perm > 0: - permu_filename = this_dataset.group.name + "_PERM_" + ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(6)) + permu_filename =("{this_dataset.group.name}_PERM_" + + ''.join(random.choice(string.ascii_uppercase + + string.digits) for _ in range(6)) + ) opt_list.append("-n " + str(num_perm)) opt_list.append("--permu_output " + webqtlConfig.GENERATED_IMAGE_DIR + permu_filename + ".txt") if control_marker != "" and do_control == "true": @@ -40,13 +47,15 @@ def run_reaper(this_trait, this_dataset, samples, vals, json_data, num_perm, boo if manhattan_plot != True: opt_list.append("--interval 1") - reaper_command = REAPER_COMMAND + ' --geno {0}/{1}.geno --traits {2}/gn2/{3}.txt {4} -o {5}{6}.txt'.format(flat_files('genotype'), - genofile_name, - TEMPDIR, - trait_filename, - " ".join(opt_list), - webqtlConfig.GENERATED_IMAGE_DIR, - output_filename) + reaper_command = (REAPER_COMMAND + + ' --geno {0}/{1}.geno --traits {2}/gn2/{3}.txt {4} -o {5}{6}.txt'.format(flat_files('genotype'), + + genofile_name, + TEMPDIR, + trait_filename, + " ".join(opt_list), + webqtlConfig.GENERATED_IMAGE_DIR, + output_filename)) logger.debug("reaper_command:" + reaper_command) os.system(reaper_command) @@ -61,12 +70,13 @@ def run_reaper(this_trait, this_dataset, samples, vals, json_data, num_perm, boo suggestive = permu_vals[int(num_perm*0.37-1)] significant = permu_vals[int(num_perm*0.95-1)] - return marker_obs, permu_vals, suggestive, significant, bootstrap_vals, [output_filename, permu_filename, bootstrap_filename] + return (marker_obs, permu_vals, suggestive, significant, bootstrap_vals, + [output_filename, permu_filename, bootstrap_filename]) def gen_pheno_txt_file(samples, vals, trait_filename): """Generates phenotype file for GEMMA""" - with open("{}/gn2/{}.txt".format(TEMPDIR, trait_filename), "w") as outfile: + with open(f"{TEMPDIR}/gn2/{trait_filename}.txt","w") as outfile: outfile.write("Trait\t") filtered_sample_list = [] @@ -90,7 +100,7 @@ def parse_reaper_output(gwa_filename, permu_filename, bootstrap_filename): only_cm = False only_mb = False - with open("{}{}.txt".format(webqtlConfig.GENERATED_IMAGE_DIR, gwa_filename)) as output_file: + with open(f"{webqtlConfig.GENERATED_IMAGE_DIR}{gwa_filename}.txt") as output_file: for line in output_file: if line.startswith("ID\t"): if len(line.split("\t")) < 8: @@ -137,13 +147,13 @@ def parse_reaper_output(gwa_filename, permu_filename, bootstrap_filename): permu_vals = [] if permu_filename: - with open("{}{}.txt".format(webqtlConfig.GENERATED_IMAGE_DIR, permu_filename)) as permu_file: + with open(f"{webqtlConfig.GENERATED_IMAGE_DIR}{permu_filename}.txt") as permu_file: for line in permu_file: permu_vals.append(float(line)) bootstrap_vals = [] if bootstrap_filename: - with open("{}{}.txt".format(webqtlConfig.GENERATED_IMAGE_DIR, bootstrap_filename)) as bootstrap_file: + with open(f"{webqtlConfig.GENERATED_IMAGE_DIR}{bootstrap_filename}.txt") as bootstrap_file: for line in bootstrap_file: bootstrap_vals.append(int(line)) |