diff options
Diffstat (limited to 'gn3/computations')
-rw-r--r-- | gn3/computations/biweight.py | 27 | ||||
-rw-r--r-- | gn3/computations/correlations.py | 41 | ||||
-rw-r--r-- | gn3/computations/partial_correlations.py | 289 | ||||
-rw-r--r-- | gn3/computations/wgcna.py | 49 |
4 files changed, 354 insertions, 52 deletions
diff --git a/gn3/computations/biweight.py b/gn3/computations/biweight.py deleted file mode 100644 index 7accd0c..0000000 --- a/gn3/computations/biweight.py +++ /dev/null @@ -1,27 +0,0 @@ -"""module contains script to call biweight midcorrelation in R""" -import subprocess - -from typing import List -from typing import Tuple - -from gn3.settings import BIWEIGHT_RSCRIPT - - -def calculate_biweight_corr(trait_vals: List, - target_vals: List, - path_to_script: str = BIWEIGHT_RSCRIPT, - command: str = "Rscript" - ) -> Tuple[float, float]: - """biweight function""" - - args_1 = ' '.join(str(trait_val) for trait_val in trait_vals) - args_2 = ' '.join(str(target_val) for target_val in target_vals) - cmd = [command, path_to_script] + [args_1] + [args_2] - - results = subprocess.check_output(cmd, universal_newlines=True) - try: - (corr_coeff, p_val) = tuple( - [float(y.strip()) for y in results.split()]) - return (corr_coeff, p_val) - except Exception as error: - raise error diff --git a/gn3/computations/correlations.py b/gn3/computations/correlations.py index bb13ff1..c5c56db 100644 --- a/gn3/computations/correlations.py +++ b/gn3/computations/correlations.py @@ -1,6 +1,7 @@ """module contains code for correlations""" import math import multiprocessing +from contextlib import closing from typing import List from typing import Tuple @@ -8,7 +9,7 @@ from typing import Optional from typing import Callable import scipy.stats -from gn3.computations.biweight import calculate_biweight_corr +import pingouin as pg def map_shared_keys_to_values(target_sample_keys: List, @@ -49,13 +50,9 @@ def normalize_values(a_values: List, ([2.3, 4.1, 5], [3.4, 6.2, 4.1], 3) """ - a_new = [] - b_new = [] for a_val, b_val in zip(a_values, b_values): if (a_val and b_val is not None): - a_new.append(a_val) - b_new.append(b_val) - return a_new, b_new, len(a_new) + yield a_val, b_val def compute_corr_coeff_p_value(primary_values: List, target_values: List, @@ -81,8 +78,10 @@ def compute_sample_r_correlation(trait_name, corr_method, trait_vals, correlation coeff and p value """ - (sanitized_traits_vals, sanitized_target_vals, - num_overlap) = normalize_values(trait_vals, target_samples_vals) + + sanitized_traits_vals, sanitized_target_vals = list( + zip(*list(normalize_values(trait_vals, target_samples_vals)))) + num_overlap = len(sanitized_traits_vals) if num_overlap > 5: @@ -102,11 +101,10 @@ package :not packaged in guix """ - try: - results = calculate_biweight_corr(x_val, y_val) - return results - except Exception as error: - raise error + results = pg.corr(x_val, y_val, method="bicor") + corr_coeff = results["r"].values[0] + p_val = results["p-val"].values[0] + return (corr_coeff, p_val) def filter_shared_sample_keys(this_samplelist, @@ -115,13 +113,9 @@ def filter_shared_sample_keys(this_samplelist, filter the values using the shared keys """ - this_vals = [] - target_vals = [] for key, value in target_samplelist.items(): if key in this_samplelist: - target_vals.append(value) - this_vals.append(this_samplelist[key]) - return (this_vals, target_vals) + yield this_samplelist[key], value def fast_compute_all_sample_correlation(this_trait, @@ -140,9 +134,10 @@ def fast_compute_all_sample_correlation(this_trait, for target_trait in target_dataset: trait_name = target_trait.get("trait_id") target_trait_data = target_trait["trait_sample_data"] - processed_values.append((trait_name, corr_method, *filter_shared_sample_keys( - this_trait_samples, target_trait_data))) - with multiprocessing.Pool(4) as pool: + processed_values.append((trait_name, corr_method, + list(zip(*list(filter_shared_sample_keys( + this_trait_samples, target_trait_data)))))) + with closing(multiprocessing.Pool()) as pool: results = pool.starmap(compute_sample_r_correlation, processed_values) for sample_correlation in results: @@ -173,8 +168,8 @@ def compute_all_sample_correlation(this_trait, for target_trait in target_dataset: trait_name = target_trait.get("trait_id") target_trait_data = target_trait["trait_sample_data"] - this_vals, target_vals = filter_shared_sample_keys( - this_trait_samples, target_trait_data) + this_vals, target_vals = list(zip(*list(filter_shared_sample_keys( + this_trait_samples, target_trait_data)))) sample_correlation = compute_sample_r_correlation( trait_name=trait_name, diff --git a/gn3/computations/partial_correlations.py b/gn3/computations/partial_correlations.py new file mode 100644 index 0000000..07dc16d --- /dev/null +++ b/gn3/computations/partial_correlations.py @@ -0,0 +1,289 @@ +""" +This module deals with partial correlations. + +It is an attempt to migrate over the partial correlations feature from +GeneNetwork1. +""" + +from functools import reduce +from typing import Any, Tuple, Sequence +from scipy.stats import pearsonr, spearmanr + +from gn3.settings import TEXTDIR +from gn3.data_helpers import parse_csv_line + +def control_samples(controls: Sequence[dict], sampleslist: Sequence[str]): + """ + Fetches data for the control traits. + + This migrates `web/webqtl/correlation/correlationFunction.controlStrain` in + GN1, with a few modifications to the arguments passed in. + + PARAMETERS: + controls: A map of sample names to trait data. Equivalent to the `cvals` + value in the corresponding source function in GN1. + sampleslist: A list of samples. Equivalent to `strainlst` in the + corresponding source function in GN1 + """ + def __process_control__(trait_data): + def __process_sample__(acc, sample): + if sample in trait_data["data"].keys(): + sample_item = trait_data["data"][sample] + val = sample_item["value"] + if val is not None: + return ( + acc[0] + (sample,), + acc[1] + (val,), + acc[2] + (sample_item["variance"],)) + return acc + return reduce( + __process_sample__, sampleslist, (tuple(), tuple(), tuple())) + + return reduce( + lambda acc, item: ( + acc[0] + (item[0],), + acc[1] + (item[1],), + acc[2] + (item[2],), + acc[3] + (len(item[0]),), + ), + [__process_control__(trait_data) for trait_data in controls], + (tuple(), tuple(), tuple(), tuple())) + +def dictify_by_samples(samples_vals_vars: Sequence[Sequence]) -> Sequence[dict]: + """ + Build a sequence of dictionaries from a sequence of separate sequences of + samples, values and variances. + + This is a partial migration of + `web.webqtl.correlation.correlationFunction.fixStrains` function in GN1. + This implementation extracts code that will find common use, and that will + find use in more than one place. + """ + return tuple( + { + sample: {"sample_name": sample, "value": val, "variance": var} + for sample, val, var in zip(*trait_line) + } for trait_line in zip(*(samples_vals_vars[0:3]))) + +def fix_samples(primary_trait: dict, control_traits: Sequence[dict]) -> Sequence[Sequence[Any]]: + """ + Corrects sample_names, values and variance such that they all contain only + those samples that are common to the reference trait and all control traits. + + This is a partial migration of the + `web.webqtl.correlation.correlationFunction.fixStrain` function in GN1. + """ + primary_samples = tuple( + present[0] for present in + ((sample, all(sample in control.keys() for control in control_traits)) + for sample in primary_trait.keys()) + if present[1]) + control_vals_vars: tuple = reduce( + lambda acc, x: (acc[0] + (x[0],), acc[1] + (x[1],)), + ((item["value"], item["variance"]) + for sublist in [tuple(control.values()) for control in control_traits] + for item in sublist), + (tuple(), tuple())) + return ( + primary_samples, + tuple(primary_trait[sample]["value"] for sample in primary_samples), + control_vals_vars[0], + tuple(primary_trait[sample]["variance"] for sample in primary_samples), + control_vals_vars[1]) + +def find_identical_traits( + primary_name: str, primary_value: float, control_names: Tuple[str, ...], + control_values: Tuple[float, ...]) -> Tuple[str, ...]: + """ + Find traits that have the same value when the values are considered to + 3 decimal places. + + This is a migration of the + `web.webqtl.correlation.correlationFunction.findIdenticalTraits` function in + GN1. + """ + def __merge_identicals__( + acc: Tuple[str, ...], + ident: Tuple[str, Tuple[str, ...]]) -> Tuple[str, ...]: + return acc + ident[1] + + def __dictify_controls__(acc, control_item): + ckey = "{:.3f}".format(control_item[0]) + return {**acc, ckey: acc.get(ckey, tuple()) + (control_item[1],)} + + return (reduce(## for identical control traits + __merge_identicals__, + (item for item in reduce(# type: ignore[var-annotated] + __dictify_controls__, zip(control_values, control_names), + {}).items() if len(item[1]) > 1), + tuple()) + or + reduce(## If no identical control traits, try primary and controls + __merge_identicals__, + (item for item in reduce(# type: ignore[var-annotated] + __dictify_controls__, + zip((primary_value,) + control_values, + (primary_name,) + control_names), {}).items() + if len(item[1]) > 1), + tuple())) + +def tissue_correlation( + primary_trait_values: Tuple[float, ...], + target_trait_values: Tuple[float, ...], + method: str) -> Tuple[float, float]: + """ + Compute the correlation between the primary trait values, and the values of + a single target value. + + This migrates the `cal_tissue_corr` function embedded in the larger + `web.webqtl.correlation.correlationFunction.batchCalTissueCorr` function in + GeneNetwork1. + """ + def spearman_corr(*args): + result = spearmanr(*args) + return (result.correlation, result.pvalue) + + method_fns = {"pearson": pearsonr, "spearman": spearman_corr} + + assert len(primary_trait_values) == len(target_trait_values), ( + "The lengths of the `primary_trait_values` and `target_trait_values` " + "must be equal") + assert method in method_fns.keys(), ( + "Method must be one of: {}".format(",".join(method_fns.keys()))) + + corr, pvalue = method_fns[method](primary_trait_values, target_trait_values) + return (round(corr, 10), round(pvalue, 10)) + +def batch_computed_tissue_correlation( + primary_trait_values: Tuple[float, ...], target_traits_dict: dict, + method: str) -> Tuple[dict, dict]: + """ + This is a migration of the + `web.webqtl.correlation.correlationFunction.batchCalTissueCorr` function in + GeneNetwork1 + """ + def __corr__(acc, target): + corr = tissue_correlation(primary_trait_values, target[1], method) + return ({**acc[0], target[0]: corr[0]}, {**acc[0], target[1]: corr[1]}) + return reduce(__corr__, target_traits_dict.items(), ({}, {})) + +def correlations_of_all_tissue_traits( + primary_trait_symbol_value_dict: dict, symbol_value_dict: dict, + method: str) -> Tuple[dict, dict]: + """ + Computes and returns the correlation of all tissue traits. + + This is a migration of the + `web.webqtl.correlation.correlationFunction.calculateCorrOfAllTissueTrait` + function in GeneNetwork1. + """ + primary_trait_values = tuple(primary_trait_symbol_value_dict.values())[0] + return batch_computed_tissue_correlation( + primary_trait_values, symbol_value_dict, method) + +def good_dataset_samples_indexes( + samples: Tuple[str, ...], + samples_from_file: Tuple[str, ...]) -> Tuple[int, ...]: + """ + Return the indexes of the items in `samples_from_files` that are also found + in `samples`. + + This is a partial migration of the + `web.webqtl.correlation.PartialCorrDBPage.getPartialCorrelationsFast` + function in GeneNetwork1. + """ + return tuple(sorted( + samples_from_file.index(good) for good in + set(samples).intersection(set(samples_from_file)))) + +def determine_partials( + primary_vals, control_vals, all_target_trait_names, + all_target_trait_values, method): + """ + This **WILL** be a migration of + `web.webqtl.correlation.correlationFunction.determinePartialsByR` function + in GeneNetwork1. + + The function in GeneNetwork1 contains code written in R that is then used to + compute the partial correlations. + """ + ## This function is not implemented at this stage + return tuple( + primary_vals, control_vals, all_target_trait_names, + all_target_trait_values, method) + +def compute_partial_correlations_fast(# pylint: disable=[R0913, R0914] + samples, primary_vals, control_vals, database_filename, + fetched_correlations, method: str, correlation_type: str) -> Tuple[ + float, Tuple[float, ...]]: + """ + This is a partial migration of the + `web.webqtl.correlation.PartialCorrDBPage.getPartialCorrelationsFast` + function in GeneNetwork1. + """ + assert method in ("spearman", "pearson") + with open(f"{TEXTDIR}/{database_filename}", "r") as dataset_file: + dataset = tuple(dataset_file.readlines()) + + good_dataset_samples = good_dataset_samples_indexes( + samples, parse_csv_line(dataset[0])[1:]) + + def __process_trait_names_and_values__(acc, line): + trait_line = parse_csv_line(line) + trait_name = trait_line[0] + trait_data = trait_line[1:] + if trait_name in fetched_correlations.keys(): + return ( + acc[0] + (trait_name,), + acc[1] + tuple( + trait_data[i] if i in good_dataset_samples else None + for i in range(len(trait_data)))) + return acc + + processed_trait_names_values: tuple = reduce( + __process_trait_names_and_values__, dataset[1:], (tuple(), tuple())) + all_target_trait_names: Tuple[str, ...] = processed_trait_names_values[0] + all_target_trait_values: Tuple[float, ...] = processed_trait_names_values[1] + + all_correlations = determine_partials( + primary_vals, control_vals, all_target_trait_names, + all_target_trait_values, method) + ## Line 772 to 779 in GN1 are the cause of the weird complexity in the + ## return below. Once the surrounding code is successfully migrated and + ## reworked, this complexity might go away, by getting rid of the + ## `correlation_type` parameter + return len(all_correlations), tuple( + corr + ( + (fetched_correlations[corr[0]],) if correlation_type == "literature" + else fetched_correlations[corr[0]][0:2]) + for idx, corr in enumerate(all_correlations)) + +def partial_correlation_matrix( + xdata: Tuple[float, ...], ydata: Tuple[float, ...], + zdata: Tuple[float, ...], method: str = "pearsons", + omit_nones: bool = True) -> float: + """ + Computes the partial correlation coefficient using the + 'variance-covariance matrix' method + + This is a partial migration of the + `web.webqtl.correlation.correlationFunction.determinPartialsByR` function in + GeneNetwork1, specifically the `pcor.mat` function written in the R + programming language. + """ + return 0 + +def partial_correlation_recursive( + xdata: Tuple[float, ...], ydata: Tuple[float, ...], + zdata: Tuple[float, ...], method: str = "pearsons", + omit_nones: bool = True) -> float: + """ + Computes the partial correlation coefficient using the 'recursive formula' + method + + This is a partial migration of the + `web.webqtl.correlation.correlationFunction.determinPartialsByR` function in + GeneNetwork1, specifically the `pcor.rec` function written in the R + programming language. + """ + return 0 diff --git a/gn3/computations/wgcna.py b/gn3/computations/wgcna.py index fd508fa..ab12fe7 100644 --- a/gn3/computations/wgcna.py +++ b/gn3/computations/wgcna.py @@ -3,8 +3,11 @@ import os import json import uuid -from gn3.settings import TMPDIR +import subprocess +import base64 + +from gn3.settings import TMPDIR from gn3.commands import run_cmd @@ -14,12 +17,46 @@ def dump_wgcna_data(request_data: dict): temp_file_path = os.path.join(TMPDIR, filename) + request_data["TMPDIR"] = TMPDIR + with open(temp_file_path, "w") as output_file: json.dump(request_data, output_file) return temp_file_path +def stream_cmd_output(socketio, request_data, cmd: str): + """function to stream in realtime""" + # xtodo syncing and closing /edge cases + + socketio.emit("output", {"data": f"calling you script {cmd}"}, + namespace="/", room=request_data["socket_id"]) + results = subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True) + + if results.stdout is not None: + + for line in iter(results.stdout.readline, b""): + socketio.emit("output", + {"data": line.decode("utf-8").rstrip()}, + namespace="/", room=request_data["socket_id"]) + + socketio.emit( + "output", {"data": + "parsing the output results"}, namespace="/", + room=request_data["socket_id"]) + + +def process_image(image_loc: str) -> bytes: + """encode the image""" + + try: + with open(image_loc, "rb") as image_file: + return base64.b64encode(image_file.read()) + except FileNotFoundError: + return b"" + + def compose_wgcna_cmd(rscript_path: str, temp_file_path: str): """function to componse wgcna cmd""" # (todo):issue relative paths to abs paths @@ -32,6 +69,8 @@ def call_wgcna_script(rscript_path: str, request_data: dict): generated_file = dump_wgcna_data(request_data) cmd = compose_wgcna_cmd(rscript_path, generated_file) + # stream_cmd_output(request_data, cmd) disable streaming of data + try: run_cmd_results = run_cmd(cmd) @@ -40,8 +79,14 @@ def call_wgcna_script(rscript_path: str, request_data: dict): if run_cmd_results["code"] != 0: return run_cmd_results + + output_file_data = json.load(outputfile) + output_file_data["output"]["image_data"] = process_image( + output_file_data["output"]["imageLoc"]).decode("ascii") + # json format only supports unicode string// to get image data reconvert + return { - "data": json.load(outputfile), + "data": output_file_data, **run_cmd_results } except FileNotFoundError: |