diff options
author | Alexander Kabui | 2022-03-15 14:08:39 +0300 |
---|---|---|
committer | GitHub | 2022-03-15 14:08:39 +0300 |
commit | 16367dab9248d3aa2660e0b5cafdce25e8f7067c (patch) | |
tree | 1d08f861dbd7302ff43489fc338c99fa3cf92e6f | |
parent | 4b85316c6ca5355faf6ac85f755744f8d52e49d3 (diff) | |
download | genenetwork3-16367dab9248d3aa2660e0b5cafdce25e8f7067c.tar.gz |
Feature/refactored pca (#79)
* compute zscore function
* test case for computing zscore
* function to compute pca
* generate scree plot data
* generate new pca trait data from zscores and eigen_vec
* remove redundant functions
* generate factor loading table data
* generate pca temp dataset dict
* variable naming and error fixes
* unit test for processing factor loadings
* minor fixes for generating temp pca dataset
* pass datetime as argument to generate_pca temp dataset function
* add unittest for caching pca datasets
* cache temp datasets
* ignore missing imports for sklearn
* mypy fixes
* pylint fixes
* refactor tests for pca
* remove ununsed imports
* fix for generating pca traits vals
* mypy and code refactoring
* pep8 formatting and add docstrings
* remove comments /pep8 formatting
* sort eigen vectors based on eigen values
* minor fix for zscores
* fix for rounding variance ratios
* refactor tests
* rename module to pca
* rename datasets to traits
* fix failing tests
* fix caching function
* fixes return x and y coordinates for scree plot
* expand exception scope
* fix for deprecated numpy.matrix function
* fix for failing tests
* pep8 fixes
* remove comments
* fix merge conflict
* pylint fixes
* rename module name to test_pca
-rw-r--r-- | gn3/computations/pca.py | 189 | ||||
-rw-r--r-- | mypy.ini | 3 | ||||
-rw-r--r-- | tests/unit/computations/test_pca.py | 101 |
3 files changed, 293 insertions, 0 deletions
diff --git a/gn3/computations/pca.py b/gn3/computations/pca.py new file mode 100644 index 0000000..35c9f03 --- /dev/null +++ b/gn3/computations/pca.py @@ -0,0 +1,189 @@ +"""module contains pca implementation using python""" + + +from typing import Any +from scipy import stats + +from sklearn.decomposition import PCA +from sklearn import preprocessing + +import numpy as np +import redis + + +from typing_extensions import TypeAlias + +fArray: TypeAlias = list[float] + + +def compute_pca(array: list[fArray]) -> dict[str, Any]: + """ + computes the principal component analysis + + Parameters: + + array(list[list]):a list of lists contains data to perform pca + + + Returns: + pca_dict(dict):dict contains the pca_object,pca components,pca scores + + + """ + + corr_matrix = np.array(array) + + pca_obj = PCA() + scaled_data = preprocessing.scale(corr_matrix) + + pca_obj.fit(scaled_data) + + return { + "pca": pca_obj, + "components": pca_obj.components_, + "scores": pca_obj.transform(scaled_data) + } + + +def generate_scree_plot_data(variance_ratio: fArray) -> tuple[list, fArray]: + """ + generates the scree data for plotting + + Parameters: + + variance_ratio(list[floats]):ratios for contribution of each pca + + Returns: + + coordinates(list[(x_coor,y_coord)]) + + + """ + + perc_var = [round(ratio*100, 1) for ratio in variance_ratio] + + x_coordinates = [f"PC{val}" for val in range(1, len(perc_var)+1)] + + return (x_coordinates, perc_var) + + +def generate_pca_traits_vals(trait_data_array: list[fArray], + corr_array: list[fArray]) -> list[list[Any]]: + """ + generates datasets from zscores of the traits and eigen_vectors\ + of correlation matrix + + Parameters: + + trait_data_array(list[floats]):an list of the traits + corr_array(list[list]): list of arrays for computing eigen_vectors + + Returns: + + pca_vals[list[list]]: + + + """ + + trait_zscores = stats.zscore(trait_data_array) + + if len(trait_data_array[0]) < 10: + trait_zscores = trait_data_array + + (eigen_values, corr_eigen_vectors) = np.linalg.eig(np.array(corr_array)) + idx = eigen_values.argsort()[::-1] + + return np.dot(corr_eigen_vectors[:, idx], trait_zscores) + + +def process_factor_loadings_tdata(factor_loadings, traits_num: int): + """ + + transform loadings for tables visualization + + Parameters: + factor_loading(numpy.ndarray) + traits_num(int):number of traits + + Returns: + tabular_loadings(list[list[float]]) + """ + + target_columns = 3 if traits_num > 2 else 2 + + trait_loadings = list(factor_loadings.T) + + return [list(trait_loading[:target_columns]) + for trait_loading in trait_loadings] + + +def generate_pca_temp_traits( + species: str, + group: str, + traits_data: list[fArray], + corr_array: list[fArray], + dataset_samples: list[str], + shared_samples: list[str], + create_time: str +) -> dict[str, list[Any]]: + """ + + + generate pca temp datasets + + """ + + # pylint: disable=too-many-arguments + + pca_trait_dict = {} + + pca_vals = generate_pca_traits_vals(traits_data, corr_array) + + for (idx, pca_trait) in enumerate(list(pca_vals)): + + trait_id = f"PCA{str(idx+1)}_{species}_{group}_{create_time}" + sample_vals = [] + + pointer = 0 + + for sample in dataset_samples: + if sample in shared_samples: + + sample_vals.append(str(pca_trait[pointer])) + pointer += 1 + + else: + sample_vals.append("x") + + pca_trait_dict[trait_id] = sample_vals + + return pca_trait_dict + + +def cache_pca_dataset(redis_conn: Any, exp_days: int, + pca_trait_dict: dict[str, list[Any]]): + """ + + caches pca dataset to redis + + Parameters: + + redis_conn(object) + exp_days(int): fo redis cache + pca_trait_dict(Dict): contains traits and traits vals to cache + + Returns: + + boolean(True if correct conn object False incase of exception) + + + """ + + try: + for trait_id, sample_data in pca_trait_dict.items(): + samples_str = " ".join([str(x) for x in sample_data]) + redis_conn.set(trait_id, samples_str, ex=exp_days) + return True + + except (redis.ConnectionError, AttributeError): + return False @@ -34,4 +34,7 @@ ignore_missing_imports = True ignore_missing_imports = True [mypy-pytest.*] +ignore_missing_imports = True + +[mypy-sklearn.*] ignore_missing_imports = True
\ No newline at end of file diff --git a/tests/unit/computations/test_pca.py b/tests/unit/computations/test_pca.py new file mode 100644 index 0000000..0189948 --- /dev/null +++ b/tests/unit/computations/test_pca.py @@ -0,0 +1,101 @@ +"""module contains unittests for pca""" +import unittest +from unittest.mock import patch +from unittest.mock import Mock + +import numpy as np +import pytest + +from gn3.computations.pca import cache_pca_dataset +from gn3.computations.pca import generate_pca_temp_traits +from gn3.computations.pca import generate_scree_plot_data +from gn3.computations.pca import process_factor_loadings_tdata + + +class TestPCA(unittest.TestCase): + """pca testcase class""" + + @pytest.mark.unit_test + def test_process_factor_loadings(self): + """test for processing factor loadings""" + + test_array = np.array([ + [-0.23511749, -0.61483617, -0.26872797, 0.70319381], + [-0.71057342, 0.4623377, -0.52921008, -0.0355803], + [-0.60977093, -0.02877103, 0.78874096, 0.07238328], + [0.26073856, 0.63827311, 0.16003023, 0.70640864] + ]) + + expected_results = [[-0.23511749, -0.71057342, -0.60977093], + [-0.61483617, 0.4623377, -0.02877103], + [-0.26872797, -0.52921008, 0.78874096], + [0.70319381, -0.0355803, 0.07238328]] + + self.assertEqual(process_factor_loadings_tdata( + test_array, 3), expected_results) + + @pytest.mark.unit_test + @patch("gn3.computations.pca.generate_pca_traits_vals") + def test_generate_pca_datasets(self, mock_pca_data): + """test for generating temp pca dataset""" + + mock_pca_data.return_value = np.array([[21, 10, 17, 15, 13], + [21, 11, 18, + 9, 1], + [22, 16, 0, + 0.22667229, -1], + [31, 12, 10, 17, 11]]) + + shared_samples = ["BXD1", "BXD2", "BXD", "BXD4", "Unkown"] + + dataset_samples = ["BXD1", "BXD5", "BXD4", "BXD"] + expected_results = { + "PCA1_mouse_G1_now": ["21.0", "x", "10.0", "17.0"], + "PCA2_mouse_G1_now": ["21.0", "x", "11.0", "18.0"], + "PCA3_mouse_G1_now": ["22.0", "x", "16.0", "0.0"], + "PCA4_mouse_G1_now": ["31.0", "x", "12.0", "10.0"] + } + + results = generate_pca_temp_traits(species="mouse", group="G1", + traits_data=[], + dataset_samples=dataset_samples, + corr_array=[], + shared_samples=shared_samples, + create_time="now") + + self.assertEqual(results, expected_results) + + @pytest.mark.unit_test + def test_generate_scree_plot(self): + """test scree plot data is generated""" + + variance = [0.9271, 0.06232, 0.031] + + self.assertEqual(generate_scree_plot_data(variance), + (['PC1', 'PC2', 'PC3'], [92.7, 6.2, 3.1])) + + @pytest.mark.unit_test + def test_cache_pca_datasets(self): + """test for caching pca datasets""" + + pca_traits = { + "PCA_1": ["11.0", "x", "9.0", "7.0"], + "PCA_2": ["x", "x", "1.2", "3.1"] + } + + self.assertEqual(cache_pca_dataset(redis_conn={}, exp_days=30, + pca_trait_dict=pca_traits), False) + + mock_redis = Mock() + mock_redis.set.return_value = True + + test_data = [({}, 30, pca_traits, False), + (mock_redis, 30, pca_traits, True)] + + for (test_redis, exp_day, test_traits, expected) in test_data: + + with self.subTest(redis_conn=test_redis, + exp_days=exp_day, pca_trait_dict=test_traits): + + self.assertEqual(cache_pca_dataset( + test_redis, exp_day, test_traits), expected) |