about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- gn3/computations/correlations.py 217
1 files changed, 124 insertions, 93 deletions
diff --git a/gn3/computations/correlations.py b/gn3/computations/correlations.py
index 21f5929..d1d6ddb 100644
--- a/gn3/computations/correlations.py
+++ b/gn3/computations/correlations.py
@@ -7,20 +7,21 @@ from typing import Callable
 import scipy.stats  # type: ignore
 
 
-def compute_sum(rhs: int, lhs: int)-> int:
-    """initial tests to compute  sum  of two numbers"""
+def compute_sum(rhs: int, lhs: int) -> int:
+    """Initial tests to compute sum of two numbers"""
     return rhs + lhs
 
 
-def normalize_values(a_values: List, b_values: List)->Tuple[List[float], List[float], int]:
-    """
-    Trim two lists of values to contain only the values they both share
+def normalize_values(a_values: List,
+                     b_values: List) -> Tuple[List[float], List[float], int]:
+    """Trim two lists of values to contain only the values they both share
 
-    Given two lists of sample values, trim each list so that it contains
-    only the samples that contain a value in both lists. Also returns
-    the number of such samples.
+    Given two lists of sample values, trim each list so that it contains only
+    the samples that contain a value in both lists. Also returns the number of
+    such samples.
 
-    >>> normalize_values([2.3, None, None, 3.2, 4.1, 5], [3.4, 7.2, 1.3, None, 6.2, 4.1])
+    >>> normalize_values([2.3, None, None, 3.2, 4.1, 5],
+                         [3.4, 7.2, 1.3, None, 6.2, 4.1])
     ([2.3, 4.1, 5], [3.4, 6.2, 4.1], 3)
 
     """
@@ -33,11 +34,11 @@ def normalize_values(a_values: List, b_values: List)->Tuple[List[float], List[fl
     return a_new, b_new, len(a_new)
 
 
-def compute_corr_coeff_p_value(primary_values: List, target_values: List, corr_method: str)->\
-        Tuple[float, float]:
-    """given array like inputs calculate the primary and target_value
-     methods ->pearson,spearman and biweight mid correlation
-     return value is rho and p_value
+def compute_corr_coeff_p_value(primary_values: List, target_values: List,
+                               corr_method: str) -> Tuple[float, float]:
+    """Given array-like primary and target values, compute their correlation.
+    Methods: pearson, spearman, bicor. Return value is rho and p_value.
+
     """
     corr_mapping = {
         "bicor": do_bicor,
@@ -52,13 +53,16 @@ def compute_corr_coeff_p_value(primary_values: List, target_values: List, corr_m
     return (corr_coeffient, p_val)
 
 
-def compute_sample_r_correlation(corr_method: str, trait_vals, target_samples_vals)->\
-        Optional[Tuple[float, float, int]]:
-    """Given a primary trait values and target trait values
-    calculate the correlation coeff and p value"""
+def compute_sample_r_correlation(
+        corr_method: str, trait_vals,
+        target_samples_vals) -> Optional[Tuple[float, float, int]]:
+    """Given a primary trait values and target trait values calculate the
+    correlation coeff and p value
 
-    sanitized_traits_vals, sanitized_target_vals,\
-        num_overlap = normalize_values(trait_vals, target_samples_vals)
+    """
+
+    (sanitized_traits_vals, sanitized_target_vals,
+     num_overlap) = normalize_values(trait_vals, target_samples_vals)
 
     if num_overlap > 5:
 
@@ -67,7 +71,8 @@ def compute_sample_r_correlation(corr_method: str, trait_vals, target_samples_va
                                        target_values=sanitized_target_vals,
                                        corr_method=corr_method)
 
-        # xtodo check if corr_coefficient is None should use numpy.isNan scipy.isNan is deprecated
+        # xtodo check if corr_coefficient is None
+        # should use numpy.isnan; scipy.isnan is deprecated
         if corr_coeffient is not None:
             return (corr_coeffient, p_value, num_overlap)
 
@@ -75,29 +80,33 @@ def compute_sample_r_correlation(corr_method: str, trait_vals, target_samples_va
 
 
 def do_bicor(x_val, y_val) -> Tuple[float, float]:
-    """not implemented method for doing biweight mid correlation
-    use  astropy stats package :not packaged in guix
-    """
+    """Not implemented: biweight mid correlation. Would use the astropy stats
+    package, which is not packaged in guix.
 
+    """
     return (x_val, y_val)
 
 
-def filter_shared_sample_keys(this_samplelist, target_samplelist)->Tuple[List, List]:
-    """given primary and target samplelist for two base and target\
-    trait select filter the values using the shared keys"""
+def filter_shared_sample_keys(this_samplelist,
+                              target_samplelist) -> Tuple[List, List]:
+    """Given the primary and target sample lists of a base and a target trait,
+    keep only the values whose keys appear in both.
+
+    """
     this_vals = []
     target_vals = []
-
     for key, value in target_samplelist.items():
         if key in this_samplelist:
             target_vals.append(value)
             this_vals.append(this_samplelist[key])
-
     return (this_vals, target_vals)
 
 
-def compute_all_sample_correlation(this_trait, target_dataset, corr_method="pearson")->List:
-    """given a trait data samplelist and target__datasets compute all sample correlation"""
+def compute_all_sample_correlation(this_trait,
+                                   target_dataset,
+                                   corr_method="pearson") -> List:
+    """Given a trait's sample data and the target dataset, compute all sample
+    correlations."""
 
     this_trait_samples = this_trait["trait_sample_data"]
 
@@ -110,7 +119,9 @@ def compute_all_sample_correlation(this_trait, target_dataset, corr_method="pear
             this_trait_samples, target_trait_data)
 
         sample_correlation = compute_sample_r_correlation(
-            corr_method=corr_method, trait_vals=this_vals, target_samples_vals=target_vals)
+            corr_method=corr_method,
+            trait_vals=this_vals,
+            target_samples_vals=target_vals)
 
         if sample_correlation is not None:
             (corr_coeffient, p_value, num_overlap) = sample_correlation
@@ -118,9 +129,11 @@ def compute_all_sample_correlation(this_trait, target_dataset, corr_method="pear
         else:
             continue
 
-        corr_result = {"corr_coeffient": corr_coeffient,
-                       "p_value": p_value,
-                       "num_overlap": num_overlap}
+        corr_result = {
+            "corr_coeffient": corr_coeffient,
+            "p_value": p_value,
+            "num_overlap": num_overlap
+        }
 
         corr_results.append({trait_id: corr_result})
 
@@ -128,9 +141,11 @@ def compute_all_sample_correlation(this_trait, target_dataset, corr_method="pear
 
 
 def tissue_lit_corr_for_probe_type(corr_type: str, top_corr_results):
-    """function that does either lit_corr_for_trait_list or tissue_corr\
-    _for_trait list depending on whether both dataset and target_dataset are\
-    both set to probet"""
+    """Function that does either lit_corr_for_trait_list or
+    tissue_corr_for_trait_list depending on whether dataset and target_dataset
+    are both set to probet
+
+    """
 
     corr_results = {"lit": 1}
 
@@ -148,22 +163,24 @@ def tissue_lit_corr_for_probe_type(corr_type: str, top_corr_results):
     return corr_results
 
 
-def tissue_correlation_for_trait_list(primary_tissue_vals: List,
-                                      target_tissues_values: List,
-                                      corr_method: str,
-                                      compute_corr_p_value: Callable =
-                                      compute_corr_coeff_p_value)->dict:
-    """given a primary tissue values for a trait and the target tissues values\
-    compute the correlation_cooeff and p value  the input required are arrays\
-    output - > List containing Dicts with corr_coefficient value,P_value and\
-    also the tissue numbers is len(primary) == len(target)"""
+def tissue_correlation_for_trait_list(
+        primary_tissue_vals: List,
+        target_tissues_values: List,
+        corr_method: str,
+        compute_corr_p_value: Callable = compute_corr_coeff_p_value) -> dict:
+    """Given a primary tissue values for a trait and the target tissues values
+    compute the correlation_cooeff and p value the input required are arrays
+    output -> List containing Dicts with corr_coefficient value,P_value and
+    also the tissue numbers is len(primary) == len(target)
+
+    """
 
     # ax :todo assertion that lenggth one one target tissue ==primary_tissue
 
-    (tissue_corr_coeffient, p_value) = compute_corr_p_value(
-        primary_values=primary_tissue_vals,
-        target_values=target_tissues_values,
-        corr_method=corr_method)
+    (tissue_corr_coeffient,
+     p_value) = compute_corr_p_value(primary_values=primary_tissue_vals,
+                                     target_values=target_tissues_values,
+                                     corr_method=corr_method)
 
     lit_corr_result = {
         "tissue_corr": tissue_corr_coeffient,
@@ -174,10 +191,11 @@ def tissue_correlation_for_trait_list(primary_tissue_vals: List,
     return lit_corr_result
 
 
-def fetch_lit_correlation_data(database,
-                               input_mouse_gene_id: Optional[str],
-                               gene_id: str,
-                               mouse_gene_id: Optional[str] = None)->Tuple[str, float]:
+def fetch_lit_correlation_data(
+        database,
+        input_mouse_gene_id: Optional[str],
+        gene_id: str,
+        mouse_gene_id: Optional[str] = None) -> Tuple[str, float]:
     """given input trait mouse gene id and mouse gene id fetch the lit\
     corr_data"""
     if mouse_gene_id is not None and ";" not in mouse_gene_id:
@@ -190,12 +208,15 @@ def fetch_lit_correlation_data(database,
 
         query_values = (str(mouse_gene_id), str(input_mouse_gene_id))
 
-        results = database.execute(
-            query_formatter(query, *query_values)).fetchone()
-
-        lit_corr_results = results if results is not None else database.execute(
-            query_formatter(query, *tuple(reversed(query_values)))).fetchone()
-
+        results = database.execute(query_formatter(query,
+                                                   *query_values)).fetchone()
+        lit_corr_results = None
+        if results is not None:
+            lit_corr_results = results
+        else:
+            lit_corr_results = database.execute(
+                query_formatter(query,
+                                *tuple(reversed(query_values)))).fetchone()
         lit_results = (gene_id, lit_corr_results.val)\
             if lit_corr_results else (gene_id, 0)
         return lit_results
@@ -203,35 +224,41 @@ def fetch_lit_correlation_data(database,
     return (gene_id, 0)
 
 
-def lit_correlation_for_trait_list(database,
-                                   target_trait_lists: List,
-                                   species: Optional[str] = None,
-                                   trait_gene_id: Optional[str] = None)->List:
+def lit_correlation_for_trait_list(
+        database,
+        target_trait_lists: List,
+        species: Optional[str] = None,
+        trait_gene_id: Optional[str] = None) -> List:
     """given species,base trait gene id fetch the lit corr results from the db\
     output is float for lit corr results """
     fetched_lit_corr_results = []
 
-    this_trait_mouse_gene_id = map_to_mouse_gene_id(
-        database=database, species=species, gene_id=trait_gene_id)
+    this_trait_mouse_gene_id = map_to_mouse_gene_id(database=database,
+                                                    species=species,
+                                                    gene_id=trait_gene_id)
 
     for trait in target_trait_lists:
         target_trait_gene_id = trait.get("gene_id")
         if target_trait_gene_id:
             target_mouse_gene_id = map_to_mouse_gene_id(
-                database=database, species=species, gene_id=target_trait_gene_id)
+                database=database,
+                species=species,
+                gene_id=target_trait_gene_id)
 
             fetched_corr_data = fetch_lit_correlation_data(
-                database=database, input_mouse_gene_id=this_trait_mouse_gene_id,
-                gene_id=target_trait_gene_id, mouse_gene_id=target_mouse_gene_id)
+                database=database,
+                input_mouse_gene_id=this_trait_mouse_gene_id,
+                gene_id=target_trait_gene_id,
+                mouse_gene_id=target_mouse_gene_id)
 
-            dict_results = dict(
-                zip(("gene_id", "lit_corr"), fetched_corr_data))
+            dict_results = dict(zip(("gene_id", "lit_corr"),
+                                    fetched_corr_data))
             fetched_lit_corr_results.append(dict_results)
 
     return fetched_lit_corr_results
 
 
-def query_formatter(query_string: str, * query_values):
+def query_formatter(query_string: str, *query_values):
     """formatter query string given the unformatted query string\
     and the respectibe values.Assumes number of placeholders is
     equal to the number of query values """
@@ -240,11 +267,12 @@ def query_formatter(query_string: str, * query_values):
     return results
 
 
-def map_to_mouse_gene_id(database, species: Optional[str], gene_id: Optional[str])->Optional[str]:
+def map_to_mouse_gene_id(database, species: Optional[str],
+                         gene_id: Optional[str]) -> Optional[str]:
     """given a species which is not mouse map the gene_id\
     to respective mouse gene id"""
-    # AK:xtodo move the code for checking nullity out of thing functions bug while\
-    # method for string
+    # AK:xtodo move the code for checking nullity out of thing functions bug
+    # while method for string
     if None in (species, gene_id):
         return None
     if species == "mouse":
@@ -256,34 +284,36 @@ def map_to_mouse_gene_id(database, species: Optional[str], gene_id: Optional[str
 
     query_values = (species, gene_id)
 
-    results = database.execute(
-        query_formatter(query, *query_values)).fetchone()
+    results = database.execute(query_formatter(query,
+                                               *query_values)).fetchone()
 
     mouse_gene_id = results.mouse if results is not None else None
 
     return mouse_gene_id
 
 
-def compute_all_lit_correlation(database_instance, trait_lists: List, species: str, gene_id):
-    """function that acts as an abstraction for lit_correlation_for_trait_list"""
+def compute_all_lit_correlation(database_instance, trait_lists: List,
+                                species: str, gene_id):
+    """Function that acts as an abstraction for
+    lit_correlation_for_trait_list"""
     # xtodo to be refactored
 
-    lit_results = lit_correlation_for_trait_list(database=database_instance,
-                                                 target_trait_lists=trait_lists,
-                                                 species=species,
-                                                 trait_gene_id=gene_id
-                                                 )
+    lit_results = lit_correlation_for_trait_list(
+        database=database_instance,
+        target_trait_lists=trait_lists,
+        species=species,
+        trait_gene_id=gene_id)
 
-    return {
-        "lit_results": lit_results
-    }
+    return {"lit_results": lit_results}
 
 
 def compute_all_tissue_correlation(primary_tissue_dict: dict,
                                    target_tissues_dict_list: List,
                                    corr_method: str):
-    """function acts as an abstraction for tissue_correlation_for_trait_list\
-    required input are target tissue object and primary tissue trait """
+    """Function acts as an abstraction for tissue_correlation_for_trait_list.
+    Required inputs are the target tissue object and primary tissue trait.
+
+    """
 
     tissues_results = {}
 
@@ -296,9 +326,10 @@ def compute_all_tissue_correlation(primary_tissue_dict: dict,
 
         target_tissue_vals = target_tissue_obj.get("tissue_values")
 
-        tissue_result = tissue_correlation_for_trait_list(primary_tissue_vals=primary_tissue_vals,
-                                                          target_tissues_values=target_tissue_vals,
-                                                          corr_method=corr_method)
+        tissue_result = tissue_correlation_for_trait_list(
+            primary_tissue_vals=primary_tissue_vals,
+            target_tissues_values=target_tissue_vals,
+            corr_method=corr_method)
 
         tissues_results[trait_id] = tissue_result