path: root/gn2/wqflask/snp_browser/snp_browser.py
diff options
Diffstat (limited to 'gn2/wqflask/snp_browser/snp_browser.py')
1 files changed, 934 insertions, 0 deletions
diff --git a/gn2/wqflask/snp_browser/snp_browser.py b/gn2/wqflask/snp_browser/snp_browser.py
new file mode 100644
index 00000000..2d17f82b
--- /dev/null
+++ b/gn2/wqflask/snp_browser/snp_browser.py
@@ -0,0 +1,934 @@
+import string
+from PIL import (Image)
+from gn2.base import species
+from gn2.base import webqtlConfig
+from gn2.wqflask.database import database_connection
+from gn2.utility.tools import get_setting
+class SnpBrowser:
+ def __init__(self, db_cursor, start_vars):
+ self.strain_lists = get_browser_sample_lists()
+ self.initialize_parameters(db_cursor, start_vars)
+ if self.first_run == "false":
+ self.filtered_results = self.get_browser_results()
+ self.table_rows = self.get_table_rows()
+ self.rows_count = len(self.table_rows)
+ del self.filtered_results
+ if 'sEcho' not in start_vars:
+ self.table_rows = []
+ if self.limit_strains == "true":
+ self.header_fields, self.empty_field_count, self.header_data_names = get_header_list(
+ variant_type=self.variant_type, strains=self.chosen_strains, empty_columns=self.empty_columns)
+ else:
+ self.header_fields, self.empty_field_count, self.header_data_names = get_header_list(
+ variant_type=self.variant_type, strains=self.strain_lists, species=self.species_name, empty_columns=self.empty_columns)
+ def initialize_parameters(self, db_cursor, start_vars):
+ if 'first_run' in start_vars:
+ self.first_run = "false"
+ else:
+ self.first_run = "true"
+ self.allele_list = []
+ self.variant_type = "SNP"
+ if 'variant' in start_vars:
+ self.variant_type = start_vars['variant']
+ self.species_name = "Mouse"
+ self.species_id = 1
+ if 'species' in start_vars:
+ self.species_name = start_vars['species']
+ if self.species_name.capitalize() == "Rat":
+ self.species_id = 2
+ self.mouse_chr_list = []
+ self.rat_chr_list = []
+ mouse_species_ob = species.TheSpecies(species_name="Mouse")
+ for key in mouse_species_ob.chromosomes.chromosomes(db_cursor):
+ self.mouse_chr_list.append(
+ mouse_species_ob.chromosomes.chromosomes(db_cursor)[key].name)
+ rat_species_ob = species.TheSpecies(species_name="Rat")
+ for key in rat_species_ob.chromosomes.chromosomes(db_cursor):
+ self.rat_chr_list.append(
+ rat_species_ob.chromosomes.chromosomes(db_cursor)[key].name)
+ if self.species_id == 1:
+ self.this_chr_list = self.mouse_chr_list
+ else:
+ self.this_chr_list = self.rat_chr_list
+ if self.first_run == "true":
+ self.chr = "19"
+ self.start_mb = 30.1
+ self.end_mb = 30.12
+ else:
+ if 'gene_name' in start_vars:
+ if start_vars['gene_name'] != "":
+ self.gene_name = start_vars['gene_name']
+ else:
+ self.gene_name = ""
+ self.chr = start_vars['chr']
+ try:
+ self.start_mb = float(start_vars['start_mb'])
+ self.end_mb = float(start_vars['end_mb'])
+ except:
+ self.start_mb = 0.0
+ self.end_mb = 0.0
+ else:
+ try:
+ self.chr = start_vars['chr']
+ self.start_mb = float(start_vars['start_mb'])
+ self.end_mb = float(start_vars['end_mb'])
+ except:
+ self.chr = "1"
+ self.start_mb = 0.0
+ self.end_mb = 0.0
+ self.limit_strains = "true"
+ if self.first_run == "false":
+ if 'limit_strains' not in start_vars:
+ self.limit_strains = "false"
+ else:
+ if start_vars['limit_strains'] == "false":
+ self.limit_strains = "false"
+ self.chosen_strains_mouse = ["C57BL/6J",
+ "DBA/2J",
+ "A/J",
+ "129S1/SvImJ",
+ "NOD/ShiLtJ",
+ "NZO/HlLtJ",
+ "WSB/EiJ",
+ "PWK/PhJ",
+ "CAST/EiJ"]
+ self.chosen_strains_rat = ["BN", "F344", "WLI", "WMI"]
+ if 'chosen_strains_mouse' in start_vars:
+ self.chosen_strains_mouse = start_vars['chosen_strains_mouse'].split(
+ ",")
+ if 'chosen_strains_rat' in start_vars:
+ self.chosen_strains_rat = start_vars['chosen_strains_rat'].split(
+ ",")
+ if self.species_id == 1:
+ self.chosen_strains = self.chosen_strains_mouse
+ else:
+ self.chosen_strains = self.chosen_strains_rat
+ self.domain = "All"
+ if 'domain' in start_vars:
+ self.domain = start_vars['domain']
+ self.function = "All"
+ if 'function' in start_vars:
+ self.function = start_vars['function']
+ self.source = "All"
+ if 'source' in start_vars:
+ self.source = start_vars['source']
+ self.criteria = ">="
+ if 'criteria' in start_vars:
+ self.criteria = start_vars['criteria']
+ self.score = 0.0
+ if 'score' in start_vars:
+ self.score = start_vars['score']
+ self.redundant = "false"
+ if self.first_run == "false" and 'redundant' in start_vars:
+ self.redundant = "true"
+ self.diff_alleles = "true"
+ if self.first_run == "false":
+ if 'diff_alleles' not in start_vars:
+ self.diff_alleles = "false"
+ else:
+ if start_vars['diff_alleles'] == "false":
+ self.diff_alleles = "false"
+ def get_browser_results(self):
+ self.snp_list = None
+ __query = ""
+ __vars = None
+ with database_connection(get_setting("SQL_URI")) as conn, conn.cursor() as cursor:
+ if self.gene_name != "":
+ if self.species_id != 0:
+ __query = ("SELECT geneSymbol, chromosome, txStart, "
+ "txEnd FROM GeneList WHERE SpeciesId = %s "
+ "AND geneSymbol = %s")
+ __vars = (self.species_id, self.gene_name,)
+ else:
+ __query = ("SELECT geneSymbol, chromosome, txStart, "
+ "txEnd FROM GeneList WHERE geneSymbol = %s")
+ __vars = (self.gene_name,)
+ cursor.execute(__query, __vars)
+ if result := cursor.fetchone():
+ self.gene_name, self.chr, self.start_mb, self.end_mb = result
+ else:
+ if self.variant_type in ["SNP", "InDel"]:
+ result_snp = None
+ __vars = (self.gene_name,)
+ if self.variant_type == "SNP":
+ if self.gene_name[:2] == "rs":
+ __query = ("SELECT Id, Chromosome, Position, "
+ "Position+0.000001 FROM SnpAll "
+ "WHERE Rs = %s")
+ else:
+ if self.species_id != 0:
+ __query = (
+ "SELECT Id, Chromosome, Position, "
+ "Position+0.000001 FROM SnpAll WHERE "
+ "SpeciesId = %s AND SnpName = %s")
+ __vars = (self.species_id, self.gene_name,)
+ else:
+ __query = (
+ "SELECT Id, Chromosome, Position, "
+ "Position+0.000001 FROM SnpAll "
+ "WHERE SnpName = %s")
+ cursor.execute(__query, __vars)
+ result_snp = cursor.fetchall()
+ else: # variant_type == InDel
+ if self.gene_name[0] == "I":
+ if self.species_id != 0:
+ __query = (
+ "SELECT Id, Chromosome, Mb_start, "
+ "Mb_end FROM IndelAll WHERE "
+ "SpeciesId = %s AND Name = %s")
+ __vars = (self.species_id, self.gene_name,)
+ else:
+ __query = (
+ "SELECT Id, Chromosome, Mb_start, "
+ "Mb_end FROM IndelAll WHERE Name = %s",)
+ __vars = (self.gene_name,)
+ cursor.execute(__query, __vars)
+ result_snp = cursor.fetchall()
+ if result_snp:
+ self.snp_list = [item[0] for item in result_snp]
+ self.chr = result_snp[0][1]
+ self.start_mb = result_snp[0][2]
+ self.end_mb = result_snp[0][3]
+ else:
+ return []
+ if self.variant_type == "SNP":
+ __vars = (self.species_id, self.chr,
+ f"{self.start_mb:.6f}",
+ f"{self.end_mb:.6f}",)
+ if self.species_id == 1: # Mouse
+ __query = ("SELECT a.*, b.* FROM SnpAll a, SnpPattern b "
+ "WHERE a.SpeciesId = %s AND a.Chromosome = %s "
+ "AND a.Position >= %s AND a.Position < %s "
+ "AND a.Id = b.SnpId ORDER BY a.Position")
+ elif self.species_id == 2: # Rat
+ __query = (
+ "SELECT a.*, b.* FROM SnpAll a, RatSnpPattern b "
+ "WHERE a.SpeciesId = %s AND a.Chromosome = %s "
+ "AND a.Position >= %s AND a.Position < %s "
+ "AND a.Id = b.SnpId ORDER BY a.Position")
+ elif self.variant_type == "InDel":
+ if self.species_id != 0:
+ __query = (
+ "SELECT DISTINCT a.Name, a.Chromosome, a.SourceId, "
+ "a.Mb_start, a.Mb_end, a.Strand, a.Type, a.Size, "
+ "a.InDelSequence, b.Name FROM IndelAll a, "
+ "SnpSource b WHERE a.SpeciesId = %s AND "
+ "a.Chromosome = %s AND a.Mb_start >= %s "
+ "AND a.Mb_start < %s AND b.Id = a.SourceId "
+ "ORDER BY a.Mb_start")
+ __vars = (self.species_id,
+ self.chr, f"{self.start_mb:2.6f}",
+ f"{self.end_mb+0.0010:2.6f}",)
+ cursor.execute(__query, __vars)
+ else:
+ __query = (
+ "SELECT DISTINCT a.Name, a.Chromosome, a.SourceId, "
+ "a.Mb_start, a.Mb_end, a.Strand, a.Type, a.Size, "
+ "a.InDelSequence, b.Name FROM IndelAll a, "
+ "SnpSource b WHERE a.Chromosome = %s AND "
+ "a.Mb_start >= %s AND a.Mb_start < %s "
+ "AND b.Id = a.SourceId ORDER BY a.Mb_start")
+ __vars = (self.chr, f"{self.start_mb+0.0010:2.6f}",
+ f"{self.end_mb+0.0010:2.6f}",)
+ cursor.execute(__query, __vars)
+ return self.filter_results(cursor.fetchall())
+ def filter_results(self, results):
+ filtered_results = []
+ strain_index_list = [] # ZS: List of positions of selected strains in strain list
+ last_mb = -1
+ if self.limit_strains == "true" and len(self.chosen_strains) > 0:
+ for item in self.chosen_strains:
+ index = self.strain_lists[self.species_name.lower()].index(
+ item)
+ strain_index_list.append(index)
+ for seq, result in enumerate(results):
+ result = list(result)
+ if self.variant_type == "SNP":
+ display_strains = []
+ snp_id, species_id, snp_name, rs, chr, mb, mb_2016, alleles, snp_source, conservation_score = result[
+ :10]
+ effect_list = result[10:28]
+ if self.species_id == 1:
+ self.allele_list = result[30:]
+ elif self.species_id == 2:
+ self.allele_list = result[31:]
+ if self.limit_strains == "true" and len(self.chosen_strains) > 0:
+ for index in strain_index_list:
+ if self.species_id == 1:
+ display_strains.append(result[29 + index])
+ elif self.species_id == 2:
+ display_strains.append(result[31 + index])
+ self.allele_list = display_strains
+ effect_info_dict = get_effect_info(effect_list)
+ coding_domain_list = ['Start Gained', 'Start Lost',
+ 'Stop Gained', 'Stop Lost', 'Nonsynonymous', 'Synonymous']
+ intron_domain_list = ['Splice Site', 'Nonsplice Site']
+ for key in effect_info_dict:
+ if key in coding_domain_list:
+ domain = ['Exon', 'Coding']
+ elif key in ['3\' UTR', '5\' UTR']:
+ domain = ['Exon', key]
+ elif key == "Unknown Effect In Exon":
+ domain = ['Exon', '']
+ elif key in intron_domain_list:
+ domain = ['Intron', key]
+ else:
+ domain = [key, '']
+ if 'Intergenic' in domain:
+ if self.gene_name != "":
+ gene_id = get_gene_id(
+ self.species_id, self.gene_name)
+ gene = [gene_id, self.gene_name]
+ else:
+ gene = check_if_in_gene(species_id, chr, mb)
+ transcript = exon = function = function_details = ''
+ if self.redundant == "false" or last_mb != mb: # filter redundant
+ if self.include_record(domain, function, snp_source, conservation_score):
+ info_list = [snp_name, rs, chr, mb, alleles, gene, transcript, exon, domain,
+ function, function_details, snp_source, conservation_score, snp_id]
+ info_list.extend(self.allele_list)
+ filtered_results.append(info_list)
+ last_mb = mb
+ else:
+ gene_list, transcript_list, exon_list, function_list, function_details_list = effect_info_dict[
+ key]
+ for index, item in enumerate(gene_list):
+ gene = item
+ transcript = transcript_list[index]
+ if exon_list:
+ exon = exon_list[index]
+ else:
+ exon = ""
+ if function_list:
+ function = function_list[index]
+ if function == "Unknown Effect In Exon":
+ function = "Unknown"
+ else:
+ function = ""
+ if function_details_list:
+ function_details = "Biotype: " + \
+ function_details_list[index]
+ else:
+ function_details = ""
+ if self.redundant == "false" or last_mb != mb:
+ if self.include_record(domain, function, snp_source, conservation_score):
+ info_list = [snp_name, rs, chr, mb, alleles, gene, transcript, exon, domain,
+ function, function_details, snp_source, conservation_score, snp_id]
+ info_list.extend(self.allele_list)
+ filtered_results.append(info_list)
+ last_mb = mb
+ elif self.variant_type == "InDel":
+ # The order of variables is important; this applies to anything from the variant table as indel
+ indel_name, indel_chr, source_id, indel_mb_start, indel_mb_end, indel_strand, indel_type, indel_size, indel_sequence, source_name = result
+ indel_type = indel_type.title()
+ if self.redundant == "false" or last_mb != indel_mb_start:
+ gene = "No Gene"
+ domain = conservation_score = snp_id = snp_name = rs = flank_3 = flank_5 = ncbi = function = ""
+ if self.include_record(domain, function, source_name, conservation_score):
+ filtered_results.append([indel_name, indel_chr, indel_mb_start, indel_mb_end,
+ indel_strand, indel_type, indel_size, indel_sequence, source_name])
+ last_mb = indel_mb_start
+ else:
+ filtered_results.append(result)
+ return filtered_results
+ def get_table_rows(self):
+ """ Take results and put them into the order and format necessary for the tables rows """
+ if self.variant_type == "SNP":
+ gene_name_list = []
+ for item in self.filtered_results:
+ if item[5] and item[5] != "":
+ gene_name = item[5][1]
+ # eliminate duplicate gene_name
+ if gene_name and (gene_name not in gene_name_list):
+ gene_name_list.append(gene_name)
+ if len(gene_name_list) > 0:
+ gene_id_name_dict = get_gene_id_name_dict(
+ self.species_id, gene_name_list)
+ # ZS: list of booleans representing which columns are entirely empty, so they aren't displayed on the page; only including ones that are sometimes empty (since there's always a location, etc)
+ self.empty_columns = {
+ "snp_source": "false",
+ "conservation_score": "false",
+ "gene_name": "false",
+ "transcript": "false",
+ "exon": "false",
+ "domain_2": "false",
+ "function": "false",
+ "function_details": "false"
+ }
+ the_rows = []
+ for i, result in enumerate(self.filtered_results):
+ this_row = {}
+ if self.variant_type == "SNP":
+ snp_name, rs, chr, mb, alleles, gene, transcript, exon, domain, function, function_details, snp_source, conservation_score, snp_id = result[
+ :14]
+ allele_value_list = result[14:]
+ if rs:
+ snp_url = webqtlConfig.DBSNP % (rs)
+ snp_name = rs
+ else:
+ rs = ""
+ start_bp = int(mb * 1000000 - 100)
+ end_bp = int(mb * 1000000 + 100)
+ position_info = "chr%s:%d-%d" % (chr, start_bp, end_bp)
+ if self.species_id == 2:
+ snp_url = webqtlConfig.GENOMEBROWSER_URL % (
+ "rn6", position_info)
+ else:
+ snp_url = webqtlConfig.GENOMEBROWSER_URL % (
+ "mm10", position_info)
+ mb = float(mb)
+ mb_formatted = "%2.6f" % mb
+ if snp_source == "Sanger/UCLA":
+ source_url_1 = "http://www.sanger.ac.uk/resources/mouse/genomes/"
+ source_url_2 = "http://mouse.cs.ucla.edu/mousehapmap/beta/wellcome.html"
+ source_urls = [source_url_1, source_url_2]
+ self.empty_columns['snp_source'] = "true"
+ else:
+ source_urls = []
+ if not conservation_score:
+ conservation_score = ""
+ else:
+ self.empty_columns['conservation_score'] = "true"
+ if gene:
+ gene_name = gene[1]
+ # if gene_name has related gene_id, use gene_id for NCBI search
+ if (gene_name in gene_id_name_dict) and (gene_id_name_dict[gene_name] != None and gene_id_name_dict[gene_name] != ""):
+ gene_id = gene_id_name_dict[gene[1]]
+ gene_link = webqtlConfig.NCBI_LOCUSID % gene_id
+ else:
+ gene_link = "http://www.ncbi.nlm.nih.gov/entrez/query.fcgi?CMD=search&DB=gene&term=%s" % gene_name
+ self.empty_columns['gene_name'] = "true"
+ else:
+ gene_name = ""
+ gene_link = ""
+ if transcript:
+ transcript_link = webqtlConfig.ENSEMBLETRANSCRIPT_URL % (
+ transcript)
+ self.empty_columns['transcript'] = "true"
+ else:
+ transcript_link = ""
+ if exon:
+ exon = exon[1] # exon[0] is exon_id, exon[1] is exon_rank
+ self.empty_columns['exon'] = "true"
+ else:
+ exon = ""
+ if domain:
+ domain_1 = domain[0]
+ domain_2 = domain[1]
+ if domain_1 == "Intergenic" and gene != "":
+ domain_1 = gene_name
+ else:
+ if domain_1 == "Exon":
+ domain_1 = domain_1 + " " + exon
+ if domain_2 != "":
+ self.empty_columns['domain_2'] = "true"
+ if function:
+ self.empty_columns['function'] = "true"
+ function_list = []
+ if function_details:
+ function_list = function_details.strip().split(",")
+ function_list = [item.strip() for item in function_list]
+ function_list[0] = function_list[0].title()
+ function_details = ", ".join(
+ item for item in function_list)
+ function_details = function_details.replace("_", " ")
+ function_details = function_details.replace("/", " -> ")
+ if function_details == "Biotype: Protein Coding":
+ function_details = function_details + ", Coding Region Unknown"
+ self.empty_columns['function_details'] = "true"
+ #[snp_href, chr, mb_formatted, alleles, snp_source_cell, conservation_score, gene_name_cell, transcript_href, exon, domain_1, domain_2, function, function_details]
+ base_color_dict = {"A": "#C33232", "C": "#1569C7", "T": "#CFCF32", "G": "#32C332",
+ "t": "#FF6", "c": "#5CB3FF", "a": "#F66", "g": "#CF9", ":": "#FFFFFF", "-": "#FFFFFF", "?": "#FFFFFF"}
+ the_bases = []
+ for j, item in enumerate(allele_value_list):
+ if item and isinstance(item, str):
+ this_base = [str(item), base_color_dict[item]]
+ else:
+ this_base = ""
+ the_bases.append(this_base)
+ this_row = {
+ "index": i + 1,
+ "rs": str(rs),
+ "snp_url": str(snp_url),
+ "snp_name": str(snp_name),
+ "chr": str(chr),
+ "mb_formatted": mb_formatted,
+ "alleles": str(alleles),
+ "snp_source": str(snp_source),
+ "source_urls": source_urls,
+ "conservation_score": str(conservation_score),
+ "gene_name": str(gene_name),
+ "gene_link": str(gene_link),
+ "transcript": str(transcript),
+ "transcript_link": str(transcript_link),
+ "exon": str(exon),
+ "domain_1": str(domain_1),
+ "domain_2": str(domain_2),
+ "function": str(function),
+ "function_details": str(function_details),
+ "allele_value_list": the_bases
+ }
+ elif self.variant_type == "InDel":
+ indel_name, indel_chr, indel_mb_s, indel_mb_e, indel_strand, indel_type, indel_size, indel_sequence, source_name = result
+ this_row = {
+ "index": i,
+ "indel_name": str(indel_name),
+ "indel_chr": str(indel_chr),
+ "indel_mb_s": str(indel_mb_s),
+ "indel_mb_e": str(indel_mb_e),
+ "indel_strand": str(indel_strand),
+ "indel_type": str(indel_type),
+ "indel_size": str(indel_size),
+ "indel_sequence": str(indel_sequence),
+ "source_name": str(source_name)
+ }
+ #this_row = [indel_name, indel_chr, indel_mb_s, indel_mb_e, indel_strand, indel_type, indel_size, indel_sequence, source_name]
+ the_rows.append(this_row)
+ return the_rows
+ def include_record(self, domain, function, snp_source, conservation_score):
+ """ Decide whether to add this record """
+ domain_satisfied = True
+ function_satisfied = True
+ different_alleles_satisfied = True
+ source_satisfied = True
+ if domain:
+ if len(domain) == 0:
+ if self.domain != "All":
+ domain_satisfied = False
+ else:
+ domain_satisfied = False
+ if domain[0].startswith(self.domain) or domain[1].startswith(self.domain) or self.domain == "All":
+ domain_satisfied = True
+ else:
+ if self.domain != "All":
+ domain_satisfied = False
+ if snp_source:
+ if len(snp_source) == 0:
+ if self.source != "All":
+ source_satisfied = False
+ else:
+ source_satisfied = False
+ if snp_source.startswith(self.source) or self.source == "All":
+ source_satisfied = True
+ else:
+ if self.source != "All":
+ source_satisfied = False
+ if function:
+ if len(function) == 0:
+ if self.function != "All":
+ function_satisfied = False
+ else:
+ function_satisfied = False
+ if self.function != "All":
+ if function.startswith(self.function):
+ function_satisfied = True
+ else:
+ function_satisfied = True
+ else:
+ if self.function != "All":
+ function_satisfied = False
+ if conservation_score:
+ score_as_float = float(conservation_score)
+ try:
+ input_score_float = float(self.score) # the user-input score
+ except:
+ input_score_float = 0.0
+ if self.criteria == ">=":
+ if score_as_float >= input_score_float:
+ score_satisfied = True
+ else:
+ score_satisfied = False
+ elif self.criteria == "==":
+ if score_as_float == input_score_float:
+ score_satisfied = True
+ else:
+ score_satisfied = False
+ elif self.criteria == "<=":
+ if score_as_float <= input_score_float:
+ score_satisfied = True
+ else:
+ score_satisfied = False
+ else:
+ try:
+ if float(self.score) > 0:
+ score_satisfied = False
+ else:
+ score_satisfied = True
+ except:
+ score_satisfied = True
+ if self.variant_type == "SNP" and self.diff_alleles == "true":
+ this_allele_list = []
+ for item in self.allele_list:
+ if item and isinstance(item, str) and (item.lower() not in this_allele_list) and (item != "-"):
+ this_allele_list.append(item.lower())
+ total_allele_count = len(this_allele_list)
+ if total_allele_count <= 1:
+ different_alleles_satisfied = False
+ else:
+ different_alleles_satisfied = True
+ else:
+ different_alleles_satisfied = True
+ return domain_satisfied and function_satisfied and source_satisfied and score_satisfied and different_alleles_satisfied
+ def snp_density_map(self, query, results):
+ canvas_width = 900
+ canvas_height = 200
+ snp_canvas = Image.new("RGBA", size=(canvas_width, canvas_height))
+ left_offset, right_offset, top_offset, bottom_offset = (30, 30, 40, 50)
+ plot_width = canvas_width - left_offset - right_offset
+ plot_height = canvas_height - top_offset - bottom_offset
+ y_zero = top_offset + plot_height / 2
+ x_scale = plot_width / (self.end_mb - self.start_mb)
+ # draw clickable image map at some point
+ n_click = 80.0
+ click_step = plot_width / n_click
+ click_mb_step = (self.end_mb - self.start_mb) / n_click
def get_browser_sample_lists(species_id=1):
    """Return the strain names per species, taken from the pattern tables.

    The names are the column names of SnpPattern (all but the first
    column) and RatSnpPattern (all but the first two), stored under the
    keys 'mouse' and 'rat'.  ``species_id`` is accepted but not used.
    """
    with database_connection(get_setting("SQL_URI")) as conn, conn.cursor() as cursor:
        cursor.execute("SHOW COLUMNS FROM SnpPattern")
        mouse_columns = cursor.fetchall()
        cursor.execute("SHOW COLUMNS FROM RatSnpPattern")
        rat_columns = cursor.fetchall()

    # The first field of each SHOW COLUMNS row is the column name.
    return {
        'mouse': [column[0] for column in mouse_columns[1:]],
        'rat': [column[0] for column in rat_columns[2:]],
    }
def get_header_list(variant_type, strains, species=None, empty_columns=None):
    """Build the table header for the given variant type.

    variant_type  -- "SNP" or "InDel"; anything else gives empty headers
    strains       -- list of strain names, or (when ``species`` names mouse
                     or rat) the dict holding the 'mouse'/'rat' lists
    species       -- optional species name, matched case-insensitively
    empty_columns -- optional dict of "true"/"false" flags per optional
                     column; columns flagged "false" are left out

    Returns ``(header_fields, empty_field_count, header_data_names)``.
    For SNPs ``header_fields`` is ``[column labels, strain list]``; for
    InDels it is a flat list of labels.  ``empty_field_count`` is the
    number of optional SNP columns that were removed.
    """
    # Data name -> label for the SNP columns that may be hidden.
    optional_columns = (
        ('snp_source', 'Source'),
        ('conservation_score', 'ConScore'),
        ('gene_name', 'Gene'),
        ('transcript', 'Transcript'),
        ('exon', 'Exon'),
        ('domain_2', 'Domain 2'),
        ('function', 'Function'),
        ('function_details', 'Details'),
    )

    # Species names reach this function as submitted, so normalise the
    # case the same way the rest of the module does.
    species_key = species.capitalize() if isinstance(species, str) else species
    if species_key == "Mouse":
        strain_list = strains['mouse']
    elif species_key == "Rat":
        strain_list = strains['rat']
    else:
        strain_list = strains

    # ZS: This is an awkward way of letting the javascript know the index
    # where the allele value columns start.
    empty_field_count = 0
    header_fields = []
    header_data_names = []

    if variant_type == "SNP":
        labels = ['Index', 'SNP ID', 'Chr', 'Mb', 'Alleles', 'Source', 'ConScore',
                  'Gene', 'Transcript', 'Exon', 'Domain 1', 'Domain 2', 'Function', 'Details']
        header_data_names = ['index', 'snp_name', 'chr', 'mb_formatted', 'alleles', 'snp_source', 'conservation_score',
                             'gene_name', 'transcript', 'exon', 'domain_1', 'domain_2', 'function', 'function_details']
        header_fields = [labels, strain_list]
        header_data_names += strain_list

        if empty_columns is not None:
            for data_name, label in optional_columns:
                if empty_columns[data_name] == "false":
                    empty_field_count += 1
                    labels.remove(label)
            for col in empty_columns:
                if empty_columns[col] == "false" and col in header_data_names:
                    header_data_names.remove(col)

    elif variant_type == "InDel":
        header_fields = ['Index', 'ID', 'Type', 'InDel Chr',
                         'Mb Start', 'Mb End', 'Strand', 'Size', 'Sequence', 'Source']
        header_data_names = ['index', 'indel_name', 'indel_type', 'indel_chr', 'indel_mb_s',
                             'indel_mb_e', 'indel_strand', 'indel_size', 'indel_sequence', 'source_name']

    return header_fields, empty_field_count, header_data_names
def get_effect_details_by_category(effect_name=None, effect_value=None):
    """Parse one effect column into parallel per-gene lists.

    ``effect_value`` holds '|'-separated entries, each a comma-separated
    record starting with gene id, gene name and transcript.  Depending on
    ``effect_name`` the record also supplies exon id/rank, a biotype and
    codon details.

    Returns ``[gene_list, transcript_list, exon_list, function_list,
    function_detail_list]``; lists that do not apply to the effect stay
    empty.
    """
    # Effects whose records carry no exon fields.
    without_exon = ('Upstream', 'Downstream',
                    'Splice Site', 'Nonsplice Site', '3\' UTR')
    # Effects whose records carry a biotype (and count as a function).
    with_biotype = ('Unknown Effect In Exon', 'Start Gained',
                    'Start Lost', 'Stop Gained', 'Stop Lost', 'Nonsynonymous', 'Synonymous')
    with_new_codon = ('Start Gained',)
    with_codon_change = (
        'Start Lost', 'Stop Gained', 'Stop Lost', 'Nonsynonymous', 'Synonymous')

    genes, transcripts, exons, functions, details = [], [], [], [], []

    for entry in effect_value.strip().split('|'):
        fields = [field.strip() for field in entry.strip().split(',')]

        genes.append([fields[0], fields[1]])
        transcripts.append(fields[2])

        if effect_name not in without_exon:
            exons.append([fields[3], fields[4]])

        if effect_name not in with_biotype:
            continue

        biotype = fields[5]
        functions.append(effect_name)
        if effect_name in with_new_codon:
            # biotype, new codon
            details.append(", ".join([biotype, fields[6]]))
        elif effect_name in with_codon_change:
            # biotype, old/new amino acid, old/new codon, codon number
            details.append(
                ", ".join([biotype, fields[6], fields[7], fields[8]]))
        else:
            details.append(biotype)

    return [genes, transcripts, exons, functions, details]
def get_effect_info(effect_list):
    """Map each effect present in ``effect_list`` to its parsed details.

    The first sixteen items of ``effect_list`` are read positionally.  An
    intergenic variant yields ``{"Intergenic": ""}``.  Otherwise every
    non-empty effect column is parsed with
    ``get_effect_details_by_category`` and stored under its display name;
    the splice-site columns are only considered when the intron column is
    set.
    """
    (prime3_utr, prime5_utr, upstream, downstream,
     intron, nonsplice_site, splice_site, intergenic) = effect_list[:8]
    (_exon, non_synonymous_coding, synonymous_coding, start_gained,
     start_lost, stop_gained, stop_lost,
     unknown_effect_in_exon) = effect_list[8:16]

    if intergenic:
        return {"Intergenic": ""}

    # Display name and raw column value, in the order the entries are
    # inserted into the result.
    candidates = (
        ("Upstream", upstream),
        ("Downstream", downstream),
        ("Splice Site", splice_site if intron else None),
        ("Nonsplice Site", nonsplice_site if intron else None),
        ("3\' UTR", prime3_utr),
        ("5\' UTR", prime5_utr),
        ("Start Gained", start_gained),
        ("Unknown Effect In Exon", unknown_effect_in_exon),
        ("Start Lost", start_lost),
        ("Stop Gained", stop_gained),
        ("Stop Lost", stop_lost),
        ("Nonsynonymous", non_synonymous_coding),
        ("Synonymous", synonymous_coding),
    )

    effect_info_dict = {}
    for effect_name, raw_value in candidates:
        if raw_value:
            effect_info_dict[effect_name] = get_effect_details_by_category(
                effect_name=effect_name, effect_value=raw_value)
    return effect_info_dict
def get_gene_id(species_id, gene_name):
    """Return the GeneList geneId for a gene symbol of the given species.

    The first matching row is used; an empty string is returned when no
    row matches.
    """
    sql = ("SELECT geneId FROM GeneList WHERE "
           "SpeciesId = %s AND geneSymbol = %s")
    with database_connection(get_setting("SQL_URI")) as conn, conn.cursor() as cursor:
        cursor.execute(sql, (species_id, gene_name))
        row = cursor.fetchone()
    return row[0] if row else ""
def get_gene_id_name_dict(species_id, gene_name_list):
    """Return a ``{geneSymbol: geneId}`` dict for the given gene symbols.

    species_id     -- SpeciesId the symbols are looked up for
    gene_name_list -- gene symbols to resolve

    Symbols without a GeneList row are simply absent from the result.  An
    empty input returns an empty dict without touching the database, so
    the return type is the same on every path.
    """
    if not gene_name_list:
        return {}

    # One placeholder per symbol; the values themselves are always passed
    # as query parameters.
    placeholders = ", ".join(["%s"] * len(gene_name_list))
    query = ("SELECT geneId, geneSymbol FROM "
             "GeneList WHERE SpeciesId = %s AND "
             f"geneSymbol in ({placeholders})")
    with database_connection(get_setting("SQL_URI")) as conn:
        with conn.cursor() as cursor:
            cursor.execute(query, (species_id, *gene_name_list))
            return {row[1]: row[0] for row in cursor.fetchall()}
def check_if_in_gene(species_id, chr_, mb):
    """Find a gene whose transcribed region strictly contains a position.

    Returns ``[geneId, geneSymbol]`` of the first GeneList row on
    chromosome ``chr_`` with ``txStart < mb < txEnd`` (restricted to
    ``species_id`` unless it is 0), or an empty string when there is none.
    """
    if species_id != 0:  # ZS: Check if this is necessary
        sql = ("SELECT geneId, geneSymbol "
               "FROM GeneList WHERE "
               "SpeciesId = %s AND chromosome = %s "
               "AND (txStart < %s AND txEnd > %s)")
        params = (species_id, chr_, mb, mb)
    else:
        sql = ("SELECT geneId,geneSymbol "
               "FROM GeneList WHERE "
               "chromosome = %s AND "
               "(txStart < %s AND txEnd > %s)")
        params = (chr_, mb, mb)

    with database_connection(get_setting("SQL_URI")) as conn, conn.cursor() as cursor:
        cursor.execute(sql, params)
        row = cursor.fetchone()

    if row:
        return [row[0], row[1]]
    return ""