about summary refs log tree commit diff
path: root/gn3/computations
diff options
context:
space:
mode:
authorAlexander Kabui2022-03-15 14:08:39 +0300
committerGitHub2022-03-15 14:08:39 +0300
commit16367dab9248d3aa2660e0b5cafdce25e8f7067c (patch)
tree1d08f861dbd7302ff43489fc338c99fa3cf92e6f /gn3/computations
parent4b85316c6ca5355faf6ac85f755744f8d52e49d3 (diff)
downloadgenenetwork3-16367dab9248d3aa2660e0b5cafdce25e8f7067c.tar.gz
Feature/refactored pca (#79)
* compute zscore function

* test case for computing zscore

* function to compute pca

* generate scree plot data

* generate  new pca trait data from zscores and eigen_vec

* remove redundant functions

* generate factor loading table data

* generate pca temp dataset dict

* variable naming and error fixes

* unit test for processing factor loadings

* minor fixes for generating temp pca dataset

* pass datetime as argument to generate_pca temp dataset function

* add unittest  for caching pca datasets

* cache temp datasets

* ignore missing imports for sklearn

* mypy fixes

* pylint fixes

* refactor tests for pca

* remove ununsed imports

* fix for generating pca traits vals

* mypy and code refactoring

* pep8 formatting and add docstrings

* remove comments /pep8 formatting

* sort eigen vectors based on  eigen values

* minor fix for zscores

* fix for rounding variance ratios

* refactor tests

* rename module to pca

* rename datasets to traits

* fix failing tests

* fix caching function

* fixes return x and y coordinates for scree plot

* expand exception scope

* fix for deprecated numpy.matrix function

* fix for failing tests

* pep8 fixes

* remove comments

* fix merge conflict

* pylint fixes

* rename module name to test_pca
Diffstat (limited to 'gn3/computations')
-rw-r--r--gn3/computations/pca.py189
1 files changed, 189 insertions, 0 deletions
diff --git a/gn3/computations/pca.py b/gn3/computations/pca.py
new file mode 100644
index 0000000..35c9f03
--- /dev/null
+++ b/gn3/computations/pca.py
@@ -0,0 +1,189 @@
+"""module contains pca implementation using python"""
+
+
+from typing import Any
+from scipy import stats
+
+from sklearn.decomposition import PCA
+from sklearn import preprocessing
+
+import numpy as np
+import redis
+
+
+from typing_extensions import TypeAlias
+
+fArray: TypeAlias = list[float]
+
+
+def compute_pca(array: list[fArray]) -> dict[str, Any]:
+    """
+    computes the principal component analysis
+
+    Parameters:
+
+          array(list[list]):a list of lists contains data to perform  pca
+
+
+    Returns:
+           pca_dict(dict):dict contains the pca_object,pca components,pca scores
+
+
+    """
+
+    corr_matrix = np.array(array)
+
+    pca_obj = PCA()
+    scaled_data = preprocessing.scale(corr_matrix)
+
+    pca_obj.fit(scaled_data)
+
+    return {
+        "pca": pca_obj,
+        "components": pca_obj.components_,
+        "scores": pca_obj.transform(scaled_data)
+    }
+
+
+def generate_scree_plot_data(variance_ratio: fArray) -> tuple[list, fArray]:
+    """
+    generates the scree data for plotting
+
+    Parameters:
+
+            variance_ratio(list[floats]):ratios for contribution of each pca
+
+    Returns:
+
+            coordinates(list[(x_coor,y_coord)])
+
+
+    """
+
+    perc_var = [round(ratio*100, 1) for ratio in variance_ratio]
+
+    x_coordinates = [f"PC{val}" for val in range(1, len(perc_var)+1)]
+
+    return (x_coordinates, perc_var)
+
+
+def generate_pca_traits_vals(trait_data_array: list[fArray],
+                             corr_array: list[fArray]) -> list[list[Any]]:
+    """
+    generates datasets from zscores of the traits and eigen_vectors\
+    of correlation matrix
+
+    Parameters:
+
+            trait_data_array(list[floats]):an list of the traits
+            corr_array(list[list]): list of arrays for computing eigen_vectors
+
+    Returns:
+
+            pca_vals[list[list]]:
+
+
+    """
+
+    trait_zscores = stats.zscore(trait_data_array)
+
+    if len(trait_data_array[0]) < 10:
+        trait_zscores = trait_data_array
+
+    (eigen_values, corr_eigen_vectors) = np.linalg.eig(np.array(corr_array))
+    idx = eigen_values.argsort()[::-1]
+
+    return np.dot(corr_eigen_vectors[:, idx], trait_zscores)
+
+
+def process_factor_loadings_tdata(factor_loadings, traits_num: int):
+    """
+
+    transform loadings for tables visualization
+
+    Parameters:
+           factor_loading(numpy.ndarray)
+           traits_num(int):number of traits
+
+    Returns:
+           tabular_loadings(list[list[float]])
+    """
+
+    target_columns = 3 if traits_num > 2 else 2
+
+    trait_loadings = list(factor_loadings.T)
+
+    return [list(trait_loading[:target_columns])
+            for trait_loading in trait_loadings]
+
+
+def generate_pca_temp_traits(
+    species: str,
+    group: str,
+    traits_data: list[fArray],
+    corr_array: list[fArray],
+    dataset_samples: list[str],
+    shared_samples: list[str],
+    create_time: str
+) -> dict[str, list[Any]]:
+    """
+
+
+    generate pca temp datasets
+
+    """
+
+    # pylint: disable=too-many-arguments
+
+    pca_trait_dict = {}
+
+    pca_vals = generate_pca_traits_vals(traits_data, corr_array)
+
+    for (idx, pca_trait) in enumerate(list(pca_vals)):
+
+        trait_id = f"PCA{str(idx+1)}_{species}_{group}_{create_time}"
+        sample_vals = []
+
+        pointer = 0
+
+        for sample in dataset_samples:
+            if sample in shared_samples:
+
+                sample_vals.append(str(pca_trait[pointer]))
+                pointer += 1
+
+            else:
+                sample_vals.append("x")
+
+        pca_trait_dict[trait_id] = sample_vals
+
+    return pca_trait_dict
+
+
+def cache_pca_dataset(redis_conn: Any, exp_days: int,
+                      pca_trait_dict: dict[str, list[Any]]):
+    """
+
+    caches pca dataset to redis
+
+    Parameters:
+
+            redis_conn(object)
+            exp_days(int): fo redis cache
+            pca_trait_dict(Dict): contains traits and traits vals to cache
+
+    Returns:
+
+            boolean(True if correct conn object False incase of exception)
+
+
+    """
+
+    try:
+        for trait_id, sample_data in pca_trait_dict.items():
+            samples_str = " ".join([str(x) for x in sample_data])
+            redis_conn.set(trait_id, samples_str, ex=exp_days)
+        return True
+
+    except (redis.ConnectionError, AttributeError):
+        return False