Diffstat (limited to 'gn3/computations')
-rw-r--r--  gn3/computations/biweight.py               27
-rw-r--r--  gn3/computations/correlations.py           41
-rw-r--r--  gn3/computations/partial_correlations.py  289
-rw-r--r--  gn3/computations/wgcna.py                   49
4 files changed, 354 insertions(+), 52 deletions(-)
diff --git a/gn3/computations/biweight.py b/gn3/computations/biweight.py
deleted file mode 100644
index 7accd0c..0000000
--- a/gn3/computations/biweight.py
+++ /dev/null
@@ -1,27 +0,0 @@
-"""module contains script to call biweight midcorrelation in R"""
-import subprocess
-
-from typing import List
-from typing import Tuple
-
-from gn3.settings import BIWEIGHT_RSCRIPT
-
-
-def calculate_biweight_corr(trait_vals: List,
-                            target_vals: List,
-                            path_to_script: str = BIWEIGHT_RSCRIPT,
-                            command: str = "Rscript"
-                            ) -> Tuple[float, float]:
-    """biweight function"""
-
-    args_1 = ' '.join(str(trait_val) for trait_val in trait_vals)
-    args_2 = ' '.join(str(target_val) for target_val in target_vals)
-    cmd = [command, path_to_script] + [args_1] + [args_2]
-
-    results = subprocess.check_output(cmd, universal_newlines=True)
-    try:
-        (corr_coeff, p_val) = tuple(
-            [float(y.strip()) for y in results.split()])
-        return (corr_coeff, p_val)
-    except Exception as error:
-        raise error
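
The R-subprocess implementation deleted above is replaced by pingouin's
biweight midcorrelation in correlations.py below. A minimal sketch of the
new call path (the sample numbers are made up):

    import pingouin as pg

    # pg.corr returns a one-row DataFrame; its "r" and "p-val" columns hold
    # the biweight midcorrelation coefficient and its p-value.
    corr_df = pg.corr([1.2, 3.4, 5.6, 7.8], [2.1, 4.3, 6.5, 8.7],
                      method="bicor")
    corr_coeff = corr_df["r"].values[0]
    p_val = corr_df["p-val"].values[0]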
diff --git a/gn3/computations/correlations.py b/gn3/computations/correlations.py
index bb13ff1..c5c56db 100644
--- a/gn3/computations/correlations.py
+++ b/gn3/computations/correlations.py
@@ -1,6 +1,7 @@
 """module contains code for correlations"""
 import math
 import multiprocessing
+from contextlib import closing
 
 from typing import List
 from typing import Tuple
@@ -8,7 +9,7 @@ from typing import Optional
 from typing import Callable
 
 import scipy.stats
-from gn3.computations.biweight import calculate_biweight_corr
+import pingouin as pg
 
 
 def map_shared_keys_to_values(target_sample_keys: List,
@@ -49,13 +50,9 @@ def normalize_values(a_values: List,
     ([2.3, 4.1, 5], [3.4, 6.2, 4.1], 3)
 
     """
-    a_new = []
-    b_new = []
     for a_val, b_val in zip(a_values, b_values):
         if a_val is not None and b_val is not None:
-            a_new.append(a_val)
-            b_new.append(b_val)
-    return a_new, b_new, len(a_new)
+            yield a_val, b_val
 
 
 def compute_corr_coeff_p_value(primary_values: List, target_values: List,
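
normalize_values is now a generator yielding only the sample pairs where
both values are present, so callers rebuild the two parallel lists with
zip(*...). A sketch of the consuming pattern (note that zip(*[]) over an
empty result makes the unpacking fail, so real callers may need a guard):

    pairs = list(normalize_values([2.3, None, 4.1], [3.4, 6.2, None]))
    if pairs:  # guard against zero overlap
        a_new, b_new = zip(*pairs)  # -> (2.3,), (3.4,)
        num_overlap = len(a_new)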
@@ -81,8 +78,10 @@ def compute_sample_r_correlation(trait_name, corr_method, trait_vals,
     correlation coeff and p value
 
     """
-    (sanitized_traits_vals, sanitized_target_vals,
-     num_overlap) = normalize_values(trait_vals, target_samples_vals)
+
+    sanitized_traits_vals, sanitized_target_vals = list(
+        zip(*list(normalize_values(trait_vals, target_samples_vals))))
+    num_overlap = len(sanitized_traits_vals)
 
     if num_overlap > 5:
 
@@ -102,11 +101,10 @@ package :not packaged in guix
 
     """
 
-    try:
-        results = calculate_biweight_corr(x_val, y_val)
-        return results
-    except Exception as error:
-        raise error
+    results = pg.corr(x_val, y_val, method="bicor")
+    corr_coeff = results["r"].values[0]
+    p_val = results["p-val"].values[0]
+    return (corr_coeff, p_val)
 
 
 def filter_shared_sample_keys(this_samplelist,
@@ -115,13 +113,9 @@ def filter_shared_sample_keys(this_samplelist,
     filter the values using the shared keys
 
     """
-    this_vals = []
-    target_vals = []
     for key, value in target_samplelist.items():
         if key in this_samplelist:
-            target_vals.append(value)
-            this_vals.append(this_samplelist[key])
-    return (this_vals, target_vals)
+            yield this_samplelist[key], value
 
 
 def fast_compute_all_sample_correlation(this_trait,
@@ -140,9 +134,10 @@ def fast_compute_all_sample_correlation(this_trait,
     for target_trait in target_dataset:
         trait_name = target_trait.get("trait_id")
         target_trait_data = target_trait["trait_sample_data"]
-        processed_values.append((trait_name, corr_method, *filter_shared_sample_keys(
-            this_trait_samples, target_trait_data)))
-    with multiprocessing.Pool(4) as pool:
+        processed_values.append((trait_name, corr_method,
+                                 list(zip(*list(filter_shared_sample_keys(
+                                     this_trait_samples, target_trait_data))))))
+    with closing(multiprocessing.Pool()) as pool:
         results = pool.starmap(compute_sample_r_correlation, processed_values)
 
         for sample_correlation in results:
@@ -173,8 +168,8 @@ def compute_all_sample_correlation(this_trait,
     for target_trait in target_dataset:
         trait_name = target_trait.get("trait_id")
         target_trait_data = target_trait["trait_sample_data"]
-        this_vals, target_vals = filter_shared_sample_keys(
-            this_trait_samples, target_trait_data)
+        this_vals, target_vals = list(zip(*list(filter_shared_sample_keys(
+            this_trait_samples, target_trait_data))))
 
         sample_correlation = compute_sample_r_correlation(
             trait_name=trait_name,
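
Note on the pool handling above: wrapping the pool in contextlib.closing
means leaving the with-block calls pool.close() (which lets queued tasks
finish) rather than the pool.terminate() that the bare Pool context manager
would issue. A self-contained sketch of the same pattern:

    import multiprocessing
    from contextlib import closing

    def square(num):
        return num * num

    if __name__ == "__main__":
        with closing(multiprocessing.Pool()) as pool:
            print(pool.map(square, [2, 3]))  # -> [4, 9]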
diff --git a/gn3/computations/partial_correlations.py b/gn3/computations/partial_correlations.py
new file mode 100644
index 0000000..07dc16d
--- /dev/null
+++ b/gn3/computations/partial_correlations.py
@@ -0,0 +1,289 @@
+"""
+This module deals with partial correlations.
+
+It is an attempt to migrate over the partial correlations feature from
+GeneNetwork1.
+"""
+
+from functools import reduce
+from typing import Any, Tuple, Sequence
+from scipy.stats import pearsonr, spearmanr
+
+from gn3.settings import TEXTDIR
+from gn3.data_helpers import parse_csv_line
+
+def control_samples(controls: Sequence[dict], sampleslist: Sequence[str]):
+    """
+    Fetches data for the control traits.
+
+    This migrates `web/webqtl/correlation/correlationFunction.controlStrain` in
+    GN1, with a few modifications to the arguments passed in.
+
+    PARAMETERS:
+    controls: A map of sample names to trait data. Equivalent to the `cvals`
+        value in the corresponding source function in GN1.
+    sampleslist: A list of samples. Equivalent to `strainlst` in the
+        corresponding source function in GN1
+    """
+    def __process_control__(trait_data):
+        def __process_sample__(acc, sample):
+            if sample in trait_data["data"].keys():
+                sample_item = trait_data["data"][sample]
+                val = sample_item["value"]
+                if val is not None:
+                    return (
+                        acc[0] + (sample,),
+                        acc[1] + (val,),
+                        acc[2] + (sample_item["variance"],))
+            return acc
+        return reduce(
+            __process_sample__, sampleslist, (tuple(), tuple(), tuple()))
+
+    return reduce(
+        lambda acc, item: (
+            acc[0] + (item[0],),
+            acc[1] + (item[1],),
+            acc[2] + (item[2],),
+            acc[3] + (len(item[0]),),
+        ),
+        [__process_control__(trait_data) for trait_data in controls],
+        (tuple(), tuple(), tuple(), tuple()))
+
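+## A hypothetical call, to illustrate the accumulated shape: one control
+## trait with data for two of the three requested samples gives
+##   control_samples(
+##       ({"data": {"BXD1": {"value": 7.1, "variance": None},
+##                  "BXD2": {"value": 8.3, "variance": None}}},),
+##       ("BXD1", "BXD2", "BXD5"))
+##   == ((("BXD1", "BXD2"),), ((7.1, 8.3),), ((None, None),), (2,))
+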
+def dictify_by_samples(samples_vals_vars: Sequence[Sequence]) -> Sequence[dict]:
+    """
+    Build a sequence of dictionaries from a sequence of separate sequences of
+    samples, values and variances.
+
+    This is a partial migration of
+    `web.webqtl.correlation.correlationFunction.fixStrains` function in GN1.
+    This implementation extracts code that will find common use, and that will
+    find use in more than one place.
+    """
+    return tuple(
+        {
+            sample: {"sample_name": sample, "value": val, "variance": var}
+            for sample, val, var in zip(*trait_line)
+        } for trait_line in zip(*(samples_vals_vars[0:3])))
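+
+## For example (hypothetical data), a single trait line of samples, values
+## and variances such as
+##   ((("BXD1", "BXD2"),), ((9.2, 8.7),), ((None, None),))
+## produces
+##   ({"BXD1": {"sample_name": "BXD1", "value": 9.2, "variance": None},
+##     "BXD2": {"sample_name": "BXD2", "value": 8.7, "variance": None}},)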
+
+def fix_samples(primary_trait: dict, control_traits: Sequence[dict]) -> Sequence[Sequence[Any]]:
+    """
+    Corrects sample_names, values and variance such that they all contain only
+    those samples that are common to the reference trait and all control traits.
+
+    This is a partial migration of the
+    `web.webqtl.correlation.correlationFunction.fixStrain` function in GN1.
+    """
+    primary_samples = tuple(
+        present[0] for present in
+        ((sample, all(sample in control.keys() for control in control_traits))
+         for sample in primary_trait.keys())
+        if present[1])
+    control_vals_vars: tuple = reduce(
+        lambda acc, x: (acc[0] + (x[0],), acc[1] + (x[1],)),
+        ((item["value"], item["variance"])
+         for sublist in [tuple(control.values()) for control in control_traits]
+         for item in sublist),
+        (tuple(), tuple()))
+    return (
+        primary_samples,
+        tuple(primary_trait[sample]["value"] for sample in primary_samples),
+        control_vals_vars[0],
+        tuple(primary_trait[sample]["variance"] for sample in primary_samples),
+        control_vals_vars[1])
+
+def find_identical_traits(
+        primary_name: str, primary_value: float, control_names: Tuple[str, ...],
+        control_values: Tuple[float, ...]) -> Tuple[str, ...]:
+    """
+    Find traits that have the same value when the values are considered to
+    3 decimal places.
+
+    This is a migration of the
+    `web.webqtl.correlation.correlationFunction.findIdenticalTraits` function in
+    GN1.
+    """
+    def __merge_identicals__(
+            acc: Tuple[str, ...],
+            ident: Tuple[str, Tuple[str, ...]]) -> Tuple[str, ...]:
+        return acc + ident[1]
+
+    def __dictify_controls__(acc, control_item):
+        ckey = "{:.3f}".format(control_item[0])
+        return {**acc, ckey: acc.get(ckey, tuple()) + (control_item[1],)}
+
+    return (reduce(## for identical control traits
+        __merge_identicals__,
+        (item for item in reduce(# type: ignore[var-annotated]
+            __dictify_controls__, zip(control_values, control_names),
+            {}).items() if len(item[1]) > 1),
+        tuple())
+            or
+            reduce(## If no identical control traits, try primary and controls
+                __merge_identicals__,
+                (item for item in reduce(# type: ignore[var-annotated]
+                    __dictify_controls__,
+                    zip((primary_value,) + control_values,
+                        (primary_name,) + control_names), {}).items()
+                 if len(item[1]) > 1),
+                tuple()))
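+
+## Example (hypothetical): controls `c1` and `c3` coincide at three decimal
+## places, so both are reported:
+##   find_identical_traits(
+##       "primary", 9.999, ("c1", "c2", "c3"), (1.2341, 5.678, 1.2339))
+##   == ("c1", "c3")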
+
+def tissue_correlation(
+        primary_trait_values: Tuple[float, ...],
+        target_trait_values: Tuple[float, ...],
+        method: str) -> Tuple[float, float]:
+    """
+    Compute the correlation between the primary trait values, and the values of
+    a single target value.
+
+    This migrates the `cal_tissue_corr` function embedded in the larger
+    `web.webqtl.correlation.correlationFunction.batchCalTissueCorr` function in
+    GeneNetwork1.
+    """
+    def spearman_corr(*args):
+        result = spearmanr(*args)
+        return (result.correlation, result.pvalue)
+
+    method_fns = {"pearson": pearsonr, "spearman": spearman_corr}
+
+    assert len(primary_trait_values) == len(target_trait_values), (
+        "The lengths of the `primary_trait_values` and `target_trait_values` "
+        "must be equal")
+    assert method in method_fns.keys(), (
+        "Method must be one of: {}".format(",".join(method_fns.keys())))
+
+    corr, pvalue = method_fns[method](primary_trait_values, target_trait_values)
+    return (round(corr, 10), round(pvalue, 10))
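+
+## For instance, perfectly anti-correlated vectors should give
+##   tissue_correlation((1.0, 2.0, 3.0), (3.0, 2.0, 1.0), "pearson")
+##   == (-1.0, 0.0)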
+
+def batch_computed_tissue_correlation(
+        primary_trait_values: Tuple[float, ...], target_traits_dict: dict,
+        method: str) -> Tuple[dict, dict]:
+    """
+    This is a migration of the
+    `web.webqtl.correlation.correlationFunction.batchCalTissueCorr` function in
+    GeneNetwork1
+    """
+    def __corr__(acc, target):
+        corr = tissue_correlation(primary_trait_values, target[1], method)
+        return ({**acc[0], target[0]: corr[0]}, {**acc[1], target[0]: corr[1]})
+    return reduce(__corr__, target_traits_dict.items(), ({}, {}))
+
+def correlations_of_all_tissue_traits(
+        primary_trait_symbol_value_dict: dict, symbol_value_dict: dict,
+        method: str) -> Tuple[dict, dict]:
+    """
+    Computes and returns the correlation of all tissue traits.
+
+    This is a migration of the
+    `web.webqtl.correlation.correlationFunction.calculateCorrOfAllTissueTrait`
+    function in GeneNetwork1.
+    """
+    primary_trait_values = tuple(primary_trait_symbol_value_dict.values())[0]
+    return batch_computed_tissue_correlation(
+        primary_trait_values, symbol_value_dict, method)
+
+def good_dataset_samples_indexes(
+        samples: Tuple[str, ...],
+        samples_from_file: Tuple[str, ...]) -> Tuple[int, ...]:
+    """
+    Return the indexes of the items in `samples_from_files` that are also found
+    in `samples`.
+
+    This is a partial migration of the
+    `web.webqtl.correlation.PartialCorrDBPage.getPartialCorrelationsFast`
+    function in GeneNetwork1.
+    """
+    return tuple(sorted(
+        samples_from_file.index(good) for good in
+        set(samples).intersection(set(samples_from_file))))
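+
+## e.g. (hypothetical samples):
+##   good_dataset_samples_indexes(("BXD1", "BXD3"),
+##                                ("BXD1", "BXD2", "BXD3")) == (0, 2)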
+
+def determine_partials(
+        primary_vals, control_vals, all_target_trait_names,
+        all_target_trait_values, method):
+    """
+    This **WILL** be a migration of
+    `web.webqtl.correlation.correlationFunction.determinePartialsByR` function
+    in GeneNetwork1.
+
+    The function in GeneNetwork1 contains code written in R that is then used to
+    compute the partial correlations.
+    """
+    ## This function is not implemented at this stage; for now it simply
+    ## returns its arguments as a tuple.
+    return (
+        primary_vals, control_vals, all_target_trait_names,
+        all_target_trait_values, method)
+
+def compute_partial_correlations_fast(# pylint: disable=[R0913, R0914]
+        samples, primary_vals, control_vals, database_filename,
+        fetched_correlations, method: str, correlation_type: str) -> Tuple[
+            float, Tuple[float, ...]]:
+    """
+    This is a partial migration of the
+    `web.webqtl.correlation.PartialCorrDBPage.getPartialCorrelationsFast`
+    function in GeneNetwork1.
+    """
+    assert method in ("spearman", "pearson")
+    with open(f"{TEXTDIR}/{database_filename}", "r") as dataset_file:
+        dataset = tuple(dataset_file.readlines())
+
+    good_dataset_samples = good_dataset_samples_indexes(
+        samples, parse_csv_line(dataset[0])[1:])
+
+    def __process_trait_names_and_values__(acc, line):
+        trait_line = parse_csv_line(line)
+        trait_name = trait_line[0]
+        trait_data = trait_line[1:]
+        if trait_name in fetched_correlations.keys():
+            return (
+                acc[0] + (trait_name,),
+                acc[1] + tuple(
+                    trait_data[i] if i in good_dataset_samples else None
+                    for i in range(len(trait_data))))
+        return acc
+
+    processed_trait_names_values: tuple = reduce(
+        __process_trait_names_and_values__, dataset[1:], (tuple(), tuple()))
+    all_target_trait_names: Tuple[str, ...] = processed_trait_names_values[0]
+    all_target_trait_values: Tuple[float, ...] = processed_trait_names_values[1]
+
+    all_correlations = determine_partials(
+        primary_vals, control_vals, all_target_trait_names,
+        all_target_trait_values, method)
+    ## Line 772 to 779 in GN1 are the cause of the weird complexity in the
+    ## return below. Once the surrounding code is successfully migrated and
+    ## reworked, this complexity might go away, by getting rid of the
+    ## `correlation_type` parameter
+    return len(all_correlations), tuple(
+        corr + (
+            (fetched_correlations[corr[0]],) if correlation_type == "literature"
+            else fetched_correlations[corr[0]][0:2])
+        for corr in all_correlations)
+
+def partial_correlation_matrix(
+        xdata: Tuple[float, ...], ydata: Tuple[float, ...],
+        zdata: Tuple[float, ...], method: str = "pearsons",
+        omit_nones: bool = True) -> float:
+    """
+    Computes the partial correlation coefficient using the
+    'variance-covariance matrix' method
+
+    This is a partial migration of the
+    `web.webqtl.correlation.correlationFunction.determinePartialsByR` function in
+    GeneNetwork1, specifically the `pcor.mat` function written in the R
+    programming language.
+    """
+    return 0
+
+def partial_correlation_recursive(
+        xdata: Tuple[float, ...], ydata: Tuple[float, ...],
+        zdata: Tuple[float, ...], method: str = "pearsons",
+        omit_nones: bool = True) -> float:
+    """
+    Computes the partial correlation coefficient using the 'recursive formula'
+    method
+
+    This is a partial migration of the
+    `web.webqtl.correlation.correlationFunction.determinePartialsByR` function in
+    GeneNetwork1, specifically the `pcor.rec` function written in the R
+    programming language.
+    """
+    return 0
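
Both stubs above are placeholders that currently return 0. For reference, a
minimal sketch of the first-order case of the 'recursive formula' method
(assuming a single control variable, and that neither x nor y is perfectly
correlated with it):

    from math import sqrt
    from scipy.stats import pearsonr

    def first_order_partial_corr(xdata, ydata, zdata):
        # r_xy.z = (r_xy - r_xz*r_yz) / sqrt((1 - r_xz**2) * (1 - r_yz**2))
        r_xy = pearsonr(xdata, ydata)[0]
        r_xz = pearsonr(xdata, zdata)[0]
        r_yz = pearsonr(ydata, zdata)[0]
        return (r_xy - r_xz * r_yz) / sqrt(
            (1 - r_xz ** 2) * (1 - r_yz ** 2))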
diff --git a/gn3/computations/wgcna.py b/gn3/computations/wgcna.py
index fd508fa..ab12fe7 100644
--- a/gn3/computations/wgcna.py
+++ b/gn3/computations/wgcna.py
@@ -3,8 +3,11 @@
 import os
 import json
 import uuid
-from gn3.settings import TMPDIR
+import subprocess
+import base64
+
 
+from gn3.settings import TMPDIR
 from gn3.commands import run_cmd
 
 
@@ -14,12 +17,46 @@ def dump_wgcna_data(request_data: dict):
 
     temp_file_path = os.path.join(TMPDIR, filename)
 
+    request_data["TMPDIR"] = TMPDIR
+
     with open(temp_file_path, "w") as output_file:
         json.dump(request_data, output_file)
 
     return temp_file_path
 
 
+def stream_cmd_output(socketio, request_data, cmd: str):
+    """function to stream in realtime"""
+    # xtodo  syncing and closing /edge cases
+
+    socketio.emit("output", {"data": f"calling you script {cmd}"},
+                  namespace="/", room=request_data["socket_id"])
+    results = subprocess.Popen(
+        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
+
+    if results.stdout is not None:
+
+        for line in iter(results.stdout.readline, b""):
+            socketio.emit("output",
+                          {"data": line.decode("utf-8").rstrip()},
+                          namespace="/", room=request_data["socket_id"])
+
+        socketio.emit(
+            "output", {"data":
+                       "parsing the output results"}, namespace="/",
+            room=request_data["socket_id"])
+
+
+def process_image(image_loc: str) -> bytes:
+    """encode the image"""
+
+    try:
+        with open(image_loc, "rb") as image_file:
+            return base64.b64encode(image_file.read())
+    except FileNotFoundError:
+        return b""
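+
+# Example of the encode/decode round-trip (hypothetical bytes):
+#   base64.b64encode(b"\x89PNG").decode("ascii") == "iVBORw=="
+#   base64.b64decode("iVBORw==") == b"\x89PNG"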
+
+
 def compose_wgcna_cmd(rscript_path: str, temp_file_path: str):
     """function to componse wgcna cmd"""
     # (todo):issue relative paths to abs paths
@@ -32,6 +69,8 @@ def call_wgcna_script(rscript_path: str, request_data: dict):
     generated_file = dump_wgcna_data(request_data)
     cmd = compose_wgcna_cmd(rscript_path, generated_file)
 
+    # stream_cmd_output(request_data, cmd)  # streaming of output is disabled for now
+
     try:
 
         run_cmd_results = run_cmd(cmd)
@@ -40,8 +79,14 @@ def call_wgcna_script(rscript_path: str, request_data: dict):
 
             if run_cmd_results["code"] != 0:
                 return run_cmd_results
+
+            output_file_data = json.load(outputfile)
+            output_file_data["output"]["image_data"] = process_image(
+                output_file_data["output"]["imageLoc"]).decode("ascii")
+            # JSON only carries unicode strings; clients re-encode this field to recover the image bytes
+
             return {
-                "data": json.load(outputfile),
+                "data": output_file_data,
                 **run_cmd_results
             }
     except FileNotFoundError: