From 7c20150278dbcb18f088de1afe6ea8411d29827c Mon Sep 17 00:00:00 2001 From: Alexander Kabui Date: Fri, 17 Dec 2021 02:17:04 +0300 Subject: to drop commit:test on penguin --- gn3/computations/correlations.py | 59 ++++++++++++++++++++++++++++++++-------- 1 file changed, 47 insertions(+), 12 deletions(-) diff --git a/gn3/computations/correlations.py b/gn3/computations/correlations.py index 24e9871..cc166fa 100644 --- a/gn3/computations/correlations.py +++ b/gn3/computations/correlations.py @@ -9,6 +9,7 @@ from typing import Optional from typing import Callable import scipy.stats +import numpy as np import pingouin as pg @@ -424,24 +425,58 @@ def fast_compute_tissue_correlation(primary_tissue_dict: dict, key=lambda trait_name: -abs(list(trait_name.values())[0]["tissue_corr"])) -def _mp_calculate(x_val, dataset, n_jobs, chunk_size): +def compute_correlation_2(corr_inputs): + + (this_trait_samples, target_trait) = corr_inputs + + trait_name = target_trait.get("trait_id") + target_trait_data = target_trait["trait_sample_data"] + + try: + (x_vals, y_vals) = list(zip(*list(filter_shared_sample_keys( + this_trait_samples, target_trait_data)))) + + x_vals = np.array(x_vals, dtype=float) + y_vals = np.array(y_vals, dtype=float) + + if len(x_vals) > 5: + # remove nan values + nans_values = np.logical_or(np.isnan(x_vals), np.isnan(y_vals)) + + (corr_coeff, p_val) = scipy.stats.pearsonr( + x_vals[~nans_values], y_vals[~nans_values]) + + # print(corr_coeff, p_val) + return{trait_name: { + "corr_coefficient": corr_coeff, + "p_value": p_val, + "num_overlap": len(x_vals) + }} + + except ValueError: + return + + +def mp_calculate(this_trait, dataset, n_jobs: int = -1, chunk_size: int = 500): """corr mp reimplementation credit:https://github.com/bukson/nancorrmp - """ - def _compute_correlation_2(x_val, y_val, trait_name): - """function to compute correlation""" - - return (trait_name, scipy.stats.pearsonr(x_val, y_val)) + """ + this_trait_samples = this_trait["trait_sample_data"] - arguments = ((x_val, y_val, trait_name) for (trait_name, y_val) in dataset) + arguments = [(this_trait_samples, target_trait) + for target_trait in dataset] processes = n_jobs if n_jobs > 0 else None - worker_function = _compute_correlation_2 + chunksize = len(arguments)//processes + + worker_function = compute_correlation_2 - with multiprocessing.pool(processes=processes) as pool: - results = list(pool.imap_unordered( - worker_function, arguments, chunksize=chunks)) + with multiprocessing.Pool(processes=processes) as pool: + corr_results = list(pool.imap_unordered( + worker_function, arguments, chunksize=chunksize)) - return results + return sorted( + corr_results, + key=lambda trait_name: -abs(list(trait_name.values())[0]["corr_coefficient"])) -- cgit v1.2.3