aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gn3/computations/pca.py189
-rw-r--r--mypy.ini3
-rw-r--r--tests/unit/computations/test_pca.py101
3 files changed, 293 insertions, 0 deletions
diff --git a/gn3/computations/pca.py b/gn3/computations/pca.py
new file mode 100644
index 0000000..35c9f03
--- /dev/null
+++ b/gn3/computations/pca.py
@@ -0,0 +1,189 @@
+"""module contains pca implementation using python"""
+
+
+from typing import Any
+from scipy import stats
+
+from sklearn.decomposition import PCA
+from sklearn import preprocessing
+
+import numpy as np
+import redis
+
+
+from typing_extensions import TypeAlias
+
+fArray: TypeAlias = list[float]
+
+
+def compute_pca(array: list[fArray]) -> dict[str, Any]:
+ """
+ computes the principal component analysis
+
+ Parameters:
+
+ array(list[list]):a list of lists contains data to perform pca
+
+
+ Returns:
+ pca_dict(dict):dict contains the pca_object,pca components,pca scores
+
+
+ """
+
+ corr_matrix = np.array(array)
+
+ pca_obj = PCA()
+ scaled_data = preprocessing.scale(corr_matrix)
+
+ pca_obj.fit(scaled_data)
+
+ return {
+ "pca": pca_obj,
+ "components": pca_obj.components_,
+ "scores": pca_obj.transform(scaled_data)
+ }
+
+
+def generate_scree_plot_data(variance_ratio: fArray) -> tuple[list, fArray]:
+ """
+ generates the scree data for plotting
+
+ Parameters:
+
+ variance_ratio(list[floats]):ratios for contribution of each pca
+
+ Returns:
+
+ coordinates(list[(x_coor,y_coord)])
+
+
+ """
+
+ perc_var = [round(ratio*100, 1) for ratio in variance_ratio]
+
+ x_coordinates = [f"PC{val}" for val in range(1, len(perc_var)+1)]
+
+ return (x_coordinates, perc_var)
+
+
+def generate_pca_traits_vals(trait_data_array: list[fArray],
+ corr_array: list[fArray]) -> list[list[Any]]:
+ """
+ generates datasets from zscores of the traits and eigen_vectors\
+ of correlation matrix
+
+ Parameters:
+
+ trait_data_array(list[floats]):an list of the traits
+ corr_array(list[list]): list of arrays for computing eigen_vectors
+
+ Returns:
+
+ pca_vals[list[list]]:
+
+
+ """
+
+ trait_zscores = stats.zscore(trait_data_array)
+
+ if len(trait_data_array[0]) < 10:
+ trait_zscores = trait_data_array
+
+ (eigen_values, corr_eigen_vectors) = np.linalg.eig(np.array(corr_array))
+ idx = eigen_values.argsort()[::-1]
+
+ return np.dot(corr_eigen_vectors[:, idx], trait_zscores)
+
+
+def process_factor_loadings_tdata(factor_loadings, traits_num: int):
+ """
+
+ transform loadings for tables visualization
+
+ Parameters:
+ factor_loading(numpy.ndarray)
+ traits_num(int):number of traits
+
+ Returns:
+ tabular_loadings(list[list[float]])
+ """
+
+ target_columns = 3 if traits_num > 2 else 2
+
+ trait_loadings = list(factor_loadings.T)
+
+ return [list(trait_loading[:target_columns])
+ for trait_loading in trait_loadings]
+
+
+def generate_pca_temp_traits(
+ species: str,
+ group: str,
+ traits_data: list[fArray],
+ corr_array: list[fArray],
+ dataset_samples: list[str],
+ shared_samples: list[str],
+ create_time: str
+) -> dict[str, list[Any]]:
+ """
+
+
+ generate pca temp datasets
+
+ """
+
+ # pylint: disable=too-many-arguments
+
+ pca_trait_dict = {}
+
+ pca_vals = generate_pca_traits_vals(traits_data, corr_array)
+
+ for (idx, pca_trait) in enumerate(list(pca_vals)):
+
+ trait_id = f"PCA{str(idx+1)}_{species}_{group}_{create_time}"
+ sample_vals = []
+
+ pointer = 0
+
+ for sample in dataset_samples:
+ if sample in shared_samples:
+
+ sample_vals.append(str(pca_trait[pointer]))
+ pointer += 1
+
+ else:
+ sample_vals.append("x")
+
+ pca_trait_dict[trait_id] = sample_vals
+
+ return pca_trait_dict
+
+
+def cache_pca_dataset(redis_conn: Any, exp_days: int,
+ pca_trait_dict: dict[str, list[Any]]):
+ """
+
+ caches pca dataset to redis
+
+ Parameters:
+
+ redis_conn(object)
+ exp_days(int): fo redis cache
+ pca_trait_dict(Dict): contains traits and traits vals to cache
+
+ Returns:
+
+ boolean(True if correct conn object False incase of exception)
+
+
+ """
+
+ try:
+ for trait_id, sample_data in pca_trait_dict.items():
+ samples_str = " ".join([str(x) for x in sample_data])
+ redis_conn.set(trait_id, samples_str, ex=exp_days)
+ return True
+
+ except (redis.ConnectionError, AttributeError):
+ return False
diff --git a/mypy.ini b/mypy.ini
index 9efa803..7497c17 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -34,4 +34,7 @@ ignore_missing_imports = True
ignore_missing_imports = True
[mypy-pytest.*]
+ignore_missing_imports = True
+
+[mypy-sklearn.*]
ignore_missing_imports = True \ No newline at end of file
diff --git a/tests/unit/computations/test_pca.py b/tests/unit/computations/test_pca.py
new file mode 100644
index 0000000..0189948
--- /dev/null
+++ b/tests/unit/computations/test_pca.py
@@ -0,0 +1,101 @@
+"""module contains unittests for pca"""
+import unittest
+from unittest.mock import patch
+from unittest.mock import Mock
+
+import numpy as np
+import pytest
+
+from gn3.computations.pca import cache_pca_dataset
+from gn3.computations.pca import generate_pca_temp_traits
+from gn3.computations.pca import generate_scree_plot_data
+from gn3.computations.pca import process_factor_loadings_tdata
+
+
+class TestPCA(unittest.TestCase):
+ """pca testcase class"""
+
+ @pytest.mark.unit_test
+ def test_process_factor_loadings(self):
+ """test for processing factor loadings"""
+
+ test_array = np.array([
+ [-0.23511749, -0.61483617, -0.26872797, 0.70319381],
+ [-0.71057342, 0.4623377, -0.52921008, -0.0355803],
+ [-0.60977093, -0.02877103, 0.78874096, 0.07238328],
+ [0.26073856, 0.63827311, 0.16003023, 0.70640864]
+ ])
+
+ expected_results = [[-0.23511749, -0.71057342, -0.60977093],
+ [-0.61483617, 0.4623377, -0.02877103],
+ [-0.26872797, -0.52921008, 0.78874096],
+ [0.70319381, -0.0355803, 0.07238328]]
+
+ self.assertEqual(process_factor_loadings_tdata(
+ test_array, 3), expected_results)
+
+ @pytest.mark.unit_test
+ @patch("gn3.computations.pca.generate_pca_traits_vals")
+ def test_generate_pca_datasets(self, mock_pca_data):
+ """test for generating temp pca dataset"""
+
+ mock_pca_data.return_value = np.array([[21, 10, 17, 15, 13],
+ [21, 11, 18,
+ 9, 1],
+ [22, 16, 0,
+ 0.22667229, -1],
+ [31, 12, 10, 17, 11]])
+
+ shared_samples = ["BXD1", "BXD2", "BXD", "BXD4", "Unkown"]
+
+ dataset_samples = ["BXD1", "BXD5", "BXD4", "BXD"]
+ expected_results = {
+ "PCA1_mouse_G1_now": ["21.0", "x", "10.0", "17.0"],
+ "PCA2_mouse_G1_now": ["21.0", "x", "11.0", "18.0"],
+ "PCA3_mouse_G1_now": ["22.0", "x", "16.0", "0.0"],
+ "PCA4_mouse_G1_now": ["31.0", "x", "12.0", "10.0"]
+ }
+
+ results = generate_pca_temp_traits(species="mouse", group="G1",
+ traits_data=[],
+ dataset_samples=dataset_samples,
+ corr_array=[],
+ shared_samples=shared_samples,
+ create_time="now")
+
+ self.assertEqual(results, expected_results)
+
+ @pytest.mark.unit_test
+ def test_generate_scree_plot(self):
+ """test scree plot data is generated"""
+
+ variance = [0.9271, 0.06232, 0.031]
+
+ self.assertEqual(generate_scree_plot_data(variance),
+ (['PC1', 'PC2', 'PC3'], [92.7, 6.2, 3.1]))
+
+ @pytest.mark.unit_test
+ def test_cache_pca_datasets(self):
+ """test for caching pca datasets"""
+
+ pca_traits = {
+ "PCA_1": ["11.0", "x", "9.0", "7.0"],
+ "PCA_2": ["x", "x", "1.2", "3.1"]
+ }
+
+ self.assertEqual(cache_pca_dataset(redis_conn={}, exp_days=30,
+ pca_trait_dict=pca_traits), False)
+
+ mock_redis = Mock()
+ mock_redis.set.return_value = True
+
+ test_data = [({}, 30, pca_traits, False),
+ (mock_redis, 30, pca_traits, True)]
+
+ for (test_redis, exp_day, test_traits, expected) in test_data:
+
+ with self.subTest(redis_conn=test_redis,
+ exp_days=exp_day, pca_trait_dict=test_traits):
+
+ self.assertEqual(cache_pca_dataset(
+ test_redis, exp_day, test_traits), expected)