"""Unit tests for helpers in gn2.wqflask.marker_regression.run_mapping."""
import datetime
import unittest
from unittest import mock

import gn2.wqflask.marker_regression.run_mapping as run_mapping
from gn2.wqflask.marker_regression.run_mapping import get_genofile_samplelist
from gn2.wqflask.marker_regression.run_mapping import geno_db_exists
from gn2.wqflask.marker_regression.run_mapping import write_input_for_browser
from gn2.wqflask.marker_regression.run_mapping import export_mapping_results
from gn2.wqflask.marker_regression.run_mapping import trim_markers_for_figure
from gn2.wqflask.marker_regression.run_mapping import get_perm_strata
from gn2.wqflask.marker_regression.run_mapping import get_chr_lengths


class AttributeSetter:
    """Lightweight stand-in object: every key of ``obj`` becomes an attribute."""

    def __init__(self, obj):
        for k, v in obj.items():
            setattr(self, k, v)


class MockGroup(AttributeSetter):
    """Fake group exposing a single genofile entry through ``get_genofiles``."""

    def get_genofiles(self):
        """Return one genofile record with a fixed location and sample list."""
        return [{"location": "~/genofiles/g1_file",
                 "sample_list": ["S1", "S2", "S3", "S4"]}]


class TestRunMapping(unittest.TestCase):
    """Tests for the run_mapping helper functions imported above."""

    def setUp(self):
        """Build the fake group, dataset, chromosomes and trait fixtures."""
        self.group = MockGroup(
            {"genofile": "~/genofiles/g1_file", "name": "GP1_", "species": "Human"})
        chromosomes = {
            "3": AttributeSetter({
                "name": "C1",
                "length": "0.04"
            }),
            "4": AttributeSetter({
                "name": "C2",
                "length": "0.03"
            }),
            "5": AttributeSetter({
                "name": "C4",
                "length": "0.01"
            })
        }
        self.dataset = AttributeSetter(
            {"fullname": "dataset_1", "group": self.group, "type": "ProbeSet"})
        # Stored as an instance attribute, so the lambda is NOT bound: it is
        # called as ``self.chromosomes.chromosomes(cursor)``.
        self.chromosomes = AttributeSetter({"chromosomes": lambda cur: chromosomes})
        self.trait = AttributeSetter(
            {"symbol": "IGFI", "chr": "X1", "mb": 123313, "display_name": "Test Name"})

    def tearDown(self):
        # NOTE(review): this only rebinds self.dataset after each test and
        # nothing visible in this file reads it afterwards -- confirm whether
        # it is still needed.
        self.dataset = AttributeSetter(
            {"group": {"location": "~/genofiles/g1_file"}})

    def test_get_genofile_samplelist(self):
        """test for getting the sample list of the group's genofile"""
        results_1 = get_genofile_samplelist(self.dataset)
        self.assertEqual(results_1, ["S1", "S2", "S3", "S4"])

        # A genofile that matches none of the group's locations.
        self.group.genofile = "~/genofiles/g2_file"
        result_2 = get_genofile_samplelist(self.dataset)
        self.assertEqual(result_2, [])

    @mock.patch("gn2.wqflask.marker_regression.run_mapping.data_set")
    def test_if_geno_db_exists(self, mock_data_set):
        """test for checking whether the geno db exists"""
        # First call succeeds, second call raises.
        mock_data_set.create_dataset.side_effect = [
            AttributeSetter({}), Exception()]

        # Scoped patch instead of a bare module assignment so the stub is
        # removed again when the block exits and cannot leak into other tests.
        # ``create=True`` keeps this working whether or not the module
        # currently defines ``create_trait``.
        with mock.patch.object(run_mapping, "create_trait",
                               new=mock.Mock(), create=True):
            results_no_error = geno_db_exists(self.dataset, "dataset_1")
            results_with_error = geno_db_exists(self.dataset, "dataset_1")

        self.assertEqual(mock_data_set.create_dataset.call_count, 2)
        self.assertEqual(results_with_error, "False")
        self.assertEqual(results_no_error, "True")

    def test_trim_markers_for_figure(self):
        """test for trimming markers before drawing the figure"""
        markers = [{
            "name": "MK1",
            "chr": "C1",
            "cM": "1",
            "Mb": "12000",
            "genotypes": [],
            "dominance": "TT",
            "additive": "VA",
            "lod_score": 0.5
        }, {
            "name": "MK2",
            "chr": "C2",
            "cM": "15",
            "Mb": "10000",
            "genotypes": [],
            "lod_score": 0.7
        }, {
            "name": "MK1",
            "chr": "C3",
            "cM": "45",
            "Mb": "1",
            "genotypes": [],
            "dominance": "Tt",
            "additive": "VE",
            "lod_score": 1
        }]

        marker_2 = [{
            "name": "MK1",
            "chr": "C1",
            "cM": "1",
            "Mb": "12000",
            "genotypes": [],
            "dominance": "TT",
            "additive": "VA",
            "p_wald": 4.6
        }]

        results = trim_markers_for_figure(markers)
        result_2 = trim_markers_for_figure(marker_2)

        expected = [
            {
                "name": "MK1",
                "chr": "C1",
                "cM": "1",
                "Mb": "12000",
                "genotypes": [],
                "dominance": "TT",
                "additive": "VA",
                "lod_score": 0.5
            }, {
                "name": "MK1",
                "chr": "C3",
                "cM": "45",
                "Mb": "1",
                "genotypes": [],
                "dominance": "Tt",
                "additive": "VE",
                "lod_score": 1
            }
        ]

        self.assertEqual(results, expected)
        self.assertEqual(result_2, marker_2)

    def test_export_mapping_results(self):
        """test for exporting mapping results"""
        # Freeze ``datetime.now()`` so the Time/Date header is deterministic.
        datetime_mock = mock.Mock(wraps=datetime.datetime)
        datetime_mock.now.return_value = datetime.datetime(
            2019, 9, 1, 10, 12, 12)

        markers = [{
            "name": "MK1",
            "chr": "C1",
            "cM": "1",
            "Mb": "12000",
            "genotypes": [],
            "dominance": "TT",
            "additive": "VA",
            "lod_score": 3
        }, {
            "name": "MK2",
            "chr": "C2",
            "cM": "15",
            "Mb": "10000",
            "genotypes": [],
            "lod_score": 7
        }, {
            "name": "MK1",
            "chr": "C3",
            "cM": "45",
            "Mb": "1",
            "genotypes": [],
            "dominance": "Tt",
            "additive": "VE",
            "lod_score": 7
        }]

        with mock.patch("builtins.open", mock.mock_open()) as mock_open:
            with mock.patch(
                    "gn2.wqflask.marker_regression.run_mapping.datetime.datetime",
                    new=datetime_mock):
                export_mapping_results(dataset=self.dataset,
                                       trait=self.trait,
                                       markers=markers,
                                       results_path="~/results",
                                       mapping_method="gemma",
                                       mapping_scale="physic",
                                       score_type="-logP",
                                       transform="qnorm",
                                       covariates="Dataset1:Trait1,Dataset2:Trait2",
                                       n_samples="100",
                                       vals_hash="")

                write_calls = [
                    mock.call('Time/Date: 09/01/19 / 10:12:12\n'),
                    mock.call('Population: Human GP1_\n'),
                    mock.call('Data Set: dataset_1\n'),
                    mock.call('Trait: Test Name\n'),
                    mock.call('Trait Hash: \n'),
                    mock.call('N Samples: 100\n'),
                    mock.call('Mapping Tool: gemma\n'),
                    mock.call('Transform - Quantile Normalized\n'),
                    mock.call('Gene Symbol: IGFI\n'),
                    mock.call('Location: X1 @ 123313 Mb\n'),
                    mock.call('Cofactors (dataset - trait):\n'),
                    mock.call('Trait1 - Dataset1\n'),
                    mock.call('Trait2 - Dataset2\n'),
                    mock.call('\n'),
                    mock.call('Name,Chr,'),
                    mock.call('Mb,-logP'),
                    mock.call(',Additive'),
                    mock.call(',Dominance'),
                    mock.call('\n'),
                    mock.call('MK1,C1,'),
                    mock.call('12000,'),
                    mock.call('3'),
                    mock.call(',VA'),
                    mock.call(',TT'),
                    mock.call('\n'),
                    mock.call('MK2,C2,'),
                    mock.call('10000,'),
                    mock.call('7'),
                    mock.call('\n'),
                    mock.call('MK1,C3,'),
                    mock.call('1,'),
                    mock.call('7'),
                    mock.call(',VE'),
                    mock.call(',Tt')
                ]
                mock_open.assert_called_once_with("~/results", "w+")
                # Calling the mock again returns the shared file handle.
                filehandler = mock_open()
                filehandler.write.assert_has_calls(write_calls)

    @mock.patch("gn2.wqflask.marker_regression.run_mapping.random.choice")
    def test_write_input_for_browser(self, mock_choice):
        """test for writing input for browser"""
        mock_choice.side_effect = ["F", "i", "l", "e", "s", "x"]
        # ``open`` is stubbed so nothing is written to disk.
        with mock.patch("builtins.open", mock.mock_open()):
            expected = ['GP1__Filesx_GWAS', 'GP1__Filesx_ANNOT']

            results = write_input_for_browser(
                this_dataset=self.dataset, gwas_results={}, annotations={})
            self.assertEqual(results, expected)

    def test_get_perm_strata(self):
        """test for getting the permutation strata"""
        categorical_vars = ["C1", "C2", "W1"]
        used_samples = ["S1", "S2"]
        sample_list = AttributeSetter({"sample_attribute_values": {
            "S1": {
                "c1": "c1_value",
                "c2": "c2_value",
                "w1": "w1_value"
            },
            "S2": {
                "w1": "w2_value",
                "w2": "w2_value"
            },
            "S3": {
                "c1": "c1_value",
                "c2": "c2_value"
            },
        }})

        results = get_perm_strata(this_trait={},
                                  sample_list=sample_list,
                                  categorical_vars=categorical_vars,
                                  used_samples=used_samples)
        self.assertEqual(results, [1, 1])

    def test_get_chr_length(self):
        """test for getting chromosome length"""
        cursor = mock.MagicMock()
        chromosomes = AttributeSetter({"chromosomes": self.chromosomes})
        dataset = AttributeSetter({"species": chromosomes})
        results = get_chr_lengths(
            mapping_scale="physic",
            mapping_method="reaper",
            dataset=dataset,
            qtl_results=[])
        # Only the chromosome objects are needed, not their dict keys.
        chr_lengths = [
            {"chr": chromo.name, "size": chromo.length}
            for chromo in self.chromosomes.chromosomes(cursor).values()
        ]

        self.assertEqual(chr_lengths, results)

        qtl_results = [{
            "chr": "16",
            "cM": "0.2"
        }, {
            "chr": "12",
            "cM": "0.5"
        }, {
            "chr": "18",
            "cM": "0.1"
        }, {
            "chr": "22",
            "cM": "0.4"
        },
        ]

        result_with_other_mapping_scale = get_chr_lengths(
            mapping_scale="other",
            mapping_method="reaper",
            dataset=dataset,
            qtl_results=qtl_results)
        expected_value = [{'chr': '1', 'size': '0'}, {
            'chr': '16', 'size': '500000.0'}, {'chr': '18', 'size': '400000.0'}]

        self.assertEqual(result_with_other_mapping_scale, expected_value)