aboutsummaryrefslogtreecommitdiff
import unittest
import datetime
from unittest import mock

import gn2.wqflask.marker_regression.run_mapping as run_mapping
from gn2.wqflask.marker_regression.run_mapping import get_genofile_samplelist
from gn2.wqflask.marker_regression.run_mapping import geno_db_exists
from gn2.wqflask.marker_regression.run_mapping import write_input_for_browser
from gn2.wqflask.marker_regression.run_mapping import export_mapping_results
from gn2.wqflask.marker_regression.run_mapping import trim_markers_for_figure
from gn2.wqflask.marker_regression.run_mapping import get_perm_strata
from gn2.wqflask.marker_regression.run_mapping import get_chr_lengths


class AttributeSetter:
    def __init__(self, obj):
        for k, v in obj.items():
            setattr(self, k, v)


class MockGroup(AttributeSetter):

    def get_genofiles(self):
        return [{"location": "~/genofiles/g1_file", "sample_list": ["S1", "S2", "S3", "S4"]}]


class TestRunMapping(unittest.TestCase):
    def setUp(self):

        self.group = MockGroup(
            {"genofile": "~/genofiles/g1_file", "name": "GP1_", "species": "Human"})
        chromosomes = {
            "3": AttributeSetter({
                "name": "C1",
                "length": "0.04"
            }),
            "4": AttributeSetter({
                "name": "C2",
                "length": "0.03"
            }),
            "5": AttributeSetter({
                "name": "C4",
                "length": "0.01"
            })
        }
        self.dataset = AttributeSetter(
            {"fullname": "dataset_1", "group": self.group, "type": "ProbeSet"})

        self.chromosomes = AttributeSetter({"chromosomes": lambda cur: chromosomes})
        self.trait = AttributeSetter(
            {"symbol": "IGFI", "chr": "X1", "mb": 123313, "display_name": "Test Name"})

    def tearDown(self):
        self.dataset = AttributeSetter(
            {"group": {"location": "~/genofiles/g1_file"}})

    def test_get_genofile_samplelist(self):

        results_1 = get_genofile_samplelist(self.dataset)
        self.assertEqual(results_1, ["S1", "S2", "S3", "S4"])
        self.group.genofile = "~/genofiles/g2_file"
        result_2 = get_genofile_samplelist(self.dataset)
        self.assertEqual(result_2, [])

    @mock.patch("gn2.wqflask.marker_regression.run_mapping.data_set")
    def test_if_geno_db_exists(self, mock_data_set):
        mock_data_set.create_dataset.side_effect = [
            AttributeSetter({}), Exception()]
        run_mapping.create_trait = mock.Mock()
        results_no_error = geno_db_exists(self.dataset, "dataset_1")
        results_with_error = geno_db_exists(self.dataset, "dataset_1")

        self.assertEqual(mock_data_set.create_dataset.call_count, 2)
        self.assertEqual(results_with_error, "False")
        self.assertEqual(results_no_error, "True")

    def test_trim_markers_for_figure(self):

        markers = [{
            "name": "MK1",
            "chr": "C1",
            "cM": "1",
            "Mb": "12000",
            "genotypes": [],
            "dominance":"TT",
            "additive":"VA",
            "lod_score":0.5
        },
            {
            "name": "MK2",
            "chr": "C2",
            "cM": "15",
            "Mb": "10000",
            "genotypes": [],
            "lod_score":0.7
        },
            {
            "name": "MK1",
            "chr": "C3",
            "cM": "45",
            "Mb": "1",
            "genotypes": [],
            "dominance":"Tt",
            "additive":"VE",
            "lod_score":1
        }]

        marker_2 = [{
            "name": "MK1",
            "chr": "C1",
            "cM": "1",
            "Mb": "12000",
            "genotypes": [],
            "dominance":"TT",
            "additive":"VA",
            "p_wald":4.6
        }]
        results = trim_markers_for_figure(markers)
        result_2 = trim_markers_for_figure(marker_2)
        expected = [
            {
                "name": "MK1",
                "chr": "C1",
                "cM": "1",
                "Mb": "12000",
                "genotypes": [],
                "dominance":"TT",
                "additive":"VA",
                "lod_score":0.5
            },
            {
                "name": "MK1",
                "chr": "C3",
                "cM": "45",
                "Mb": "1",
                "genotypes": [],
                "dominance":"Tt",
                "additive":"VE",
                "lod_score":1
            }

        ]
        self.assertEqual(results, expected)
        self.assertEqual(result_2, marker_2)

    def test_export_mapping_results(self):
        """test for exporting mapping results"""
        datetime_mock = mock.Mock(wraps=datetime.datetime)
        datetime_mock.now.return_value = datetime.datetime(
            2019, 9, 1, 10, 12, 12)

        markers = [{
            "name": "MK1",
            "chr": "C1",
            "cM": "1",
            "Mb": "12000",
            "genotypes": [],
            "dominance":"TT",
            "additive":"VA",
            "lod_score":3
        },
            {
            "name": "MK2",
            "chr": "C2",
            "cM": "15",
            "Mb": "10000",
            "genotypes": [],
            "lod_score":7
        },
            {
            "name": "MK1",
            "chr": "C3",
            "cM": "45",
            "Mb": "1",
            "genotypes": [],
            "dominance":"Tt",
            "additive":"VE",
            "lod_score":7
        }]

        with mock.patch("builtins.open", mock.mock_open()) as mock_open:

            with mock.patch("gn2.wqflask.marker_regression.run_mapping.datetime.datetime", new=datetime_mock):
                export_mapping_results(dataset=self.dataset, trait=self.trait, markers=markers,
                                       results_path="~/results", mapping_method="gemma", mapping_scale="physic",
                                       score_type="-logP", transform="qnorm",
                                       covariates="Dataset1:Trait1,Dataset2:Trait2",
                                       n_samples="100", vals_hash="")

                write_calls = [
                    mock.call('Time/Date: 09/01/19 / 10:12:12\n'),
                    mock.call('Population: Human GP1_\n'), mock.call(
                        'Data Set: dataset_1\n'),
                    mock.call('Trait: Test Name\n'),
                    mock.call('Trait Hash: \n'),
                    mock.call('N Samples: 100\n'),
                    mock.call('Mapping Tool: gemma\n'),
                    mock.call('Transform - Quantile Normalized\n'),
                    mock.call('Gene Symbol: IGFI\n'), mock.call(
                        'Location: X1 @ 123313 Mb\n'),
                    mock.call('Cofactors (dataset - trait):\n'),
                    mock.call('Trait1 - Dataset1\n'),
                    mock.call('Trait2 - Dataset2\n'),
                    mock.call('\n'), mock.call('Name,Chr,'),
                    mock.call('Mb,-logP'),
                    mock.call(',Additive'), mock.call(',Dominance'),
                    mock.call('\n'), mock.call('MK1,C1,'),
                    mock.call('12000,'), mock.call('3'),
                    mock.call(',VA'), mock.call(',TT'),
                    mock.call('\n'), mock.call('MK2,C2,'),
                    mock.call('10000,'), mock.call('7'),
                    mock.call('\n'), mock.call('MK1,C3,'),
                    mock.call('1,'), mock.call('7'),
                    mock.call(',VE'), mock.call(',Tt')
                ]
                mock_open.assert_called_once_with("~/results", "w+")
                filehandler = mock_open()
                filehandler.write.assert_has_calls(write_calls)

    @mock.patch("gn2.wqflask.marker_regression.run_mapping.random.choice")
    def test_write_input_for_browser(self, mock_choice):
        """test for writing input for browser"""
        mock_choice.side_effect = ["F", "i", "l", "e", "s", "x"]
        with mock.patch("builtins.open", mock.mock_open()) as mock_open:
            expected = ['GP1__Filesx_GWAS', 'GP1__Filesx_ANNOT']

            results = write_input_for_browser(
                this_dataset=self.dataset, gwas_results={}, annotations={})
            self.assertEqual(results, expected)

    def test_get_perm_strata(self):
        categorical_vars = ["C1", "C2", "W1"]
        used_samples = ["S1", "S2"]
        sample_list = AttributeSetter({"sample_attribute_values": {
            "S1": {
                "c1": "c1_value",
                "c2": "c2_value",
                "w1": "w1_value"
            },
            "S2": {
                "w1": "w2_value",
                "w2": "w2_value"
            },
            "S3": {

                "c1": "c1_value",
                "c2": "c2_value"
            },
        }})
        results = get_perm_strata(this_trait={}, sample_list=sample_list,
                                  categorical_vars=categorical_vars, used_samples=used_samples)
        self.assertEqual(results, [1, 1])

    def test_get_chr_length(self):
        """test for getting chromosome length"""
        cursor = mock.MagicMock()
        chromosomes = AttributeSetter({"chromosomes": self.chromosomes})
        dataset = AttributeSetter({"species": chromosomes})
        results = get_chr_lengths(
            mapping_scale="physic", mapping_method="reaper", dataset=dataset, qtl_results=[])
        chr_lengths = []
        for key, chromo in self.chromosomes.chromosomes(cursor).items():
            chr_lengths.append({"chr": chromo.name, "size": chromo.length})

        self.assertEqual(chr_lengths, results)

        qtl_results = [{
            "chr": "16",
            "cM": "0.2"
        },
            {
            "chr": "12",
            "cM": "0.5"
        },
            {
            "chr": "18",
            "cM": "0.1"
        },
            {
            "chr": "22",
            "cM": "0.4"
        },
        ]

        result_with_other_mapping_scale = get_chr_lengths(
            mapping_scale="other", mapping_method="reaper", dataset=dataset, qtl_results=qtl_results)
        expected_value = [{'chr': '1', 'size': '0'}, {
            'chr': '16', 'size': '500000.0'}, {'chr': '18', 'size': '400000.0'}]

        self.assertEqual(result_with_other_mapping_scale, expected_value)