import string

from PIL import (Image)

from gn2.base import species
from gn2.base import webqtlConfig
from gn2.wqflask.database import database_connection
from gn2.utility.tools import get_setting


# Allele character -> background colour used when rendering allele cells.
_BASE_COLORS = {
    "A": "#C33232", "C": "#1569C7", "T": "#CFCF32", "G": "#32C332",
    "t": "#FF6", "c": "#5CB3FF", "a": "#F66", "g": "#CF9",
    ":": "#FFFFFF", "-": "#FFFFFF", "?": "#FFFFFF",
}
_DEFAULT_BASE_COLOR = "#FFFFFF"

# Effect names whose records are shown under the "Exon / Coding" domain.
_CODING_DOMAINS = ['Start Gained', 'Start Lost', 'Stop Gained', 'Stop Lost',
                   'Nonsynonymous', 'Synonymous']
_UTR_DOMAINS = ['3\' UTR', '5\' UTR']
_INTRON_DOMAINS = ['Splice Site', 'Nonsplice Site']

# empty_columns key -> header label removed when that column is entirely empty.
_OPTIONAL_SNP_HEADERS = (
    ('snp_source', 'Source'),
    ('conservation_score', 'ConScore'),
    ('gene_name', 'Gene'),
    ('transcript', 'Transcript'),
    ('exon', 'Exon'),
    ('domain_2', 'Domain 2'),
    ('function', 'Function'),
    ('function_details', 'Details'),
)


def _chromosome_names(species_name, db_cursor):
    """Return the chromosome names of *species_name*, in lookup order."""
    # Fetch the chromosome mapping once instead of once per key.
    chromosomes = species.TheSpecies(
        species_name=species_name).chromosomes.chromosomes(db_cursor)
    return [chromosomes[key].name for key in chromosomes]


class SnpBrowser:
    """Query, filter and format SNP / InDel records for the variant browser.

    All on/off options are stored as the strings "true" / "false" rather
    than booleans.
    """

    def __init__(self, db_cursor, start_vars):
        """Parse *start_vars* and, unless this is the first page load,
        run the query and build the table rows and header information.

        db_cursor: cursor handed to the species chromosome lookup.
        start_vars: mapping of request parameters.
        """
        self.strain_lists = get_browser_sample_lists()
        self.initialize_parameters(db_cursor, start_vars)

        if self.first_run == "false":
            self.filtered_results = self.get_browser_results()
            self.table_rows = self.get_table_rows()
            self.rows_count = len(self.table_rows)

            # Only the formatted rows are kept on the instance.
            del self.filtered_results

            # NOTE(review): 'sEcho' is presumably the DataTables server-side
            # request marker; without it only the row count is kept.
            if 'sEcho' not in start_vars:
                self.table_rows = []

            if self.limit_strains == "true":
                header_info = get_header_list(
                    variant_type=self.variant_type,
                    strains=self.chosen_strains,
                    empty_columns=self.empty_columns)
            else:
                header_info = get_header_list(
                    variant_type=self.variant_type,
                    strains=self.strain_lists,
                    species=self.species_name,
                    empty_columns=self.empty_columns)
            (self.header_fields, self.empty_field_count,
             self.header_data_names) = header_info

    def initialize_parameters(self, db_cursor, start_vars):
        """Populate search/filter attributes from *start_vars* with defaults.

        The presence of 'first_run' in *start_vars* means this is NOT the
        first page load (first_run becomes "false").
        """
        self.first_run = "false" if 'first_run' in start_vars else "true"
        self.allele_list = []

        # Always defined so later stages can test it even when the request
        # carried no 'gene_name' (previously an AttributeError).
        self.gene_name = ""

        self.variant_type = "SNP"
        if 'variant' in start_vars:
            self.variant_type = start_vars['variant']

        self.species_name = "Mouse"
        self.species_id = 1
        if 'species' in start_vars:
            self.species_name = start_vars['species']
            if self.species_name.capitalize() == "Rat":
                self.species_id = 2

        self.mouse_chr_list = _chromosome_names("Mouse", db_cursor)
        self.rat_chr_list = _chromosome_names("Rat", db_cursor)
        if self.species_id == 1:
            self.this_chr_list = self.mouse_chr_list
        else:
            self.this_chr_list = self.rat_chr_list

        if self.first_run == "true":
            self.chr = "19"
            self.start_mb = 30.1
            self.end_mb = 30.12
        elif 'gene_name' in start_vars:
            if start_vars['gene_name'] != "":
                # The location is resolved from the name in
                # get_browser_results().
                self.gene_name = start_vars['gene_name']
            else:
                self.gene_name = ""
                self.chr = start_vars['chr']
                try:
                    self.start_mb = float(start_vars['start_mb'])
                    self.end_mb = float(start_vars['end_mb'])
                except (KeyError, TypeError, ValueError):
                    self.start_mb = 0.0
                    self.end_mb = 0.0
        else:
            try:
                self.chr = start_vars['chr']
                self.start_mb = float(start_vars['start_mb'])
                self.end_mb = float(start_vars['end_mb'])
            except (KeyError, TypeError, ValueError):
                self.chr = "1"
                self.start_mb = 0.0
                self.end_mb = 0.0

        # On a submitted form an absent checkbox value means "off".
        self.limit_strains = "true"
        if self.first_run == "false":
            if ('limit_strains' not in start_vars
                    or start_vars['limit_strains'] == "false"):
                self.limit_strains = "false"

        self.chosen_strains_mouse = ["C57BL/6J",
                                     "DBA/2J",
                                     "A/J",
                                     "129S1/SvImJ",
                                     "NOD/ShiLtJ",
                                     "NZO/HlLtJ",
                                     "WSB/EiJ",
                                     "PWK/PhJ",
                                     "CAST/EiJ"]
        self.chosen_strains_rat = ["BN", "F344", "WLI", "WMI"]
        if 'chosen_strains_mouse' in start_vars:
            self.chosen_strains_mouse = start_vars['chosen_strains_mouse'].split(
                ",")
        if 'chosen_strains_rat' in start_vars:
            self.chosen_strains_rat = start_vars['chosen_strains_rat'].split(
                ",")

        if self.species_id == 1:
            self.chosen_strains = self.chosen_strains_mouse
        else:
            self.chosen_strains = self.chosen_strains_rat

        self.domain = "All"
        if 'domain' in start_vars:
            self.domain = start_vars['domain']
        self.function = "All"
        if 'function' in start_vars:
            self.function = start_vars['function']
        self.source = "All"
        if 'source' in start_vars:
            self.source = start_vars['source']
        self.criteria = ">="
        if 'criteria' in start_vars:
            self.criteria = start_vars['criteria']
        self.score = 0.0
        if 'score' in start_vars:
            self.score = start_vars['score']

        self.redundant = "false"
        if self.first_run == "false" and 'redundant' in start_vars:
            self.redundant = "true"

        self.diff_alleles = "true"
        if self.first_run == "false":
            if ('diff_alleles' not in start_vars
                    or start_vars['diff_alleles'] == "false"):
                self.diff_alleles = "false"

    def get_browser_results(self):
        """Run the variant query and return the filtered result rows.

        When a name was entered it is first looked up as a gene symbol and,
        failing that, as a SNP / InDel name; a hit replaces the chr/Mb range.
        Returns [] when a SNP/InDel name lookup finds nothing.
        """
        self.snp_list = None

        query = ""
        query_vars = None
        with database_connection(get_setting("SQL_URI")) as conn, conn.cursor() as cursor:
            if self.gene_name != "":
                if self.species_id != 0:
                    query = ("SELECT geneSymbol, chromosome, txStart, "
                             "txEnd FROM GeneList WHERE SpeciesId = %s "
                             "AND geneSymbol = %s")
                    query_vars = (self.species_id, self.gene_name,)
                else:
                    query = ("SELECT geneSymbol, chromosome, txStart, "
                             "txEnd FROM GeneList WHERE geneSymbol = %s")
                    query_vars = (self.gene_name,)
                cursor.execute(query, query_vars)

                if result := cursor.fetchone():
                    self.gene_name, self.chr, self.start_mb, self.end_mb = result
                elif self.variant_type in ["SNP", "InDel"]:
                    result_snp = None
                    query_vars = (self.gene_name,)
                    if self.variant_type == "SNP":
                        if self.gene_name[:2] == "rs":
                            query = ("SELECT Id, Chromosome, Position, "
                                     "Position+0.000001 FROM SnpAll "
                                     "WHERE Rs = %s")
                        elif self.species_id != 0:
                            query = (
                                "SELECT Id, Chromosome, Position, "
                                "Position+0.000001 FROM SnpAll WHERE "
                                "SpeciesId = %s AND SnpName = %s")
                            query_vars = (self.species_id, self.gene_name,)
                        else:
                            query = (
                                "SELECT Id, Chromosome, Position, "
                                "Position+0.000001 FROM SnpAll "
                                "WHERE SnpName = %s")
                        cursor.execute(query, query_vars)
                        result_snp = cursor.fetchall()
                    elif self.gene_name[0] == "I":  # variant_type == InDel
                        if self.species_id != 0:
                            query = (
                                "SELECT Id, Chromosome, Mb_start, "
                                "Mb_end FROM IndelAll WHERE "
                                "SpeciesId = %s AND Name = %s")
                            query_vars = (self.species_id, self.gene_name,)
                        else:
                            # A stray trailing comma used to turn this query
                            # into a 1-tuple, which execute() cannot run.
                            query = (
                                "SELECT Id, Chromosome, Mb_start, "
                                "Mb_end FROM IndelAll WHERE Name = %s")
                            query_vars = (self.gene_name,)
                        cursor.execute(query, query_vars)
                        result_snp = cursor.fetchall()

                    if result_snp:
                        self.snp_list = [item[0] for item in result_snp]
                        self.chr = result_snp[0][1]
                        self.start_mb = result_snp[0][2]
                        self.end_mb = result_snp[0][3]
                    else:
                        return []

            if self.variant_type == "SNP":
                query_vars = (self.species_id, self.chr,
                              f"{self.start_mb:.6f}", f"{self.end_mb:.6f}",)
                if self.species_id == 1:  # Mouse
                    query = ("SELECT a.*, b.* FROM SnpAll a, SnpPattern b "
                             "WHERE a.SpeciesId = %s AND a.Chromosome = %s "
                             "AND a.Position >= %s AND a.Position < %s "
                             "AND a.Id = b.SnpId ORDER BY a.Position")
                elif self.species_id == 2:  # Rat
                    query = (
                        "SELECT a.*, b.* FROM SnpAll a, RatSnpPattern b "
                        "WHERE a.SpeciesId = %s AND a.Chromosome = %s "
                        "AND a.Position >= %s AND a.Position < %s "
                        "AND a.Id = b.SnpId ORDER BY a.Position")
            elif self.variant_type == "InDel":
                if self.species_id != 0:
                    query = (
                        "SELECT DISTINCT a.Name, a.Chromosome, a.SourceId, "
                        "a.Mb_start, a.Mb_end, a.Strand, a.Type, a.Size, "
                        "a.InDelSequence, b.Name FROM IndelAll a, "
                        "SnpSource b WHERE a.SpeciesId = %s AND "
                        "a.Chromosome = %s AND a.Mb_start >= %s "
                        "AND a.Mb_start < %s AND b.Id = a.SourceId "
                        "ORDER BY a.Mb_start")
                    query_vars = (self.species_id, self.chr,
                                  f"{self.start_mb:2.6f}",
                                  f"{self.end_mb+0.0010:2.6f}",)
                else:
                    query = (
                        "SELECT DISTINCT a.Name, a.Chromosome, a.SourceId, "
                        "a.Mb_start, a.Mb_end, a.Strand, a.Type, a.Size, "
                        "a.InDelSequence, b.Name FROM IndelAll a, "
                        "SnpSource b WHERE a.Chromosome = %s AND "
                        "a.Mb_start >= %s AND a.Mb_start < %s "
                        "AND b.Id = a.SourceId ORDER BY a.Mb_start")
                    query_vars = (self.chr,
                                  f"{self.start_mb+0.0010:2.6f}",
                                  f"{self.end_mb+0.0010:2.6f}",)

            # Executed exactly once (the species-specific InDel query used
            # to be sent to the server twice).
            cursor.execute(query, query_vars)
            return self.filter_results(cursor.fetchall())

    def filter_results(self, results):
        """Expand raw query rows into one record per effect/gene and keep
        those accepted by include_record().

        Side effect: self.allele_list is overwritten for every SNP row,
        because include_record() reads it.
        """
        filtered_results = []
        # ZS: List of positions of selected strains in strain list
        strain_index_list = []
        last_mb = -1

        use_chosen_strains = (self.limit_strains == "true"
                              and len(self.chosen_strains) > 0)
        if use_chosen_strains:
            all_strains = self.strain_lists[self.species_name.lower()]
            strain_index_list = [all_strains.index(strain)
                                 for strain in self.chosen_strains]

        for result in results:
            result = list(result)

            if self.variant_type == "SNP":
                (snp_id, species_id, snp_name, rs, chr_, mb, mb_2016,
                 alleles, snp_source, conservation_score) = result[:10]
                effect_list = result[10:28]

                # NOTE(review): the mouse "all strains" slice starts at 30
                # while the per-strain lookup below starts at 29 — confirm
                # against the SnpPattern column layout.
                if self.species_id == 1:
                    self.allele_list = result[30:]
                elif self.species_id == 2:
                    self.allele_list = result[31:]

                if use_chosen_strains:
                    display_strains = []
                    for index in strain_index_list:
                        if self.species_id == 1:
                            display_strains.append(result[29 + index])
                        elif self.species_id == 2:
                            display_strains.append(result[31 + index])
                    self.allele_list = display_strains

                effect_info_dict = get_effect_info(effect_list)

                for key in effect_info_dict:
                    if key in _CODING_DOMAINS:
                        domain = ['Exon', 'Coding']
                    elif key in _UTR_DOMAINS:
                        domain = ['Exon', key]
                    elif key == "Unknown Effect In Exon":
                        domain = ['Exon', '']
                    elif key in _INTRON_DOMAINS:
                        domain = ['Intron', key]
                    else:
                        domain = [key, '']

                    if 'Intergenic' in domain:
                        if self.gene_name != "":
                            gene_id = get_gene_id(
                                self.species_id, self.gene_name)
                            gene = [gene_id, self.gene_name]
                        else:
                            gene = check_if_in_gene(species_id, chr_, mb)
                        transcript = exon = function = function_details = ''
                        if self.redundant == "false" or last_mb != mb:  # filter redundant
                            if self.include_record(domain, function, snp_source, conservation_score):
                                info_list = [snp_name, rs, chr_, mb, alleles, gene, transcript, exon, domain,
                                             function, function_details, snp_source, conservation_score, snp_id]
                                info_list.extend(self.allele_list)
                                filtered_results.append(info_list)
                            # NOTE(review): the collapsed source does not
                            # show whether this update was also conditional
                            # on include_record(); confirm.
                            last_mb = mb
                    else:
                        (gene_list, transcript_list, exon_list, function_list,
                         function_details_list) = effect_info_dict[key]
                        for index, item in enumerate(gene_list):
                            gene = item
                            transcript = transcript_list[index]
                            exon = exon_list[index] if exon_list else ""

                            if function_list:
                                function = function_list[index]
                                if function == "Unknown Effect In Exon":
                                    function = "Unknown"
                            else:
                                function = ""

                            if function_details_list:
                                function_details = "Biotype: " + \
                                    function_details_list[index]
                            else:
                                function_details = ""

                            if self.redundant == "false" or last_mb != mb:
                                if self.include_record(domain, function, snp_source, conservation_score):
                                    info_list = [snp_name, rs, chr_, mb, alleles, gene, transcript, exon, domain,
                                                 function, function_details, snp_source, conservation_score, snp_id]
                                    info_list.extend(self.allele_list)
                                    filtered_results.append(info_list)
                                # NOTE(review): same placement caveat as above.
                                last_mb = mb

            elif self.variant_type == "InDel":
                # The order of variables is important; this applies to anything from the variant table as indel
                (indel_name, indel_chr, source_id, indel_mb_start,
                 indel_mb_end, indel_strand, indel_type, indel_size,
                 indel_sequence, source_name) = result

                indel_type = indel_type.title()
                if self.redundant == "false" or last_mb != indel_mb_start:
                    # InDels carry no domain/function/score information.
                    domain = function = conservation_score = ""
                    if self.include_record(domain, function, source_name, conservation_score):
                        filtered_results.append([indel_name, indel_chr, indel_mb_start, indel_mb_end,
                                                 indel_strand, indel_type, indel_size, indel_sequence, source_name])
                    last_mb = indel_mb_start

            else:
                filtered_results.append(result)

        return filtered_results

    def get_table_rows(self):
        """ Take results and put them into the order and format necessary for the tables rows

        Also sets self.empty_columns, which records (as "true"/"false"
        strings) which optional columns received at least one value.
        """

        # Defaults to an empty mapping so rows whose gene entry has no
        # usable name do not hit an unbound variable.
        gene_id_name_dict = {}
        if self.variant_type == "SNP":
            gene_name_list = []
            for item in self.filtered_results:
                if item[5] and item[5] != "":
                    gene_name = item[5][1]
                    # eliminate duplicate gene_name
                    if gene_name and (gene_name not in gene_name_list):
                        gene_name_list.append(gene_name)
            if len(gene_name_list) > 0:
                gene_id_name_dict = get_gene_id_name_dict(
                    self.species_id, gene_name_list)

        # ZS: list of booleans representing which columns are entirely empty, so they aren't displayed on the page; only including ones that are sometimes empty (since there's always a location, etc)
        # "false" means no row has supplied a value for the column yet.
        self.empty_columns = {
            "snp_source": "false",
            "conservation_score": "false",
            "gene_name": "false",
            "transcript": "false",
            "exon": "false",
            "domain_2": "false",
            "function": "false",
            "function_details": "false"
        }

        the_rows = []
        for i, result in enumerate(self.filtered_results):
            this_row = {}
            if self.variant_type == "SNP":
                (snp_name, rs, chr_, mb, alleles, gene, transcript, exon,
                 domain, function, function_details, snp_source,
                 conservation_score, snp_id) = result[:14]
                allele_value_list = result[14:]
                if rs:
                    snp_url = webqtlConfig.DBSNP % (rs)
                    snp_name = rs
                else:
                    # No rs id: link to a 200 bp genome-browser window
                    # centred on the SNP position instead.
                    rs = ""
                    start_bp = int(mb * 1000000 - 100)
                    end_bp = int(mb * 1000000 + 100)
                    position_info = "chr%s:%d-%d" % (chr_, start_bp, end_bp)
                    if self.species_id == 2:
                        snp_url = webqtlConfig.GENOMEBROWSER_URL % (
                            "rn6", position_info)
                    else:
                        snp_url = webqtlConfig.GENOMEBROWSER_URL % (
                            "mm10", position_info)

                mb = float(mb)
                mb_formatted = "%2.6f" % mb

                if snp_source == "Sanger/UCLA":
                    source_url_1 = "http://www.sanger.ac.uk/resources/mouse/genomes/"
                    source_url_2 = "http://mouse.cs.ucla.edu/mousehapmap/beta/wellcome.html"
                    source_urls = [source_url_1, source_url_2]
                    self.empty_columns['snp_source'] = "true"
                else:
                    source_urls = []

                if not conservation_score:
                    conservation_score = ""
                else:
                    self.empty_columns['conservation_score'] = "true"

                if gene:
                    gene_name = gene[1]
                    # if gene_name has related gene_id, use gene_id for NCBI search
                    gene_id = gene_id_name_dict.get(gene_name)
                    if gene_id is not None and gene_id != "":
                        gene_link = webqtlConfig.NCBI_LOCUSID % gene_id
                    else:
                        gene_link = "http://www.ncbi.nlm.nih.gov/entrez/query.fcgi?CMD=search&DB=gene&term=%s" % gene_name

                    self.empty_columns['gene_name'] = "true"
                else:
                    gene_name = ""
                    gene_link = ""

                if transcript:
                    transcript_link = webqtlConfig.ENSEMBLETRANSCRIPT_URL % (
                        transcript)
                    self.empty_columns['transcript'] = "true"
                else:
                    transcript_link = ""

                if exon:
                    exon = exon[1]  # exon[0] is exon_id, exon[1] is exon_rank
                    self.empty_columns['exon'] = "true"
                else:
                    exon = ""

                domain_1 = domain_2 = ""
                if domain:
                    domain_1 = domain[0]
                    domain_2 = domain[1]
                    if domain_1 == "Intergenic" and gene != "":
                        domain_1 = gene_name
                    elif domain_1 == "Exon":
                        domain_1 = domain_1 + " " + exon

                    if domain_2 != "":
                        self.empty_columns['domain_2'] = "true"

                if function:
                    self.empty_columns['function'] = "true"

                if function_details:
                    function_list = [part.strip() for part
                                     in function_details.strip().split(",")]
                    function_list[0] = function_list[0].title()
                    function_details = ", ".join(function_list)
                    function_details = function_details.replace("_", " ")
                    function_details = function_details.replace("/", " -> ")
                    if function_details == "Biotype: Protein Coding":
                        function_details = function_details + ", Coding Region Unknown"

                    self.empty_columns['function_details'] = "true"

                the_bases = []
                for item in allele_value_list:
                    if item and isinstance(item, str):
                        # Unknown allele characters render on white
                        # instead of raising KeyError.
                        this_base = [str(item), _BASE_COLORS.get(
                            item, _DEFAULT_BASE_COLOR)]
                    else:
                        this_base = ""
                    the_bases.append(this_base)

                this_row = {
                    "index": i + 1,
                    "rs": str(rs),
                    "snp_url": str(snp_url),
                    "snp_name": str(snp_name),
                    "chr": str(chr_),
                    "mb_formatted": mb_formatted,
                    "alleles": str(alleles),
                    "snp_source": str(snp_source),
                    "source_urls": source_urls,
                    "conservation_score": str(conservation_score),
                    "gene_name": str(gene_name),
                    "gene_link": str(gene_link),
                    "transcript": str(transcript),
                    "transcript_link": str(transcript_link),
                    "exon": str(exon),
                    "domain_1": str(domain_1),
                    "domain_2": str(domain_2),
                    "function": str(function),
                    "function_details": str(function_details),
                    "allele_value_list": the_bases
                }

            elif self.variant_type == "InDel":
                (indel_name, indel_chr, indel_mb_s, indel_mb_e, indel_strand,
                 indel_type, indel_size, indel_sequence, source_name) = result
                # NOTE(review): InDel rows are numbered from 0 while SNP
                # rows are numbered from 1 — confirm this is intended.
                this_row = {
                    "index": i,
                    "indel_name": str(indel_name),
                    "indel_chr": str(indel_chr),
                    "indel_mb_s": str(indel_mb_s),
                    "indel_mb_e": str(indel_mb_e),
                    "indel_strand": str(indel_strand),
                    "indel_type": str(indel_type),
                    "indel_size": str(indel_size),
                    "indel_sequence": str(indel_sequence),
                    "source_name": str(source_name)
                }

            the_rows.append(this_row)

        return the_rows

    def include_record(self, domain, function, snp_source, conservation_score):
        """ Decide whether to add this record

        A record is kept only if it passes the domain, function, source,
        conservation-score and "different alleles" filters. Reads
        self.allele_list for the allele check.
        """
        # Each text filter passes on "All" or on a prefix match; an empty
        # value only passes when the filter is "All".
        if domain:
            domain_satisfied = (domain[0].startswith(self.domain)
                                or domain[1].startswith(self.domain)
                                or self.domain == "All")
        else:
            domain_satisfied = self.domain == "All"

        if snp_source:
            source_satisfied = (snp_source.startswith(self.source)
                                or self.source == "All")
        else:
            source_satisfied = self.source == "All"

        if function:
            function_satisfied = (self.function == "All"
                                  or function.startswith(self.function))
        else:
            function_satisfied = self.function == "All"

        # Default keeps the record when the comparison operator is not one
        # of the three handled below (previously an UnboundLocalError).
        score_satisfied = True
        if conservation_score:
            score_as_float = float(conservation_score)
            try:
                input_score_float = float(self.score)  # the user-input score
            except (TypeError, ValueError):
                input_score_float = 0.0

            if self.criteria == ">=":
                score_satisfied = score_as_float >= input_score_float
            elif self.criteria == "==":
                score_satisfied = score_as_float == input_score_float
            elif self.criteria == "<=":
                score_satisfied = score_as_float <= input_score_float
        else:
            # A record without a score passes only when no positive
            # threshold was requested.
            try:
                score_satisfied = not float(self.score) > 0
            except (TypeError, ValueError):
                score_satisfied = True

        different_alleles_satisfied = True
        if self.variant_type == "SNP" and self.diff_alleles == "true":
            # Case-insensitive set of alleles, ignoring blanks and "-".
            distinct_alleles = {
                item.lower() for item in self.allele_list
                if item and isinstance(item, str) and item != "-"}
            different_alleles_satisfied = len(distinct_alleles) > 1

        return (domain_satisfied and function_satisfied and source_satisfied
                and score_satisfied and different_alleles_satisfied)

    def snp_density_map(self, query, results):
        """Unfinished: compute the geometry for a SNP density plot.

        Only local layout values are computed; nothing is drawn on the
        canvas and the method returns None. Divides by
        (self.end_mb - self.start_mb), so an empty range raises
        ZeroDivisionError.
        """

        canvas_width = 900
        canvas_height = 200
        snp_canvas = Image.new("RGBA", size=(canvas_width, canvas_height))
        left_offset, right_offset, top_offset, bottom_offset = (30, 30, 40, 50)
        plot_width = canvas_width - left_offset - right_offset
        plot_height = canvas_height - top_offset - bottom_offset
        y_zero = top_offset + plot_height / 2

        x_scale = plot_width / (self.end_mb - self.start_mb)

        # draw clickable image map at some point

        n_click = 80.0
        click_step = plot_width / n_click
        click_mb_step = (self.end_mb - self.start_mb) / n_click


def get_browser_sample_lists(species_id=1):
    """Return {'mouse': [...], 'rat': [...]} strain names.

    The names are the column names of SnpPattern (skipping the first
    column) and RatSnpPattern (skipping the first two). *species_id* is
    accepted but not used.
    """
    with database_connection(get_setting("SQL_URI")) as conn:
        with conn.cursor() as cursor:
            cursor.execute("SHOW COLUMNS FROM SnpPattern")
            _mouse_snp_pattern = cursor.fetchall()
            cursor.execute("SHOW COLUMNS FROM RatSnpPattern")
            _rats_snp_pattern = cursor.fetchall()

            return {
                'mouse': [column[0] for column in _mouse_snp_pattern[1:]],
                'rat': [column[0] for column in _rats_snp_pattern[2:]],
            }


def get_header_list(variant_type, strains, species=None, empty_columns=None):
    """Build the table header for *variant_type*.

    strains: a list of strain names, or (when *species* is given) the
        {'mouse': [...], 'rat': [...]} mapping to pick a list from.
    species: "Mouse" / "Rat", matched case-insensitively.
    empty_columns: optional mapping of column key -> "true"/"false";
        columns marked "false" are dropped from the SNP header.

    Returns (header_fields, empty_field_count, header_data_names).
    """
    # Match case-insensitively, like the species checks elsewhere in this
    # module, so "mouse" does not fall through to using the whole mapping.
    species_key = species.capitalize() if isinstance(species, str) else species
    if species_key == "Mouse":
        strain_list = strains['mouse']
    elif species_key == "Rat":
        strain_list = strains['rat']
    else:
        strain_list = strains

    empty_field_count = 0  # ZS: This is an awkward way of letting the javascript know the index where the allele value columns start; there's probably a better way of doing this

    header_fields = []
    header_data_names = []
    if variant_type == "SNP":
        header_fields.append(['Index', 'SNP ID', 'Chr', 'Mb', 'Alleles', 'Source', 'ConScore',
                              'Gene', 'Transcript', 'Exon', 'Domain 1', 'Domain 2', 'Function', 'Details'])
        header_data_names = ['index', 'snp_name', 'chr', 'mb_formatted', 'alleles', 'snp_source', 'conservation_score',
                             'gene_name', 'transcript', 'exon', 'domain_1', 'domain_2', 'function', 'function_details']

        header_fields.append(strain_list)
        header_data_names += strain_list

        if empty_columns is not None:
            for column, label in _OPTIONAL_SNP_HEADERS:
                if empty_columns[column] == "false":
                    empty_field_count += 1
                    header_fields[0].remove(label)

            for column in empty_columns:
                if empty_columns[column] == "false":
                    header_data_names.remove(column)

    elif variant_type == "InDel":
        header_fields = ['Index', 'ID', 'Type', 'InDel Chr', 'Mb Start',
                         'Mb End', 'Strand', 'Size', 'Sequence', 'Source']
        header_data_names = ['index', 'indel_name', 'indel_type', 'indel_chr', 'indel_mb_s',
                             'indel_mb_e', 'indel_strand', 'indel_size', 'indel_sequence', 'source_name']

    return header_fields, empty_field_count, header_data_names


def get_effect_details_by_category(effect_name=None, effect_value=None):
    """Parse one effect column into parallel per-entry lists.

    effect_value: '|'-separated entries, each a ','-separated record whose
        first three fields are gene id, gene name and transcript.

    Returns [gene_list, transcript_list, exon_list, function_list,
    function_detail_list]; the last three stay empty for effect names that
    do not carry that information.
    """
    gene_list = []
    transcript_list = []
    exon_list = []
    function_list = []
    function_detail_list = []

    gene_group_list = ['Upstream', 'Downstream',
                       'Splice Site', 'Nonsplice Site', '3\' UTR']
    biotype_group_list = ['Unknown Effect In Exon', 'Start Gained',
                          'Start Lost', 'Stop Gained', 'Stop Lost', 'Nonsynonymous', 'Synonymous']
    new_codon_group_list = ['Start Gained']
    codon_effect_group_list = [
        'Start Lost', 'Stop Gained', 'Stop Lost', 'Nonsynonymous', 'Synonymous']

    for entry in effect_value.strip().split('|'):
        item_list = [field.strip() for field in entry.strip().split(',')]

        gene_list.append([item_list[0], item_list[1]])  # [gene_id, gene_name]
        transcript_list.append(item_list[2])

        if effect_name not in gene_group_list:
            exon_list.append([item_list[3], item_list[4]])  # [exon_id, exon_rank]

        if effect_name in biotype_group_list:
            biotype = item_list[5]
            function_list.append(effect_name)

            if effect_name in new_codon_group_list:
                # fields: biotype, new codon
                function_detail_list.append(
                    ", ".join([biotype, item_list[6]]))
            elif effect_name in codon_effect_group_list:
                # fields: biotype, old/new AA, old/new codon, codon number
                function_detail_list.append(", ".join(
                    [biotype, item_list[6], item_list[7], item_list[8]]))
            else:
                function_detail_list.append(biotype)

    return [gene_list, transcript_list, exon_list, function_list, function_detail_list]


def get_effect_info(effect_list):
    """Map each effect present in *effect_list* to its parsed details.

    The first 16 entries of *effect_list* are read positionally. An
    intergenic record yields {"Intergenic": ""}; otherwise every non-empty
    effect column is parsed with get_effect_details_by_category(). Splice
    columns are only considered when the intron column is set.
    """
    (prime3_utr, prime5_utr, upstream, downstream, intron,
     nonsplice_site, splice_site, intergenic) = effect_list[:8]
    (exon, non_synonymous_coding, synonymous_coding, start_gained,
     start_lost, stop_gained, stop_lost,
     unknown_effect_in_exon) = effect_list[8:16]

    if intergenic:
        return {"Intergenic": ""}

    intron_effects = ()
    if intron:
        intron_effects = (("Splice Site", splice_site),
                          ("Nonsplice Site", nonsplice_site))

    # Order matters: it is the order in which callers iterate the result.
    categories = (
        # if not exon, get gene list/transcript list info
        ("Upstream", upstream),
        ("Downstream", downstream),
        *intron_effects,
        # get gene, transcript_list, and exon info
        ("3' UTR", prime3_utr),
        ("5' UTR", prime5_utr),
        ("Start Gained", start_gained),
        ("Unknown Effect In Exon", unknown_effect_in_exon),
        ("Start Lost", start_lost),
        ("Stop Gained", stop_gained),
        ("Stop Lost", stop_lost),
        ("Nonsynonymous", non_synonymous_coding),
        ("Synonymous", synonymous_coding),
    )

    effect_info_dict = {}
    for name, value in categories:
        if value:
            effect_info_dict[name] = get_effect_details_by_category(
                effect_name=name, effect_value=value)
    return effect_info_dict


def get_gene_id(species_id, gene_name):
    """Return the geneId for *gene_name* in *species_id*, or "" if absent."""
    query = ("SELECT geneId FROM GeneList WHERE "
             "SpeciesId = %s AND geneSymbol = %s")

    with database_connection(get_setting("SQL_URI")) as conn:
        with conn.cursor() as cursor:
            cursor.execute(query, (species_id, gene_name))
            if (result := cursor.fetchone()):
                return result[0]
    return ""


def get_gene_id_name_dict(species_id, gene_name_list):
    """Return {geneSymbol: geneId} for the symbols in *gene_name_list*.

    An empty list yields an empty dict without touching the database
    (this used to return "", which is not a mapping).
    """
    gene_id_name_dict = {}
    if len(gene_name_list) == 0:
        return gene_id_name_dict

    # One %s placeholder per gene symbol; values are passed separately.
    query = ("SELECT geneId, geneSymbol FROM "
             "GeneList WHERE SpeciesId = %s AND "
             f"geneSymbol in ({', '.join(['%s'] * len(gene_name_list))})")
    with database_connection(get_setting("SQL_URI")) as conn:
        with conn.cursor() as cursor:
            cursor.execute(query, (species_id, *gene_name_list))
            results = cursor.fetchall()
            if results:
                for gene_id, gene_symbol in results:
                    gene_id_name_dict[gene_symbol] = gene_id
    return gene_id_name_dict


def check_if_in_gene(species_id, chr_, mb):
    """Return [geneId, geneSymbol] of a gene on *chr_* whose txStart/txEnd
    strictly enclose *mb*, or "" when no gene matches."""
    with database_connection(get_setting("SQL_URI")) as conn:
        with conn.cursor() as cursor:
            if species_id != 0:  # ZS: Check if this is necessary
                cursor.execute(
                    "SELECT geneId, geneSymbol "
                    "FROM GeneList WHERE "
                    "SpeciesId = %s AND chromosome = %s "
                    "AND (txStart < %s AND txEnd > %s)",
                    (species_id, chr_, mb, mb))
            else:
                cursor.execute(
                    "SELECT geneId,geneSymbol "
                    "FROM GeneList WHERE "
                    "chromosome = %s AND "
                    "(txStart < %s AND txEnd > %s)",
                    (chr_, mb, mb))
            if (result := cursor.fetchone()):
                return [result[0], result[1]]
    return ""