# Copyright (C) University of Tennessee Health Science Center, Memphis, TN. # # This program is free software: you can redistribute it and/or modify it # under the terms of the GNU Affero General Public License # as published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. # See the GNU Affero General Public License for more details. # # This program is available from Source Forge: at GeneNetwork Project # (sourceforge.net/projects/genenetwork/). # # Contact Dr. Robert W. Williams at rwilliams@uthsc.edu # # # This module is used by GeneNetwork project (www.genenetwork.org) import hashlib import html import json from gn2.base.trait import create_trait, jsonable from gn2.base.data_set import create_dataset from gn2.utility import hmac from gn2.utility.type_checking import get_float, get_int, get_string from gn2.utility.redis_tools import get_redis_conn Redis = get_redis_conn() def set_template_vars(start_vars, correlation_data): corr_type = start_vars['corr_type'] corr_method = start_vars['corr_sample_method'] if start_vars['dataset'] == "Temp": this_dataset_ob = create_dataset( dataset_name="Temp", dataset_type="Temp", group_name=start_vars['group']) else: this_dataset_ob = create_dataset(dataset_name=start_vars['dataset']) this_trait = create_trait(dataset=this_dataset_ob, name=start_vars['trait_id']) # Store trait sample data in Redis, so additive effect scatterplots can include edited values dhash = hashlib.md5() dhash.update(start_vars['sample_vals'].encode()) samples_hash = dhash.hexdigest() Redis.set(samples_hash, start_vars['sample_vals'], ex=7*24*60*60) correlation_data['dataid'] = samples_hash correlation_data['this_trait'] = jsonable(this_trait, this_dataset_ob) correlation_data['this_dataset'] = this_dataset_ob.as_monadic_dict().data target_dataset_ob = create_dataset(correlation_data['target_dataset']) correlation_data['target_dataset'] = target_dataset_ob.as_monadic_dict().data correlation_data['table_json'] = correlation_json_for_table( start_vars, correlation_data, target_dataset_ob) if target_dataset_ob.type == "ProbeSet": filter_cols = [7, 6] elif target_dataset_ob.type == "Publish": filter_cols = [8, 5] else: filter_cols = [4, 0] correlation_data['corr_method'] = corr_method correlation_data['filter_cols'] = filter_cols correlation_data['header_fields'] = get_header_fields( target_dataset_ob.type, correlation_data['corr_method']) correlation_data['formatted_corr_type'] = get_formatted_corr_type( corr_type, corr_method) return correlation_data def apply_filters(trait, target_trait, target_dataset, **filters): def __p_val_filter__(p_lower, p_upper): return not (p_lower <= float(trait.get("corr_coefficient",0.0)) <= p_upper) def __min_filter__(min_expr): if (target_dataset['type'] in ["ProbeSet", "Publish"] and target_trait['mean']): return (min_expr != None) and (float(target_trait['mean']) < min_expr) return False def __location_filter__(location_type, location_chr, min_location_mb, max_location_mb): if target_dataset["type"] in ["ProbeSet", "Geno"] and location_type == "gene": if not target_trait['mb'] or not target_trait['chr']: return True return ( ((location_chr!=None) and (target_trait["chr"]!=location_chr)) or ((min_location_mb!= None) and ( float(target_trait['mb']) < min_location_mb) ) or ((max_location_mb != None) and (float(target_trait['mb']) > float(max_location_mb) )) ) elif target_dataset["type"] in ["ProbeSet", "Publish"]: return ((location_chr!=None) and (target_trait["lrs_chr"] != location_chr) or ((min_location_mb != None) and ( float(target_trait['lrs_mb']) < float(min_location_mb))) or ((max_location_mb != None) and ( float(target_trait['lrs_mb']) > float(max_location_mb)) ) ) return True if not target_trait: return True else: # check if one of the condition is not met i.e One is True return (__p_val_filter__( filters.get("p_range_lower"), filters.get("p_range_upper") ) or ( __min_filter__( filters.get("min_expr") ) ) or __location_filter__( filters.get("location_type"), filters.get("location_chr"), filters.get("min_location_mb"), filters.get("max_location_mb") ) ) def get_user_filters(start_vars): (min_expr, p_min, p_max) = ( get_float(start_vars, 'min_expr'), get_float(start_vars, 'p_range_lower', -1.0), get_float(start_vars, 'p_range_upper', 1.0) ) if all(keys in start_vars for keys in ["loc_chr", "min_loc_mb", "max_loc_mb"]): location_chr = get_string(start_vars, "loc_chr") min_location_mb = get_int(start_vars, "min_loc_mb") max_location_mb = get_int(start_vars, "max_loc_mb") else: location_chr = min_location_mb = max_location_mb = None return { "min_expr": min_expr, "p_range_lower": p_min, "p_range_upper": p_max, "location_chr": location_chr, "location_type": start_vars['location_type'], "min_location_mb": min_location_mb, "max_location_mb": max_location_mb } def generate_table_metadata(all_traits, dataset_metadata, dataset_obj): def __fetch_trait_data__(trait, dataset_obj): target_trait_ob = create_trait(dataset=dataset_obj, name=trait, get_qtl_info=True) return jsonable(target_trait_ob, dataset_obj) metadata = [__fetch_trait_data__(trait, dataset_obj) for trait in (all_traits)] return (dataset_metadata | ({str(trait["name"]): trait for trait in metadata})) def populate_table(dataset_metadata, target_dataset, this_dataset, corr_results, filters): def __populate_trait__(idx, trait): trait_name = list(trait.keys())[0] target_trait = dataset_metadata.get(trait_name) trait = trait[trait_name] if not apply_filters(trait, target_trait, target_dataset, **filters): results_dict = {} results_dict['index'] = idx + 1 # results_dict['trait_id'] = target_trait['name'] results_dict['dataset'] = target_dataset['name'] results_dict['hmac'] = hmac.data_hmac( '{}:{}'.format(target_trait['name'], target_dataset['name'])) results_dict['sample_r'] = f"{float(trait.get('corr_coefficient',0.0)):.3f}" results_dict['num_overlap'] = trait.get('num_overlap', 0) results_dict['sample_p'] = f"{float(trait.get('p_value',0)):.2e}" if target_dataset['type'] == "ProbeSet": results_dict['symbol'] = target_trait['symbol'] results_dict['description'] = "N/A" results_dict['location'] = target_trait['location'] results_dict['mean'] = "N/A" results_dict['additive'] = "N/A" if target_trait['description'].strip(): results_dict['description'] = html.escape( target_trait['description'].strip(), quote=True) if target_trait['mean']: results_dict['mean'] = f"{float(target_trait['mean']):.3f}" try: results_dict['lod_score'] = f"{float(target_trait['lrs_score']) / 4.61:.1f}" except: results_dict['lod_score'] = "N/A" results_dict['lrs_location'] = target_trait['lrs_location'] if target_trait['additive']: results_dict['additive'] = f"{float(target_trait['additive']):.3f}" results_dict['lit_corr'] = "--" results_dict['tissue_corr'] = "--" results_dict['tissue_pvalue'] = "--" if this_dataset['type'] == "ProbeSet": if 'lit_corr' in trait: results_dict['lit_corr'] = ( f"{float(trait['lit_corr']):.3f}" if trait["lit_corr"] else "--") if 'tissue_corr' in trait: results_dict['tissue_corr'] = f"{float(trait['tissue_corr']):.3f}" results_dict['tissue_pvalue'] = f"{float(trait['tissue_p_val']):.3e}" elif target_dataset['type'] == "Publish": results_dict['abbreviation_display'] = "N/A" results_dict['description'] = "N/A" results_dict['mean'] = "N/A" results_dict['authors_display'] = "N/A" results_dict['additive'] = "N/A" results_dict['pubmed_link'] = "N/A" results_dict['pubmed_text'] = target_trait["pubmed_text"] if target_trait["abbreviation"]: results_dict['abbreviation'] = target_trait['abbreviation'] if target_trait["description"].strip(): results_dict['description'] = html.escape( target_trait['description'].strip(), quote=True) if target_trait["mean"] != "N/A": results_dict['mean'] = f"{float(target_trait['mean']):.3f}" results_dict['lrs_location'] = target_trait['lrs_location'] if target_trait["authors"]: authors_list = target_trait['authors'].split(',') results_dict['authors_display'] = ", ".join( authors_list[:6]) + ", et al." if len(authors_list) > 6 else target_trait['authors'] if "pubmed_id" in target_trait: results_dict['pubmed_link'] = target_trait['pubmed_link'] results_dict['pubmed_text'] = target_trait['pubmed_text'] try: results_dict["lod_score"] = f"{float(target_trait['lrs_score']) / 4.61:.1f}" except ValueError: results_dict['lod_score'] = "N/A" else: results_dict['location'] = target_trait['location'] return results_dict return [__populate_trait__(idx, trait) for (idx, trait) in enumerate(corr_results)] def correlation_json_for_table(start_vars, correlation_data, target_dataset_ob): """Return JSON data for use with the DataTable in the correlation result page Keyword arguments: correlation_data -- Correlation results this_trait -- Trait being correlated against a dataset, as a dict this_dataset -- Dataset of this_trait, as a monadic dict target_dataset_ob - Target dataset, as a Dataset ob """ this_dataset = correlation_data['this_dataset'] traits = set() for trait in correlation_data["correlation_results"]: traits.add(list(trait)[0]) dataset_metadata = generate_table_metadata(traits, correlation_data["traits_metadata"], target_dataset_ob) return json.dumps([result for result in ( populate_table(dataset_metadata=dataset_metadata, target_dataset=target_dataset_ob.as_monadic_dict().data, this_dataset=correlation_data['this_dataset'], corr_results=correlation_data['correlation_results'], filters=get_user_filters(start_vars))) if result]) def get_formatted_corr_type(corr_type, corr_method): formatted_corr_type = "" if corr_type == "lit": formatted_corr_type += "Literature Correlation " elif corr_type == "tissue": formatted_corr_type += "Tissue Correlation " elif corr_type == "sample": formatted_corr_type += "Genetic Correlation " if corr_method == "pearson": formatted_corr_type += "(Pearson's r)" elif corr_method == "spearman": formatted_corr_type += "(Spearman's rho)" elif corr_method == "bicor": formatted_corr_type += "(Biweight r)" return formatted_corr_type def get_header_fields(data_type, corr_method): if data_type == "ProbeSet": if corr_method == "spearman": header_fields = ['Index', 'Record', 'Symbol', 'Description', 'Location', 'Mean', 'Sample rho', 'N', 'Sample p(rho)', 'Lit rho', 'Tissue rho', 'Tissue p(rho)', 'Max LRS', 'Max LRS Location', 'Additive Effect'] else: header_fields = ['Index', 'Record', 'Symbol', 'Description', 'Location', 'Mean', 'Sample r', 'N', 'Sample p(r)', 'Lit r', 'Tissue r', 'Tissue p(r)', 'Max LRS', 'Max LRS Location', 'Additive Effect'] elif data_type == "Publish": if corr_method == "spearman": header_fields = ['Index', 'Record', 'Abbreviation', 'Description', 'Mean', 'Authors', 'Year', 'Sample rho', 'N', 'Sample p(rho)', 'Max LRS', 'Max LRS Location', 'Additive Effect'] else: header_fields = ['Index', 'Record', 'Abbreviation', 'Description', 'Mean', 'Authors', 'Year', 'Sample r', 'N', 'Sample p(r)', 'Max LRS', 'Max LRS Location', 'Additive Effect'] else: if corr_method == "spearman": header_fields = ['Index', 'ID', 'Location', 'Sample rho', 'N', 'Sample p(rho)'] else: header_fields = ['Index', 'ID', 'Location', 'Sample r', 'N', 'Sample p(r)'] return header_fields