"""Principal-component-analysis (PCA) helpers.

Computes PCA over trait data, prepares scree-plot coordinates, projects
traits onto correlation-matrix eigenvectors, shapes factor loadings for
tables, and caches generated PCA traits in redis.
"""
from typing import Any

import numpy as np
import redis
from scipy import stats
from sklearn import preprocessing
from sklearn.decomposition import PCA
from typing_extensions import TypeAlias

# A 1-D sequence of floats (e.g. one trait's sample values).
fArray: TypeAlias = list[float]


def compute_pca(array: list[fArray]) -> dict[str, Any]:
    """Run PCA on *array* after column-wise standardization.

    Parameters:
        array (list[list[float]]): equal-length rows of data to analyse.

    Returns:
        dict with keys:
            "pca": the fitted sklearn PCA object,
            "components": its components_ matrix,
            "scores": the scaled data projected onto the components.
    """
    corr_matrix = np.array(array)
    pca_obj = PCA()
    # Standardize columns (zero mean, unit variance) before fitting.
    scaled_data = preprocessing.scale(corr_matrix)
    pca_obj.fit(scaled_data)
    return {
        "pca": pca_obj,
        "components": pca_obj.components_,
        "scores": pca_obj.transform(scaled_data),
    }


def generate_scree_plot_data(variance_ratio: fArray) -> tuple[list, fArray]:
    """Build (x, y) coordinate lists for a scree plot.

    Parameters:
        variance_ratio (list[float]): explained-variance ratio per component.

    Returns:
        tuple of (["PC1", "PC2", ...], percentages rounded to one decimal).
    """
    perc_var = [round(ratio * 100, 1) for ratio in variance_ratio]
    x_coordinates = [f"PC{val}" for val in range(1, len(perc_var) + 1)]
    return (x_coordinates, perc_var)


def generate_pca_traits_vals(trait_data_array: list[fArray],
                             corr_array: list[fArray]) -> list[list[Any]]:
    """Project trait data onto eigenvectors of a correlation matrix.

    Parameters:
        trait_data_array (list[list[float]]): per-trait sample values.
        corr_array (list[list[float]]): square correlation matrix.

    Returns:
        numpy array: eigenvectors (columns ordered by descending
        eigenvalue) dotted with the trait z-scores.
    """
    # Original computed the z-scores unconditionally and then discarded
    # them for short traits; compute them only when actually used.
    if len(trait_data_array[0]) < 10:
        # Too few samples: use the raw values instead of z-scores.
        trait_zscores = trait_data_array
    else:
        trait_zscores = stats.zscore(trait_data_array)
    (eigen_values, corr_eigen_vectors) = np.linalg.eig(np.array(corr_array))
    # Sort eigenvector columns by descending eigenvalue so PC1 is first.
    idx = eigen_values.argsort()[::-1]
    return np.dot(corr_eigen_vectors[:, idx], trait_zscores)


def process_factor_loadings_tdata(factor_loadings,
                                  traits_num: int) -> list[list[float]]:
    """Transform factor loadings for table visualization.

    Parameters:
        factor_loadings (numpy.ndarray): loadings matrix; transposed so
            each row of the result corresponds to one trait.
        traits_num (int): number of traits.

    Returns:
        list[list[float]]: one row per trait, truncated to the first
        two components (three when there are more than two traits).
    """
    target_columns = 3 if traits_num > 2 else 2
    trait_loadings = list(factor_loadings.T)
    return [list(trait_loading[:target_columns])
            for trait_loading in trait_loadings]


def generate_pca_temp_traits(
        species: str,
        group: str,
        traits_data: list[fArray],
        corr_array: list[fArray],
        dataset_samples: list[str],
        shared_samples: list[str],
        create_time: str
) -> dict[str, list[Any]]:
    """Generate temporary PCA trait datasets.

    Builds one entry per principal component, keyed by a generated
    trait id. Samples present in *shared_samples* take the next PCA
    value in order; all other dataset samples get the placeholder "x".
    """
    # pylint: disable=too-many-arguments
    pca_trait_dict = {}
    pca_vals = generate_pca_traits_vals(traits_data, corr_array)
    for (idx, pca_trait) in enumerate(list(pca_vals)):
        trait_id = f"PCA{idx + 1}_{species}_{group}_{create_time}"
        sample_vals = []
        pointer = 0  # walks pca_trait in lockstep with shared samples only
        for sample in dataset_samples:
            if sample in shared_samples:
                sample_vals.append(str(pca_trait[pointer]))
                pointer += 1
            else:
                sample_vals.append("x")
        pca_trait_dict[trait_id] = sample_vals
    return pca_trait_dict


def cache_pca_dataset(redis_conn: Any, exp_days: int,
                      pca_trait_dict: dict[str, list[Any]]) -> bool:
    """Cache PCA trait datasets to redis.

    Parameters:
        redis_conn (object): redis connection.
        exp_days (int): expiry passed to redis SET's ``ex`` argument.
            NOTE(review): redis interprets ``ex`` as *seconds*, but the
            parameter name suggests days — confirm callers pass seconds.
        pca_trait_dict (dict): trait ids mapped to sample values.

    Returns:
        bool: True if all values were cached, False on connection or
        attribute errors.
    """
    try:
        for (trait_id, sample_data) in pca_trait_dict.items():
            samples_str = " ".join(str(x) for x in sample_data)
            redis_conn.set(trait_id, samples_str, ex=exp_days)
        return True
    except (redis.ConnectionError, AttributeError):
        return False