from typing import Dict

import string
import datetime
import uuid
import requests
import json

from collections import OrderedDict

import numpy as np
import scipy.stats as ss

from gn2.wqflask.database import database_connection
from gn2.base import webqtlConfig
from gn2.wqflask.show_trait.SampleList import SampleList
from gn2.base.trait import create_trait
from gn2.base import data_set
from gn2.utility import helper_functions
from gn2.utility.tools import get_setting, locate_ignore_error
from gn2.utility.tools import GN_PROXY_URL
from gn2.utility.redis_tools import get_redis_conn, get_resource_id
from gn3.authentication import get_highest_user_access_role

Redis = get_redis_conn()
ONE_YEAR = 60 * 60 * 24 * 365

###############################################
#
# Todo: Put in security to ensure that the user has permission to access
# confidential data sets, and add IP limiting as necessary.
#
###############################################


class ShowTrait:
    def __init__(self, db_cursor, user_id, kw):
        if 'trait_id' in kw and kw['dataset'] != "Temp":
            self.temp_trait = False
            self.trait_id = kw['trait_id']
            helper_functions.get_species_dataset_trait(self, kw)
            self.resource_id = get_resource_id(self.dataset, self.trait_id)
        elif 'group' in kw:
            self.temp_trait = True
            self.trait_id = "Temp_" + kw['species'] + "_" + kw['group'] + \
                "_" + datetime.datetime.now().strftime("%m%d%H%M%S")
            self.temp_species = kw['species']
            self.temp_group = kw['group']
            self.dataset = data_set.create_dataset(
                dataset_name="Temp",
                dataset_type="Temp",
                group_name=self.temp_group)

            # Put values in Redis so they can be looked up later if
            # added to a collection
            Redis.set(self.trait_id, kw['trait_paste'], ex=ONE_YEAR)
            self.trait_vals = kw['trait_paste'].split()
            self.this_trait = create_trait(dataset=self.dataset,
                                           name=self.trait_id,
                                           cellid=None)
        else:
            self.temp_trait = True
            self.trait_id = kw['trait_id']
            self.temp_species = self.trait_id.split("_")[1]
            self.temp_group = self.trait_id.split("_")[2]
            self.dataset = data_set.create_dataset(
                dataset_name="Temp",
                dataset_type="Temp",
                group_name=self.temp_group)
            self.this_trait = create_trait(dataset=self.dataset,
                                           name=self.trait_id,
                                           cellid=None)
            self.trait_vals = Redis.get(self.trait_id).split()

        # Get verify/rna-seq link URLs
        try:
            blatsequence = (self.this_trait.blatseq
                            if self.dataset.type == "ProbeSet"
                            else self.this_trait.sequence)
            if not blatsequence:
                # XZ, 06/03/2009: ProbeSet name is not unique among
                # platforms. We should use ProbeSet Id instead.
                seqs = ()
                if self.dataset.type == "ProbeSet":
                    db_cursor.execute(
                        "SELECT Probe.Sequence, Probe.Name "
                        "FROM Probe, ProbeSet, ProbeSetFreeze, "
                        "ProbeSetXRef WHERE "
                        "ProbeSetXRef.ProbeSetFreezeId = ProbeSetFreeze.Id "
                        "AND ProbeSetXRef.ProbeSetId = ProbeSet.Id AND "
                        "ProbeSetFreeze.Name = %s AND "
                        "ProbeSet.Name = %s AND "
                        "Probe.ProbeSetId = ProbeSet.Id ORDER "
                        "BY Probe.SerialOrder",
                        (self.this_trait.dataset.name,
                         self.this_trait.name,)
                    )
                else:
                    db_cursor.execute(
                        "SELECT Geno.Sequence "
                        "FROM Geno, GenoXRef, GenoFreeze "
                        "WHERE Geno.Name = %s AND "
                        "Geno.Id = GenoXRef.GenoId AND "
                        "GenoXRef.GenoFreezeId = GenoFreeze.Id AND "
                        "GenoFreeze.Name = %s",
                        (self.this_trait.name,
                         self.this_trait.dataset.name)
                    )
                seqs = db_cursor.fetchall()
                if not seqs:
                    raise ValueError
                else:
                    blatsequence = ''
                    for seqt in seqs:
                        if int(seqt[1][-1]) % 2 == 1:
                            blatsequence += seqt[0].strip()

            # --------Hongqiang added this part in order to not only blat
            # ProbeSet, but also blat Probe
            blatsequence = '%3E' + self.this_trait.name + '%0A' + blatsequence + '%0A'

            # XZ, 06/03/2009: ProbeSet name is not unique among platforms.
            # We should use ProbeSet Id instead.
            seqs = ()
            db_cursor.execute(
                "SELECT Probe.Sequence, Probe.Name "
                "FROM Probe, ProbeSet, ProbeSetFreeze, "
                "ProbeSetXRef WHERE "
                "ProbeSetXRef.ProbeSetFreezeId = ProbeSetFreeze.Id "
                "AND ProbeSetXRef.ProbeSetId = ProbeSet.Id AND "
                "ProbeSetFreeze.Name = %s AND ProbeSet.Name = %s "
                "AND Probe.ProbeSetId = ProbeSet.Id "
                "ORDER BY Probe.SerialOrder",
                (self.this_trait.dataset.name,
                 self.this_trait.name,)
            )
            seqs = db_cursor.fetchall()

            for seqt in seqs:
                if int(seqt[1][-1]) % 2 == 1:
                    blatsequence += '%3EProbe_' + \
                        seqt[1].strip() + '%0A' + seqt[0].strip() + '%0A'

            if self.dataset.group.species == "rat":
                self.UCSC_BLAT_URL = webqtlConfig.UCSC_BLAT % (
                    'rat', 'rn7', blatsequence)
                self.UTHSC_BLAT_URL = ""
            elif self.dataset.group.species == "mouse":
                self.UCSC_BLAT_URL = webqtlConfig.UCSC_BLAT % (
                    'mouse', 'mm10', blatsequence)
                self.UTHSC_BLAT_URL = webqtlConfig.UTHSC_BLAT % (
                    'mouse', 'mm10', blatsequence)
            elif self.dataset.group.species == "human":
                self.UCSC_BLAT_URL = webqtlConfig.UCSC_BLAT % (
                    'human', 'hg38', blatsequence)
                self.UTHSC_BLAT_URL = ""
            else:
                self.UCSC_BLAT_URL = ""
                self.UTHSC_BLAT_URL = ""
        except:
            self.UCSC_BLAT_URL = ""
            self.UTHSC_BLAT_URL = ""

        if self.dataset.type == "ProbeSet":
            self.show_probes = "True"

        trait_units = get_trait_units(self.this_trait)
        self.get_external_links()
        self.build_correlation_tools()

        self.ncbi_summary = get_ncbi_summary(self.this_trait)

        # Get nearest marker for composite mapping
        if not self.temp_trait:
            if check_if_attr_exists(self.this_trait, 'locus_chr') and \
                    self.dataset.type != "Geno" and self.dataset.type != "Publish":
                self.nearest_marker = get_nearest_marker(
                    self.this_trait, self.dataset)
            else:
                self.nearest_marker = ""

        self.make_sample_lists()

        trait_vals_by_group = []
        for sample_type in self.sample_groups:
            trait_vals_by_group.append(get_trait_vals(sample_type.sample_list))

        self.max_digits_by_group, self.no_decimal_place = get_max_digits(
            trait_vals_by_group)

        self.qnorm_vals = quantile_normalize_vals(
            self.sample_groups, trait_vals_by_group)
        self.z_scores = get_z_scores(self.sample_groups, trait_vals_by_group)

        self.temp_uuid = uuid.uuid4()

        self.sample_group_types = OrderedDict()
        if len(self.sample_groups) > 1:
            self.sample_group_types['samples_primary'] = self.dataset.group.name
            self.sample_group_types['samples_other'] = "Other"
            self.sample_group_types['samples_all'] = "All"
        else:
            self.sample_group_types['samples_primary'] = self.dataset.group.name
        sample_lists = [group.sample_list for group in self.sample_groups]

        self.categorical_var_list = []
        self.numerical_var_list = []
        if not self.temp_trait:
            # ZS: Only using first samplelist, since I think mapping
            # only uses those samples
            self.categorical_var_list = get_categorical_variables(
                self.this_trait, self.sample_groups[0])
            self.numerical_var_list = get_numerical_variables(
                self.this_trait, self.sample_groups[0])

        # ZS: Get list of chromosomes to select for mapping
        self.chr_list = [["All", -1]]
        chromosomes = self.dataset.species.chromosomes.chromosomes(db_cursor)
        for i, this_chr in enumerate(chromosomes):
            self.chr_list.append([chromosomes[this_chr].name, i])

        self.genofiles = self.dataset.group.get_genofiles()

        study_samplelist_json = self.dataset.group.get_study_samplelists()
        self.study_samplelists = [study["title"]
                                  for study in study_samplelist_json]

        # ZS: No need to grab scales from .geno file unless it's using
        # a mapping method that reads .geno files
        if "QTLReaper" in self.dataset.group.mapping_names or \
                "R/qtl" in self.dataset.group.mapping_names:
            if self.genofiles:
                self.scales_in_geno = get_genotype_scales(self.genofiles)
            else:
                self.scales_in_geno = get_genotype_scales(
                    self.dataset.group.name + ".geno")
        else:
            self.scales_in_geno = {}

        self.has_num_cases = has_num_cases(self.this_trait)

        # ZS: Needed to know whether to display bar chart + get max
        # sample name length in order to set table column width
        self.num_values = 0
        # ZS: So it knows whether to display the Binary R/qtl mapping
        # method, which doesn't work unless all values are 0 or 1
        self.binary = "true"
        # ZS: Since we don't want to show log2 transform option for
        # situations where it doesn't make sense
        self.negative_vals_exist = "false"
        max_samplename_width = 1
        for group in self.sample_groups:
            for sample in group.sample_list:
                if len(sample.name) > max_samplename_width:
                    max_samplename_width = len(sample.name)
                if sample.display_value != "x":
                    self.num_values += 1
                    if sample.display_value != 0 and sample.display_value != 1:
                        self.binary = "false"
                    if sample.value < 0:
                        self.negative_vals_exist = "true"

        # ZS: Check whether any attributes have few enough distinct
        # values to show the "Block samples by group" option
        self.categorical_attr_exists = "false"
        for attribute in self.sample_groups[0].attributes:
            if len(self.sample_groups[0].attributes[attribute].distinct_values) <= 10:
                self.categorical_attr_exists = "true"
                break

        sample_column_width = max_samplename_width * 8

        self.stats_table_width, self.trait_table_width = get_table_widths(
            self.sample_groups, sample_column_width, self.has_num_cases)

        if self.num_values >= 5000:
            self.maf = 0.01
        else:
            self.maf = 0.05

        trait_symbol = None
        short_description = None
        if not self.temp_trait:
            if self.this_trait.symbol:
                trait_symbol = self.this_trait.symbol
                short_description = trait_symbol
            elif hasattr(self.this_trait, 'post_publication_abbreviation'):
                short_description = self.this_trait.post_publication_abbreviation
            elif hasattr(self.this_trait, 'pre_publication_abbreviation'):
                short_description = self.this_trait.pre_publication_abbreviation

        # Todo: Add back in the ones we actually need from below, as
        # we discover we need them
        hddn = OrderedDict()

        if self.dataset.group.allsamples:
            hddn['allsamples'] = ','.join(self.dataset.group.allsamples)
        hddn['primary_samples'] = ','.join(self.primary_sample_names)
        hddn['trait_id'] = self.trait_id
        hddn['trait_display_name'] = self.this_trait.display_name
        hddn['dataset'] = self.dataset.name
        hddn['temp_trait'] = False
        if self.temp_trait:
            hddn['temp_trait'] = True
            hddn['group'] = self.temp_group
            hddn['species'] = self.temp_species
        else:
            hddn['group'] = self.dataset.group.name
            hddn['species'] = self.dataset.group.species
        hddn['use_outliers'] = False
        hddn['method'] = "gemma"
        hddn['mapmethod_rqtl'] = "hk"
        hddn['mapmodel_rqtl'] = "normal"
        hddn['pair_scan'] = ""
        hddn['selected_chr'] = -1
        hddn['mapping_display_all'] = True
        hddn['suggestive'] = 0
        hddn['study_samplelists'] = json.dumps(study_samplelist_json)
        hddn['num_perm'] = 0
        hddn['categorical_vars'] = ""
        if self.categorical_var_list:
            hddn['categorical_vars'] = ",".join(self.categorical_var_list)
        hddn['manhattan_plot'] = ""
        hddn['control_marker'] = ""
        if not self.temp_trait:
            if hasattr(self.this_trait, 'locus_chr') and self.this_trait.locus_chr != "" and \
                    self.dataset.type != "Geno" and self.dataset.type != "Publish":
                hddn['control_marker'] = self.nearest_marker
        hddn['do_control'] = False
        hddn['maf'] = 0.05
        hddn['mapping_scale'] = "physic"
        hddn['compare_traits'] = []
        hddn['export_data'] = ""
        hddn['export_format'] = "excel"
        if len(self.scales_in_geno) < 2 and bool(self.scales_in_geno):
            hddn['mapping_scale'] = self.scales_in_geno[list(
                self.scales_in_geno.keys())[0]][0][0]

        # We'll need access to this_trait and hddn in the Jinja2
        # template, so we put it inside self
        self.hddn = hddn

        js_data = dict(trait_id=self.trait_id,
                       trait_symbol=trait_symbol,
                       max_digits=self.max_digits_by_group,
                       no_decimal_place=self.no_decimal_place,
                       short_description=short_description,
                       unit_type=trait_units,
                       dataset_type=self.dataset.type,
                       species=self.dataset.group.species,
                       scales_in_geno=self.scales_in_geno,
                       data_scale=self.dataset.data_scale,
                       sample_group_types=self.sample_group_types,
                       sample_lists=sample_lists,
                       se_exists=self.sample_groups[0].se_exists,
                       has_num_cases=self.has_num_cases,
                       attributes=self.sample_groups[0].attributes,
                       categorical_attr_exists=self.categorical_attr_exists,
                       categorical_vars=",".join(self.categorical_var_list),
                       num_values=self.num_values,
                       qnorm_values=self.qnorm_vals,
                       zscore_values=self.z_scores,
                       sample_column_width=sample_column_width,
                       temp_uuid=self.temp_uuid)
        self.js_data = js_data

    def get_external_links(self):
        # ZS: There's some weirdness here because some fields don't
        # exist while others are empty strings
        self.pubmed_link = webqtlConfig.PUBMEDLINK_URL % self.this_trait.pubmed_id if check_if_attr_exists(
            self.this_trait, 'pubmed_id') else None
        self.ncbi_gene_link = webqtlConfig.NCBI_LOCUSID % self.this_trait.geneid if check_if_attr_exists(
            self.this_trait, 'geneid') else None
        self.omim_link = webqtlConfig.OMIM_ID % self.this_trait.omim if check_if_attr_exists(
            self.this_trait, 'omim') else None
        self.homologene_link = webqtlConfig.HOMOLOGENE_ID % self.this_trait.homologeneid if check_if_attr_exists(
            self.this_trait, 'homologeneid') else None

        self.genbank_link = None
        if check_if_attr_exists(self.this_trait, 'genbankid'):
            genbank_id = '|'.join(self.this_trait.genbankid.split('|')[0:10])
            if genbank_id[-1] == '|':
                genbank_id = genbank_id[0:-1]
            self.genbank_link = webqtlConfig.GENBANK_ID % genbank_id

        self.uniprot_link = None
        if check_if_attr_exists(self.this_trait, 'uniprotid'):
            self.uniprot_link = webqtlConfig.UNIPROT_URL % self.this_trait.uniprotid

        self.genotation_link = self.rgd_link = self.phenogen_link = \
            self.gtex_link = self.genebridge_link = self.ucsc_blat_link = \
            self.biogps_link = self.protein_atlas_link = None
        self.string_link = self.panther_link = self.aba_link = \
            self.ebi_gwas_link = self.wiki_pi_link = self.genemania_link = \
            self.ensembl_link = None

        if self.this_trait.symbol:
            self.genotation_link = webqtlConfig.GENOTATION_URL % self.this_trait.symbol
            self.gtex_link = webqtlConfig.GTEX_URL % self.this_trait.symbol
            self.string_link = webqtlConfig.STRING_URL % self.this_trait.symbol
            self.panther_link = webqtlConfig.PANTHER_URL % self.this_trait.symbol
            self.ebi_gwas_link = webqtlConfig.EBIGWAS_URL % self.this_trait.symbol
            self.protein_atlas_link = webqtlConfig.PROTEIN_ATLAS_URL % self.this_trait.symbol

            if self.dataset.group.species == "mouse" or self.dataset.group.species == "human":
                self.rgd_link = webqtlConfig.RGD_URL % (
                    self.this_trait.symbol, self.dataset.group.species.capitalize())
                if self.dataset.group.species == "mouse":
                    self.genemania_link = webqtlConfig.GENEMANIA_URL % (
                        "mus-musculus", self.this_trait.symbol)
                else:
                    self.genemania_link = webqtlConfig.GENEMANIA_URL % (
                        "homo-sapiens", self.this_trait.symbol)

            if self.dataset.group.species == "mouse":
                self.aba_link = webqtlConfig.ABA_URL % self.this_trait.symbol

                results = ()
                with database_connection(get_setting("SQL_URI")) as conn, conn.cursor() as cursor:
                    cursor.execute(
                        "SELECT chromosome, txStart, txEnd FROM "
                        "GeneList WHERE geneSymbol = %s",
                        (self.this_trait.symbol,)
                    )
                    results = cursor.fetchone()
                if results:
                    chr, transcript_start, transcript_end = results
                else:
                    chr = transcript_start = transcript_end = None

                if chr and transcript_start and transcript_end and self.this_trait.refseq_transcriptid:
                    transcript_start = int(transcript_start * 1000000)
                    transcript_end = int(transcript_end * 1000000)
                    self.ucsc_blat_link = webqtlConfig.UCSC_REFSEQ % (
                        'mm10', self.this_trait.refseq_transcriptid, chr,
                        transcript_start, transcript_end)

            if self.dataset.group.species == "rat":
                self.rgd_link = webqtlConfig.RGD_URL % (
                    self.this_trait.symbol, self.dataset.group.species.capitalize())
                self.phenogen_link = webqtlConfig.PHENOGEN_URL % (
                    self.this_trait.symbol)
                self.genemania_link = webqtlConfig.GENEMANIA_URL % (
                    "rattus-norvegicus", self.this_trait.symbol)

                results = ()
                with database_connection(get_setting("SQL_URI")) as conn, conn.cursor() as cursor:
                    cursor.execute(
                        "SELECT kgID, chromosome, txStart, txEnd "
                        "FROM GeneList_rn33 WHERE geneSymbol = %s",
                        (self.this_trait.symbol,)
                    )
                    if results := cursor.fetchone():
                        kgId, chr, transcript_start, transcript_end = results
                    else:
                        kgId = chr = transcript_start = transcript_end = None

                if chr and transcript_start and transcript_end and kgId:
                    # Convert to bases from megabases
                    transcript_start = int(transcript_start * 1000000)
                    transcript_end = int(transcript_end * 1000000)
                    self.ucsc_blat_link = webqtlConfig.UCSC_REFSEQ % (
                        'rn7', kgId, chr, transcript_start, transcript_end)

        if self.this_trait.geneid and (self.dataset.group.species == "mouse" or
                                       self.dataset.group.species == "rat" or
                                       self.dataset.group.species == "human"):
            self.biogps_link = webqtlConfig.BIOGPS_URL % (
                self.dataset.group.species, self.this_trait.geneid)
            self.gemma_link = webqtlConfig.GEMMA_URL % self.this_trait.geneid

            if self.dataset.group.species == "human":
                self.aba_link = webqtlConfig.ABA_URL % self.this_trait.geneid

    def build_correlation_tools(self):
        if self.temp_trait:
            this_group = self.temp_group
        else:
            this_group = self.dataset.group.name

        # We're checking a string here!
        assert isinstance(this_group, str), "this_group must be a string"

        if this_group[:3] == 'BXD' and this_group not in webqtlConfig.BXD_GROUP_EXCEPTIONS:
            this_group = 'BXD'

        if this_group:
            if self.temp_trait:
                dataset_menu = data_set.datasets(this_group)
            else:
                dataset_menu = data_set.datasets(
                    this_group, self.dataset.group)
            dataset_menu_selected = None
            if len(dataset_menu):
                if self.dataset:
                    dataset_menu_selected = self.dataset.name

            return_results_menu = (100, 200, 500, 1000, 2000,
                                   5000, 10000, 15000, 20000)
            return_results_menu_selected = 500

            self.corr_tools = dict(
                dataset_menu=dataset_menu,
                dataset_menu_selected=dataset_menu_selected,
                return_results_menu=return_results_menu,
                return_results_menu_selected=return_results_menu_selected,)

    def make_sample_lists(self):
        all_samples_ordered = self.dataset.group.all_samples_ordered()

        parent_f1_samples = []
        if self.dataset.group.parlist and self.dataset.group.f1list:
            parent_f1_samples = self.dataset.group.parlist + self.dataset.group.f1list

        primary_sample_names = list(all_samples_ordered)

        if not self.temp_trait:
            other_sample_names = []
            for sample in list(self.this_trait.data.keys()):
                if (self.this_trait.data[sample].name2 !=
                        self.this_trait.data[sample].name):
                    if ((self.this_trait.data[sample].name2 in primary_sample_names)
                            and (self.this_trait.data[sample].name not in primary_sample_names)):
                        primary_sample_names.append(
                            self.this_trait.data[sample].name)
                        primary_sample_names.remove(
                            self.this_trait.data[sample].name2)

                all_samples_set = set(all_samples_ordered)
                if sample not in all_samples_set:
                    all_samples_ordered.append(sample)
                    other_sample_names.append(sample)

            # CFW is here because the .geno file doesn't properly
            # contain its full list of samples. This should probably
            # be fixed.
            if self.dataset.group.species == "human" or \
                    (set(primary_sample_names) == set(parent_f1_samples)) or \
                    self.dataset.group.name == "CFW":
                primary_sample_names += other_sample_names
                other_sample_names = []

            if other_sample_names:
                primary_header = "%s Only" % (self.dataset.group.name)
            else:
                primary_header = "Samples"

            primary_samples = SampleList(dataset=self.dataset,
                                         sample_names=primary_sample_names,
                                         this_trait=self.this_trait,
                                         sample_group_type='primary',
                                         header=primary_header)

            # if other_sample_names and self.dataset.group.species !=
            # "human" and self.dataset.group.name != "CFW":
            if len(other_sample_names) > 0:
                other_sample_names.sort()  # Sort other samples
                other_samples = SampleList(dataset=self.dataset,
                                           sample_names=other_sample_names,
                                           this_trait=self.this_trait,
                                           sample_group_type='other',
                                           header="Other")

                self.sample_groups = (primary_samples, other_samples)
            else:
                self.sample_groups = (primary_samples,)
        else:
            primary_samples = SampleList(
                dataset=self.dataset,
                sample_names=primary_sample_names,
                this_trait=self.trait_vals,
                sample_group_type='primary',
                header="%s Only" % (self.dataset.group.name))
            self.sample_groups = (primary_samples,)

        self.primary_sample_names = primary_sample_names
        self.dataset.group.allsamples = all_samples_ordered


def get_trait_vals(sample_list):
    trait_vals = []
    for sample in sample_list:
        try:
            trait_vals.append(float(sample.value))
        except:
            continue

    return trait_vals


def get_max_digits(trait_vals):
    def __max_digits__(these_vals):
        if not these_vals:
            return None
        return len(str(max(these_vals))) - 1

    return ([__max_digits__(val) for val in trait_vals],
            all([val.is_integer() for val in sum(trait_vals, [])]))


def normf(trait_vals):
    ranked_vals = ss.rankdata(trait_vals)
    p_list = []
    for i, val in enumerate(trait_vals):
        p_list.append(((i + 1) - 0.5) / len(trait_vals))
    z = ss.norm.ppf(p_list)

    normed_vals = []
    for rank in ranked_vals:
        normed_vals.append("%0.3f" % z[int(rank) - 1])

    return normed_vals


def quantile_normalize_vals(sample_groups, trait_vals):
    qnorm_by_group = []
    for i, sample_type in enumerate(sample_groups):
        qnorm_vals = normf(trait_vals[i])

        qnorm_vals_with_x = []
        counter = 0
        for sample in sample_type.sample_list:
            if sample.display_value == "x":
                qnorm_vals_with_x.append("x")
            else:
                qnorm_vals_with_x.append(qnorm_vals[counter])
                counter += 1

        qnorm_by_group.append(qnorm_vals_with_x)

    return qnorm_by_group


def get_z_scores(sample_groups, trait_vals):
    zscore_by_group = []
    for i, sample_type in enumerate(sample_groups):
        zscores = ss.mstats.zscore(np.array(trait_vals[i])).tolist()

        zscores_with_x = []
        counter = 0
        for sample in sample_type.sample_list:
            if sample.display_value == "x":
                zscores_with_x.append("x")
            else:
                zscores_with_x.append("%0.3f" % zscores[counter])
                counter += 1

        zscore_by_group.append(zscores_with_x)

    return zscore_by_group


def get_nearest_marker(this_trait, this_db):
    this_chr = this_trait.locus_chr
    this_mb = this_trait.locus_mb
    # One option is to take flanking markers, another is to take the
    # two (or one) closest
    with database_connection(get_setting("SQL_URI")) as conn, conn.cursor() as cursor:
        cursor.execute(
            "SELECT Geno.Name FROM Geno, GenoXRef, "
            "GenoFreeze WHERE Geno.Chr = %s AND "
            "GenoXRef.GenoId = Geno.Id AND "
            "GenoFreeze.Id = GenoXRef.GenoFreezeId "
            "AND GenoFreeze.Name = %s ORDER BY "
            "ABS( Geno.Mb - %s) LIMIT 1",
            (this_chr, f"{this_db.group.name}Geno", this_mb,)
        )
        if result := cursor.fetchall():
            return result[0][0]
        return ""


def get_table_widths(sample_groups, sample_column_width, has_num_cases=False):
    stats_table_width = 250
    if len(sample_groups) > 1:
        stats_table_width = 450

    trait_table_width = 300 + sample_column_width
    if sample_groups[0].se_exists:
        trait_table_width += 80
    if has_num_cases:
        trait_table_width += 80
    trait_table_width += len(sample_groups[0].attributes) * 88

    return stats_table_width, trait_table_width


def has_num_cases(this_trait):
    has_n = False
    if this_trait.dataset.type != "ProbeSet" and this_trait.dataset.type != "Geno":
        for name, sample in list(this_trait.data.items()):
            if sample.num_cases and sample.num_cases != "1":
                has_n = True
                break

    return has_n


def get_trait_units(this_trait):
    unit_type = ""
    inside_brackets = False
    if this_trait.description_fmt:
        if ("[" in this_trait.description_fmt) and ("]" in this_trait.description_fmt):
            for i in this_trait.description_fmt:
                if inside_brackets:
                    if i != "]":
                        unit_type += i
                    else:
                        inside_brackets = False
                if i == "[":
                    inside_brackets = True
    if unit_type == "":
        unit_type = "value"

    return unit_type


def check_if_attr_exists(the_trait, id_type):
    if hasattr(the_trait, id_type):
        if getattr(the_trait, id_type) is None or getattr(the_trait, id_type) == "":
            return False
        else:
            return True
    else:
        return False


def get_ncbi_summary(this_trait):
    if check_if_attr_exists(this_trait, 'geneid'):
        # ZS: Need to switch this try/except to something that checks
        # the output later
        try:
            response = requests.get(
                "http://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi?db=gene&id=%s&retmode=json" % this_trait.geneid)
            summary = json.loads(response.content)[
                'result'][this_trait.geneid]['summary']
            return summary
        except:
            return None
    else:
        return None


def get_categorical_variables(this_trait, sample_list) -> list:
    categorical_var_list = []

    if len(sample_list.attributes) > 0:
        for attribute in sample_list.attributes:
            if len(sample_list.attributes[attribute].distinct_values) < 10:
                categorical_var_list.append(
                    str(sample_list.attributes[attribute].id))

    return categorical_var_list


def get_numerical_variables(this_trait, sample_list) -> list:
    numerical_var_list = []

    if len(sample_list.attributes) > 0:
        for attribute in sample_list.attributes:
            all_numeric = True
            all_none = True
            for attr_val in sample_list.attributes[attribute].distinct_values:
                if not attr_val:
                    continue
                try:
                    float(attr_val)
                    all_none = False
                except:
                    all_numeric = False
                    break
            if all_numeric and not all_none:
                numerical_var_list.append(
                    sample_list.attributes[attribute].name)

    return numerical_var_list


def get_genotype_scales(genofiles):
    geno_scales = {}
    if isinstance(genofiles, list):
        for the_file in genofiles:
            file_location = the_file['location']
            geno_scales[file_location] = get_scales_from_genofile(
                file_location)
    else:
        geno_scales[genofiles] = get_scales_from_genofile(genofiles)

    return geno_scales


def get_scales_from_genofile(file_location):
    geno_path = locate_ignore_error(file_location, 'genotype')
    # ZS: This is just to allow the code to run when the genotype
    # file can't be located
    if not geno_path:
        return [["physic", "Mb"]]

    cm_and_mb_cols_exist = True
    cm_column = None
    mb_column = None
    with open(geno_path, "r") as geno_fh:
        for i, line in enumerate(geno_fh):
            if line[0] == "#" or line[0] == "@":
                # ZS: If the scale is made explicit in the metadata,
                # use that
                if "@scale" in line:
                    scale = line.split(":")[1].strip()
                    if scale == "morgan":
                        return [["morgan", "cM"]]
                    else:
                        return [["physic", "Mb"]]
                else:
                    continue
            if line[:3] == "Chr":
                first_marker_line = i + 1
                if line.split("\t")[2].strip() == "cM":
                    cm_column = 2
                elif line.split("\t")[3].strip() == "cM":
                    cm_column = 3
                if line.split("\t")[2].strip() == "Mb":
                    mb_column = 2
                elif line.split("\t")[3].strip() == "Mb":
                    mb_column = 3
                break

        # ZS: This attempts to check whether the cM and Mb columns are
        # 'real', since some .geno files have one column be a copy of
        # the other column, or have one column that is all 0s
        cm_all_zero = True
        mb_all_zero = True
        cm_mb_all_equal = True
        for i, line in enumerate(geno_fh):
            # ZS: I'm assuming there won't be more than 10 markers
            # where the position is listed as 0
            if first_marker_line <= i < first_marker_line + 10:
                if cm_column:
                    cm_val = line.split("\t")[cm_column].strip()
                    if cm_val != "0":
                        cm_all_zero = False
                if mb_column:
                    mb_val = line.split("\t")[mb_column].strip()
                    if mb_val != "0":
                        mb_all_zero = False
                if cm_column and mb_column:
                    if cm_val != mb_val:
                        cm_mb_all_equal = False
            else:
                if i > first_marker_line + 10:
                    break

    # ZS: This assumes that both won't be all zero, since if that's
    # the case mapping shouldn't be an option to begin with
    if mb_all_zero:
        return [["morgan", "cM"]]
    elif cm_mb_all_equal:
        return [["physic", "Mb"]]
    elif cm_and_mb_cols_exist:
        return [["physic", "Mb"], ["morgan", "cM"]]
    else:
        return [["physic", "Mb"]]


def get_diff_of_vals(new_vals: Dict, trait_id: str) -> Dict:
    """Get the diff between current sample values and the values in the DB.

    Given a dict of the changed values and the trait/dataset ID, return a
    dict keyed by each sample whose value changed, where each value is a
    dict with the old and new values.
    """
    trait_name = trait_id.split(":")[0]
    dataset_name = trait_id.split(":")[1]
    trait_ob = create_trait(name=trait_name, dataset_name=dataset_name)

    old_vals = {sample: trait_ob.data[sample].value for sample in trait_ob.data}

    shared_samples = set.union(set(new_vals.keys()), set(old_vals.keys()))

    diff_dict = {}
    for sample in shared_samples:
        try:
            new_val = round(float(new_vals[sample]), 3)
        except:
            new_val = "x"
        try:
            old_val = round(float(old_vals[sample]), 3)
        except:
            old_val = "x"
        if new_val != old_val:
            diff_dict[sample] = {
                "new_val": new_val,
                "old_val": old_val
            }

    return diff_dict