From 16367dab9248d3aa2660e0b5cafdce25e8f7067c Mon Sep 17 00:00:00 2001
From: Alexander Kabui
Date: Tue, 15 Mar 2022 14:08:39 +0300
Subject: Feature/refactored pca (#79)

* compute zscore function
* test case for computing zscore
* function to compute pca
* generate scree plot data
* generate new pca trait data from zscores and eigen_vec
* remove redundant functions
* generate factor loading table data
* generate pca temp dataset dict
* variable naming and error fixes
* unit test for processing factor loadings
* minor fixes for generating temp pca dataset
* pass datetime as argument to generate_pca temp dataset function
* add unittest for caching pca datasets
* cache temp datasets
* ignore missing imports for sklearn
* mypy fixes
* pylint fixes
* refactor tests for pca
* remove ununsed imports
* fix for generating pca traits vals
* mypy and code refactoring
* pep8 formatting and add docstrings
* remove comments /pep8 formatting
* sort eigen vectors based on eigen values
* minor fix for zscores
* fix for rounding variance ratios
* refactor tests
* rename module to pca
* rename datasets to traits
* fix failing tests
* fix caching function
* fixes return x and y coordinates for scree plot
* expand exception scope
* fix for deprecated numpy.matrix function
* fix for failing tests
* pep8 fixes
* remove comments
* fix merge conflict
* pylint fixes
* rename module name to test_pca
---
 gn3/computations/pca.py             | 189 ++++++++++++++++++++++++++++++++++++
 mypy.ini                            |   3 +
 tests/unit/computations/test_pca.py | 101 +++++++++++++++++++
 3 files changed, 293 insertions(+)
 create mode 100644 gn3/computations/pca.py
 create mode 100644 tests/unit/computations/test_pca.py

diff --git a/gn3/computations/pca.py b/gn3/computations/pca.py
new file mode 100644
index 0000000..35c9f03
--- /dev/null
+++ b/gn3/computations/pca.py
@@ -0,0 +1,189 @@
+"""module contains pca implementation using python"""
+
+
+from typing import Any
+from scipy import stats
+
+from sklearn.decomposition import PCA
+from sklearn import preprocessing
+
+import numpy as np
+import redis
+
+
+from typing_extensions import TypeAlias
+
+fArray: TypeAlias = list[float]
+
+
+def compute_pca(array: list[fArray]) -> dict[str, Any]:
+    """
+    computes the principal component analysis
+
+    Parameters:
+
+          array(list[list]):a list of lists contains data to perform pca
+
+
+    Returns:
+           pca_dict(dict):dict contains the pca_object,pca components,pca scores
+
+
+    """
+
+    corr_matrix = np.array(array)
+
+    pca_obj = PCA()
+    scaled_data = preprocessing.scale(corr_matrix)
+
+    pca_obj.fit(scaled_data)
+
+    return {
+        "pca": pca_obj,
+        "components": pca_obj.components_,
+        "scores": pca_obj.transform(scaled_data)
+    }
+
+
+def generate_scree_plot_data(variance_ratio: fArray) -> tuple[list, fArray]:
+    """
+    generates the scree data for plotting
+
+    Parameters:
+
+        variance_ratio(list[floats]):ratios for contribution of each pca
+
+    Returns:
+
+        coordinates(list[(x_coor,y_coord)])
+
+
+    """
+
+    perc_var = [round(ratio*100, 1) for ratio in variance_ratio]
+
+    x_coordinates = [f"PC{val}" for val in range(1, len(perc_var)+1)]
+
+    return (x_coordinates, perc_var)
+
+
+def generate_pca_traits_vals(trait_data_array: list[fArray],
+                             corr_array: list[fArray]) -> list[list[Any]]:
+    """
+    generates datasets from zscores of the traits and eigen_vectors\
+    of correlation matrix
+
+    Parameters:
+
+        trait_data_array(list[floats]):an list of the traits
+        corr_array(list[list]): list of arrays for computing eigen_vectors
+
+    Returns:
+
+        pca_vals[list[list]]:
+
+
+    """
+
+    trait_zscores = stats.zscore(trait_data_array)
+
+    if len(trait_data_array[0]) < 10:
+        trait_zscores = trait_data_array
+
+    (eigen_values, corr_eigen_vectors) = np.linalg.eig(np.array(corr_array))
+    idx = eigen_values.argsort()[::-1]
+
+    return np.dot(corr_eigen_vectors[:, idx], trait_zscores)
+
+
+def process_factor_loadings_tdata(factor_loadings, traits_num: int):
+    """
+
+    transform loadings for tables visualization
+
+    Parameters:
+       factor_loading(numpy.ndarray)
+       traits_num(int):number of traits
+
+    Returns:
+        tabular_loadings(list[list[float]])
+    """
+
+    target_columns = 3 if traits_num > 2 else 2
+
+    trait_loadings = list(factor_loadings.T)
+
+    return [list(trait_loading[:target_columns])
+            for trait_loading in trait_loadings]
+
+
+def generate_pca_temp_traits(
+    species: str,
+    group: str,
+    traits_data: list[fArray],
+    corr_array: list[fArray],
+    dataset_samples: list[str],
+    shared_samples: list[str],
+    create_time: str
+) -> dict[str, list[Any]]:
+    """
+
+
+    generate pca temp datasets
+
+    """
+
+    # pylint: disable=too-many-arguments
+
+    pca_trait_dict = {}
+
+    pca_vals = generate_pca_traits_vals(traits_data, corr_array)
+
+    for (idx, pca_trait) in enumerate(list(pca_vals)):
+
+        trait_id = f"PCA{str(idx+1)}_{species}_{group}_{create_time}"
+        sample_vals = []
+
+        pointer = 0
+
+        for sample in dataset_samples:
+            if sample in shared_samples:
+
+                sample_vals.append(str(pca_trait[pointer]))
+                pointer += 1
+
+            else:
+                sample_vals.append("x")
+
+        pca_trait_dict[trait_id] = sample_vals
+
+    return pca_trait_dict
+
+
+def cache_pca_dataset(redis_conn: Any, exp_days: int,
+                      pca_trait_dict: dict[str, list[Any]]):
+    """
+
+    caches pca dataset to redis
+
+    Parameters:
+
+        redis_conn(object)
+        exp_days(int): fo redis cache
+        pca_trait_dict(Dict): contains traits and traits vals to cache
+
+    Returns:
+
+        boolean(True if correct conn object False incase of exception)
+
+
+    """
+
+    try:
+        for trait_id, sample_data in pca_trait_dict.items():
+            samples_str = " ".join([str(x) for x in sample_data])
+            redis_conn.set(trait_id, samples_str, ex=exp_days)
+        return True
+
+    except (redis.ConnectionError, AttributeError):
+        return False
diff --git a/mypy.ini b/mypy.ini
index 9efa803..7497c17 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -34,4 +34,7 @@ ignore_missing_imports = True
 ignore_missing_imports = True
 
 [mypy-pytest.*]
+ignore_missing_imports = True
+
+[mypy-sklearn.*]
 ignore_missing_imports = True
\ No newline at end of file
diff --git a/tests/unit/computations/test_pca.py b/tests/unit/computations/test_pca.py
new file mode 100644
index 0000000..0189948
--- /dev/null
+++ b/tests/unit/computations/test_pca.py
@@ -0,0 +1,101 @@
+"""module contains unittests for pca"""
+import unittest
+from unittest.mock import patch
+from unittest.mock import Mock
+
+import numpy as np
+import pytest
+
+from gn3.computations.pca import cache_pca_dataset
+from gn3.computations.pca import generate_pca_temp_traits
+from gn3.computations.pca import generate_scree_plot_data
+from gn3.computations.pca import process_factor_loadings_tdata
+
+
+class TestPCA(unittest.TestCase):
+    """pca testcase class"""
+
+    @pytest.mark.unit_test
+    def test_process_factor_loadings(self):
+        """test for processing factor loadings"""
+
+        test_array = np.array([
+            [-0.23511749, -0.61483617, -0.26872797, 0.70319381],
+            [-0.71057342, 0.4623377, -0.52921008, -0.0355803],
+            [-0.60977093, -0.02877103, 0.78874096, 0.07238328],
+            [0.26073856, 0.63827311, 0.16003023, 0.70640864]
+        ])
+
+        expected_results = [[-0.23511749, -0.71057342, -0.60977093],
+                            [-0.61483617, 0.4623377, -0.02877103],
+                            [-0.26872797, -0.52921008, 0.78874096],
+                            [0.70319381, -0.0355803, 0.07238328]]
+
+        self.assertEqual(process_factor_loadings_tdata(
+            test_array, 3), expected_results)
+
+    @pytest.mark.unit_test
+    @patch("gn3.computations.pca.generate_pca_traits_vals")
+    def test_generate_pca_datasets(self, mock_pca_data):
+        """test for generating temp pca dataset"""
+
+        mock_pca_data.return_value = np.array([[21, 10, 17, 15, 13],
+                                               [21, 11, 18,
+                                                9, 1],
+                                               [22, 16, 0,
+                                                0.22667229, -1],
+                                               [31, 12, 10, 17, 11]])
+
+        shared_samples = ["BXD1", "BXD2", "BXD", "BXD4", "Unkown"]
+
+        dataset_samples = ["BXD1", "BXD5", "BXD4", "BXD"]
+        expected_results = {
+            "PCA1_mouse_G1_now": ["21.0", "x", "10.0", "17.0"],
+            "PCA2_mouse_G1_now": ["21.0", "x", "11.0", "18.0"],
+            "PCA3_mouse_G1_now": ["22.0", "x", "16.0", "0.0"],
+            "PCA4_mouse_G1_now": ["31.0", "x", "12.0", "10.0"]
+        }
+
+        results = generate_pca_temp_traits(species="mouse", group="G1",
+                                           traits_data=[],
+                                           dataset_samples=dataset_samples,
+                                           corr_array=[],
+                                           shared_samples=shared_samples,
+                                           create_time="now")
+
+        self.assertEqual(results, expected_results)
+
+    @pytest.mark.unit_test
+    def test_generate_scree_plot(self):
+        """test scree plot data is generated"""
+
+        variance = [0.9271, 0.06232, 0.031]
+
+        self.assertEqual(generate_scree_plot_data(variance),
+                         (['PC1', 'PC2', 'PC3'], [92.7, 6.2, 3.1]))
+
+    @pytest.mark.unit_test
+    def test_cache_pca_datasets(self):
+        """test for caching pca datasets"""
+
+        pca_traits = {
+            "PCA_1": ["11.0", "x", "9.0", "7.0"],
+            "PCA_2": ["x", "x", "1.2", "3.1"]
+        }
+
+        self.assertEqual(cache_pca_dataset(redis_conn={}, exp_days=30,
+                                           pca_trait_dict=pca_traits), False)
+
+        mock_redis = Mock()
+        mock_redis.set.return_value = True
+
+        test_data = [({}, 30, pca_traits, False),
+                     (mock_redis, 30, pca_traits, True)]
+
+        for (test_redis, exp_day, test_traits, expected) in test_data:
+
+            with self.subTest(redis_conn=test_redis,
+                              exp_days=exp_day, pca_trait_dict=test_traits):
+
+                self.assertEqual(cache_pca_dataset(
+                    test_redis, exp_day, test_traits), expected)
-- 
cgit v1.2.3