author     Alexander Kabui   2022-03-15 14:08:39 +0300
committer  GitHub            2022-03-15 14:08:39 +0300
commit     16367dab9248d3aa2660e0b5cafdce25e8f7067c (patch)
tree       1d08f861dbd7302ff43489fc338c99fa3cf92e6f
parent     4b85316c6ca5355faf6ac85f755744f8d52e49d3 (diff)
download   genenetwork3-16367dab9248d3aa2660e0b5cafdce25e8f7067c.tar.gz
Feature/refactored pca (#79)
* compute zscore function

* test case for computing zscore

* function to compute pca

* generate scree plot data

* generate new pca trait data from zscores and eigen_vec

* remove redundant functions

* generate factor loading table data

* generate pca temp dataset dict

* variable naming and error fixes

* unit test for processing factor loadings

* minor fixes for generating temp pca dataset

* pass datetime as argument to generate_pca temp dataset function

* add unittest for caching pca datasets

* cache temp datasets

* ignore missing imports for sklearn

* mypy fixes

* pylint fixes

* refactor tests for pca

* remove unused imports

* fix for generating pca traits vals

* mypy and code refactoring

* pep8 formatting and add docstrings

* remove comments /pep8 formatting

* sort eigen vectors based on eigen values

* minor fix for zscores

* fix for rounding variance ratios

* refactor tests

* rename module to pca

* rename datasets to traits

* fix failing tests

* fix caching function

* fix return of x and y coordinates for scree plot

* expand exception scope

* fix for deprecated numpy.matrix function

* fix for failing tests

* pep8 fixes

* remove comments

* fix merge conflict

* pylint fixes

* rename module name to test_pca
-rw-r--r--   gn3/computations/pca.py               189
-rw-r--r--   mypy.ini                                3
-rw-r--r--   tests/unit/computations/test_pca.py   101
3 files changed, 293 insertions, 0 deletions
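
For orientation, a minimal sketch of how the new gn3/computations/pca.py helpers are meant to compose end to end. The random trait data, the BXD sample names, and the local Redis connection below are illustrative assumptions, not part of this commit; everything the sketch calls is defined in the diff that follows.

    import datetime
    import numpy as np
    import redis
    from gn3.computations.pca import (compute_pca, generate_scree_plot_data,
                                      generate_pca_temp_traits, cache_pca_dataset)

    rng = np.random.default_rng(0)
    samples = [f"BXD{i}" for i in range(1, 13)]                # hypothetical sample names
    traits_data = rng.normal(size=(4, len(samples))).tolist()  # 4 traits x 12 samples
    corr_matrix = np.corrcoef(traits_data).tolist()            # trait-by-trait correlations

    # fit the pca on the correlation matrix and derive the scree plot data
    pca_results = compute_pca(corr_matrix)
    x_coords, perc_var = generate_scree_plot_data(
        list(pca_results["pca"].explained_variance_ratio_))

    # project the traits onto the principal components, keyed by generated trait ids
    temp_traits = generate_pca_temp_traits(
        species="mouse", group="BXD", traits_data=traits_data,
        corr_array=corr_matrix, dataset_samples=samples, shared_samples=samples,
        create_time=datetime.datetime.now().strftime("%m%d%H%M%S"))

    # returns False if no Redis server is reachable
    cache_pca_dataset(redis.Redis(), exp_days=30, pca_trait_dict=temp_traits)
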
diff --git a/gn3/computations/pca.py b/gn3/computations/pca.py
new file mode 100644
index 0000000..35c9f03
--- /dev/null
+++ b/gn3/computations/pca.py
@@ -0,0 +1,189 @@
+"""module contains pca implementation using python"""
+
+
+from typing import Any
+from scipy import stats
+
+from sklearn.decomposition import PCA
+from sklearn import preprocessing
+
+import numpy as np
+import redis
+
+
+from typing_extensions import TypeAlias
+
+fArray: TypeAlias = list[float]
+
+
+def compute_pca(array: list[fArray]) -> dict[str, Any]:
+    """
+    computes the principal component analysis
+
+    Parameters:
+
+          array(list[list]):a list of lists contains data to perform  pca
+
+
+    Returns:
+           pca_dict(dict):dict contains the pca_object,pca components,pca scores
+
+
+    """
+
+    corr_matrix = np.array(array)
+
+    pca_obj = PCA()
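+    # standardise each column to zero mean and unit variance before fitting the pca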
+    scaled_data = preprocessing.scale(corr_matrix)
+
+    pca_obj.fit(scaled_data)
+
+    return {
+        "pca": pca_obj,
+        "components": pca_obj.components_,
+        "scores": pca_obj.transform(scaled_data)
+    }
+
+
+def generate_scree_plot_data(variance_ratio: fArray) -> tuple[list, fArray]:
+    """
+    generates the scree data for plotting
+
+    Parameters:
+
+            variance_ratio(list[floats]):ratios for contribution of each pca
+
+    Returns:
+
+            coordinates(list[(x_coor,y_coord)])
+
+
+    """
+
+    perc_var = [round(ratio*100, 1) for ratio in variance_ratio]
+
+    x_coordinates = [f"PC{val}" for val in range(1, len(perc_var)+1)]
+
+    return (x_coordinates, perc_var)
+
+
+def generate_pca_traits_vals(trait_data_array: list[fArray],
+                             corr_array: list[fArray]) -> list[list[Any]]:
+    """
+    generates datasets from zscores of the traits and eigen_vectors\
+    of correlation matrix
+
+    Parameters:
+
+            trait_data_array(list[floats]):an list of the traits
+            corr_array(list[list]): list of arrays for computing eigen_vectors
+
+    Returns:
+
+            pca_vals[list[list]]:
+
+
+    """
+
+    trait_zscores = stats.zscore(trait_data_array)
+
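+    # with fewer than 10 samples per trait, skip the z-score transform and use the raw values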
+    if len(trait_data_array[0]) < 10:
+        trait_zscores = trait_data_array
+
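+    # eigen-decompose the correlation matrix and order the eigenvectors by descending eigenvalue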
+    (eigen_values, corr_eigen_vectors) = np.linalg.eig(np.array(corr_array))
+    idx = eigen_values.argsort()[::-1]
+
+    return np.dot(corr_eigen_vectors[:, idx], trait_zscores)
+
+
+def process_factor_loadings_tdata(factor_loadings, traits_num: int):
+    """
+
+    transform loadings for tables visualization
+
+    Parameters:
+           factor_loading(numpy.ndarray)
+           traits_num(int):number of traits
+
+    Returns:
+           tabular_loadings(list[list[float]])
+    """
+
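+    # show at most the first three components in the table (two when there are only two traits)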
+    target_columns = 3 if traits_num > 2 else 2
+
+    trait_loadings = list(factor_loadings.T)
+
+    return [list(trait_loading[:target_columns])
+            for trait_loading in trait_loadings]
+
+
+def generate_pca_temp_traits(
+    species: str,
+    group: str,
+    traits_data: list[fArray],
+    corr_array: list[fArray],
+    dataset_samples: list[str],
+    shared_samples: list[str],
+    create_time: str
+) -> dict[str, list[Any]]:
+    """
+
+
+    generate pca temp datasets
+
+    """
+
+    # pylint: disable=too-many-arguments
+
+    pca_trait_dict = {}
+
+    pca_vals = generate_pca_traits_vals(traits_data, corr_array)
+
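+    # each pca component becomes a temporary trait named PCA<n>_<species>_<group>_<create_time>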
+    for (idx, pca_trait) in enumerate(list(pca_vals)):
+
+        trait_id = f"PCA{str(idx+1)}_{species}_{group}_{create_time}"
+        sample_vals = []
+
+        pointer = 0
+
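+        # align pca values with the dataset sample order; samples missing from shared_samples get "x"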
+        for sample in dataset_samples:
+            if sample in shared_samples:
+
+                sample_vals.append(str(pca_trait[pointer]))
+                pointer += 1
+
+            else:
+                sample_vals.append("x")
+
+        pca_trait_dict[trait_id] = sample_vals
+
+    return pca_trait_dict
+
+
+def cache_pca_dataset(redis_conn: Any, exp_days: int,
+                      pca_trait_dict: dict[str, list[Any]]):
+    """
+
+    caches pca dataset to redis
+
+    Parameters:
+
+            redis_conn(object)
+            exp_days(int): fo redis cache
+            pca_trait_dict(Dict): contains traits and traits vals to cache
+
+    Returns:
+
+            boolean(True if correct conn object False incase of exception)
+
+
+    """
+
+    try:
+        for trait_id, sample_data in pca_trait_dict.items():
+            samples_str = " ".join([str(x) for x in sample_data])
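+            # note: redis-py's "ex" argument is an expiry in seconds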
+            redis_conn.set(trait_id, samples_str, ex=exp_days)
+        return True
+
+    except (redis.ConnectionError, AttributeError):
+        return False
diff --git a/mypy.ini b/mypy.ini
index 9efa803..7497c17 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -34,4 +34,7 @@ ignore_missing_imports = True
 ignore_missing_imports = True
 
 [mypy-pytest.*]
+ignore_missing_imports = True
+
+[mypy-sklearn.*]
 ignore_missing_imports = True
\ No newline at end of file
diff --git a/tests/unit/computations/test_pca.py b/tests/unit/computations/test_pca.py
new file mode 100644
index 0000000..0189948
--- /dev/null
+++ b/tests/unit/computations/test_pca.py
@@ -0,0 +1,101 @@
+"""module contains unittests for pca"""
+import unittest
+from unittest.mock import patch
+from unittest.mock import Mock
+
+import numpy as np
+import pytest
+
+from gn3.computations.pca import cache_pca_dataset
+from gn3.computations.pca import generate_pca_temp_traits
+from gn3.computations.pca import generate_scree_plot_data
+from gn3.computations.pca import process_factor_loadings_tdata
+
+
+class TestPCA(unittest.TestCase):
+    """pca testcase class"""
+
+    @pytest.mark.unit_test
+    def test_process_factor_loadings(self):
+        """test for processing factor loadings"""
+
+        test_array = np.array([
+            [-0.23511749, -0.61483617, -0.26872797,  0.70319381],
+            [-0.71057342,  0.4623377, -0.52921008, -0.0355803],
+            [-0.60977093, -0.02877103, 0.78874096,  0.07238328],
+            [0.26073856,  0.63827311,  0.16003023,  0.70640864]
+        ])
+
+        expected_results = [[-0.23511749, -0.71057342, -0.60977093],
+                            [-0.61483617, 0.4623377, -0.02877103],
+                            [-0.26872797, -0.52921008, 0.78874096],
+                            [0.70319381, -0.0355803, 0.07238328]]
+
+        self.assertEqual(process_factor_loadings_tdata(
+            test_array, 3), expected_results)
+
+    @pytest.mark.unit_test
+    @patch("gn3.computations.pca.generate_pca_traits_vals")
+    def test_generate_pca_datasets(self, mock_pca_data):
+        """test for generating temp pca dataset"""
+
+        mock_pca_data.return_value = np.array([[21, 10, 17, 15, 13],
+                                               [21, 11, 18, 9, 1],
+                                               [22, 16, 0, 0.22667229, -1],
+                                               [31, 12, 10, 17, 11]])
+
+        shared_samples = ["BXD1", "BXD2", "BXD", "BXD4", "Unknown"]
+
+        dataset_samples = ["BXD1", "BXD5", "BXD4", "BXD"]
+        expected_results = {
+            "PCA1_mouse_G1_now": ["21.0",   "x",   "10.0",   "17.0"],
+            "PCA2_mouse_G1_now": ["21.0",   "x",   "11.0",   "18.0"],
+            "PCA3_mouse_G1_now": ["22.0",   "x",   "16.0",   "0.0"],
+            "PCA4_mouse_G1_now": ["31.0",   "x",   "12.0",   "10.0"]
+        }
+
+        results = generate_pca_temp_traits(species="mouse", group="G1",
+                                           traits_data=[],
+                                           dataset_samples=dataset_samples,
+                                           corr_array=[],
+                                           shared_samples=shared_samples,
+                                           create_time="now")
+
+        self.assertEqual(results, expected_results)
+
+    @pytest.mark.unit_test
+    def test_generate_scree_plot(self):
+        """test scree plot data is generated"""
+
+        variance = [0.9271, 0.06232, 0.031]
+
+        self.assertEqual(generate_scree_plot_data(variance),
+                         (['PC1', 'PC2', 'PC3'], [92.7, 6.2, 3.1]))
+
+    @pytest.mark.unit_test
+    def test_cache_pca_datasets(self):
+        """test for caching pca datasets"""
+
+        pca_traits = {
+            "PCA_1": ["11.0",   "x",   "9.0",   "7.0"],
+            "PCA_2": ["x", "x", "1.2", "3.1"]
+        }
+
+        self.assertEqual(cache_pca_dataset(redis_conn={}, exp_days=30,
+                                           pca_trait_dict=pca_traits), False)
+
+        mock_redis = Mock()
+        mock_redis.set.return_value = True
+
+        test_data = [({}, 30, pca_traits, False),
+                     (mock_redis, 30, pca_traits, True)]
+
+        for (test_redis, exp_day, test_traits, expected) in test_data:
+
+            with self.subTest(redis_conn=test_redis,
+                              exp_days=exp_day, pca_trait_dict=test_traits):
+
+                self.assertEqual(cache_pca_dataset(
+                    test_redis, exp_day, test_traits), expected)