import json import re import requests import string from gn2.wqflask.database import database_connection import sys from gn2.db import webqtlDatabaseFunction from gn2.utility.tools import get_setting, GN2_BASE_URL class DoSearch: """Parent class containing parameters/functions used for all searches""" # Used to translate search phrases into classes search_types = dict() def __init__(self, search_term, search_operator=None, dataset=None, search_type=None): self.search_term = search_term # Make sure search_operator is something we expect assert search_operator in ( None, "=", "<", ">", "<=", ">="), "Bad search operator" self.search_operator = search_operator self.dataset = dataset self.search_type = search_type if self.dataset: # Get group information for dataset and the species id self.species_id = webqtlDatabaseFunction.retrieve_species_id( self.dataset.group.name) def execute(self, query): """Executes query and returns results""" query = self.normalize_spaces(query) with database_connection(get_setting("SQL_URI")) as conn, conn.cursor() as cursor: cursor.execute(query) return cursor.fetchall() def handle_wildcard(self, str): keyword = str.strip() keyword = keyword.replace("*", ".*") keyword = keyword.replace("?", ".") return keyword def sescape(self, item): """Single escape""" from gn2.utility.tools import get_setting with database_connection(get_setting("SQL_URI")) as conn: escaped = conn.escape_string(str(item)).decode() return escaped def mescape(self, *items): """Multiple escape""" from gn2.utility.tools import get_setting escaped = [] with database_connection(get_setting("SQL_URI")) as conn: escaped = [conn.escape_string(str(item)).decode() for item in items] return tuple(escaped) def normalize_spaces(self, stringy): """Strips out newlines/extra spaces and replaces them with just spaces""" step_one = " ".join(stringy.split()) return step_one @classmethod def get_search(cls, search_type): search_type_string = search_type['dataset_type'] if 'key' in search_type and search_type['key'] != None: search_type_string += '_' + search_type['key'] if search_type_string in cls.search_types: return cls.search_types[search_type_string] else: return None class MrnaAssaySearch(DoSearch): """A search within an expression dataset, including mRNA, protein, SNP, but not phenotype or metabolites""" DoSearch.search_types['ProbeSet'] = "MrnaAssaySearch" base_query = """ SELECT DISTINCT ProbeSetFreeze.`Name`, ProbeSetFreeze.`FullName`, ProbeSet.`Name`, ProbeSet.`Symbol`, CAST(ProbeSet.`description` AS BINARY), CAST(ProbeSet.`Probe_Target_Description` AS BINARY), ProbeSet.`Chr`, ProbeSet.`Mb`, ProbeSetXRef.`Mean`, ProbeSetXRef.`LRS`, ProbeSetXRef.`Locus`, ProbeSetXRef.`pValue`, ProbeSetXRef.`additive`, Geno.`Chr` as geno_chr, Geno.`Mb` as geno_mb FROM Species INNER JOIN InbredSet ON InbredSet.`SpeciesId`= Species.`Id` INNER JOIN ProbeFreeze ON ProbeFreeze.`InbredSetId` = InbredSet.`Id` INNER JOIN Tissue ON ProbeFreeze.`TissueId` = Tissue.`Id` INNER JOIN ProbeSetFreeze ON ProbeSetFreeze.`ProbeFreezeId` = ProbeFreeze.`Id` INNER JOIN ProbeSetXRef ON ProbeSetXRef.`ProbeSetFreezeId` = ProbeSetFreeze.`Id` INNER JOIN ProbeSet ON ProbeSet.`Id` = ProbeSetXRef.`ProbeSetId` LEFT JOIN Geno ON ProbeSetXRef.`Locus` = Geno.`Name` AND Geno.`SpeciesId` = Species.`Id` """ header_fields = ['Index', 'Record', 'Symbol', 'Description', 'Location', 'Mean', 'Max LRS', 'Max LRS Location', 'Additive Effect'] def get_alias_where_clause(self): search_string = self.sescape(self.search_term[0]) if self.search_term[0] != "*": match_clause = """((MATCH (ProbeSet.symbol) AGAINST ('%s' IN BOOLEAN MODE))) and """ % ( search_string) else: match_clause = "" where_clause = (match_clause + """ProbeSet.Id = ProbeSetXRef.ProbeSetId and ProbeSetXRef.ProbeSetFreezeId = %s """ % (self.sescape(str(self.dataset.id)))) return where_clause def get_where_clause(self): search_string = self.sescape(self.search_term[0]) if self.search_term[0] != "*": if re.search("\w{1,2}\-\w+|\w+\-\w{1,2}", self.search_term[0]): search_string = f'"{search_string}*"' match_clause = f"""((MATCH (ProbeSet.Name, ProbeSet.description, ProbeSet.symbol, alias, GenbankId, UniGeneId, Probe_Target_Description) AGAINST ('{search_string}' IN BOOLEAN MODE))) AND """ else: match_clause = "" where_clause = (match_clause + """ProbeSet.Id = ProbeSetXRef.ProbeSetId and ProbeSetXRef.ProbeSetFreezeId = %s """ % (self.sescape(str(self.dataset.id)))) return where_clause def compile_final_query(self, from_clause='', where_clause=''): """Generates the final query string""" from_clause = self.normalize_spaces(from_clause) query = (self.base_query + """%s WHERE %s and ProbeSet.Id = ProbeSetXRef.ProbeSetId and ProbeSetXRef.ProbeSetFreezeId = %s ORDER BY ProbeSet.symbol ASC """ % (self.sescape(from_clause), where_clause, self.sescape(str(self.dataset.id)))) return query def run_combined(self, from_clause='', where_clause=''): """Generates and runs a combined search of an mRNA expression dataset""" #query = self.base_query + from_clause + " WHERE " + where_clause from_clause = self.normalize_spaces(from_clause) query = (self.base_query + """%s WHERE %s and ProbeSet.Id = ProbeSetXRef.ProbeSetId and ProbeSetXRef.ProbeSetFreezeId = %s ORDER BY ProbeSet.symbol ASC """ % (self.sescape(from_clause), where_clause, self.sescape(str(self.dataset.id)))) return self.execute(query) def run(self): """Generates and runs a simple search of an mRNA expression dataset""" where_clause = self.get_where_clause() query = self.base_query + "WHERE " + where_clause + "ORDER BY ProbeSet.symbol ASC" return self.execute(query) class PhenotypeSearch(DoSearch): """A search within a phenotype dataset""" DoSearch.search_types['Publish'] = "PhenotypeSearch" base_query = """SELECT PublishXRef.Id, CAST(Phenotype.`Pre_publication_description` AS BINARY), CAST(Phenotype.`Post_publication_description` AS BINARY), Publication.`Authors`, Publication.`Year`, Publication.`PubMed_ID`, PublishXRef.`mean`, PublishXRef.`LRS`, PublishXRef.`additive`, PublishXRef.`Locus`, InbredSet.`InbredSetCode`, Geno.`Chr`, Geno.`Mb` FROM Species INNER JOIN InbredSet ON InbredSet.`SpeciesId` = Species.`Id` INNER JOIN PublishXRef ON PublishXRef.`InbredSetId` = InbredSet.`Id` INNER JOIN PublishFreeze ON PublishFreeze.`InbredSetId` = InbredSet.`Id` INNER JOIN Publication ON Publication.`Id` = PublishXRef.`PublicationId` INNER JOIN Phenotype ON Phenotype.`Id` = PublishXRef.`PhenotypeId` LEFT JOIN Geno ON PublishXRef.Locus = Geno.Name AND Geno.SpeciesId = Species.Id """ search_fields = ('Phenotype.Post_publication_description', 'Phenotype.Pre_publication_description', 'Phenotype.Pre_publication_abbreviation', 'Phenotype.Post_publication_abbreviation', 'Phenotype.Lab_code', 'Publication.PubMed_ID', 'Publication.Abstract', 'Publication.Title', 'Publication.Authors', 'PublishXRef.Id') header_fields = ['Index', 'Record', 'Description', 'Mean', 'Authors', 'Year', 'Max LRS', 'Max LRS Location', 'Additive Effect'] def get_where_clause(self): """Generate clause for WHERE portion of query""" # Todo: Zach will figure out exactly what both these lines mean # and comment here # if "'" not in self.search_term[0]: search_term = self.search_term[0] if not self.search_term[0].isnumeric() or len(self.search_term[0]) != 5: # To make sure phenotype trait IDs aren't included in a fulltext search search_term = "%" + \ self.handle_wildcard(self.search_term[0]) + "%" if "_" in self.search_term[0]: if len(self.search_term[0].split("_")[0]) == 3: search_term = "%" + self.handle_wildcard( self.search_term[0].split("_")[1]) + "%" # This adds a clause to the query that matches the search term # against each field in the search_fields tuple where_clause_list = [] for field in self.search_fields: where_clause_list.append('''%s LIKE "%s"''' % (field, search_term)) where_clause = "(%s) " % ' OR '.join(where_clause_list) return where_clause def compile_final_query(self, from_clause='', where_clause=''): """Generates the final query string""" from_clause = self.normalize_spaces(from_clause) if self.search_term[0] == "*": query = (self.base_query + """%s WHERE PublishXRef.InbredSetId = %s and PublishXRef.PhenotypeId = Phenotype.Id and PublishXRef.PublicationId = Publication.Id and PublishFreeze.Id = %s ORDER BY PublishXRef.Id""" % ( from_clause, self.sescape(str(self.dataset.group.id)), self.sescape(str(self.dataset.id)))) else: query = (self.base_query + """%s WHERE %s and PublishXRef.InbredSetId = %s and PublishXRef.PhenotypeId = Phenotype.Id and PublishXRef.PublicationId = Publication.Id and PublishFreeze.Id = %s ORDER BY PublishXRef.Id""" % ( from_clause, where_clause, self.sescape(str(self.dataset.group.id)), self.sescape(str(self.dataset.id)))) return query def run_combined(self, from_clause, where_clause): """Generates and runs a combined search of an phenotype dataset""" from_clause = self.normalize_spaces(from_clause) query = (self.base_query + """%s WHERE %s PublishXRef.InbredSetId = %s and PublishXRef.PhenotypeId = Phenotype.Id and PublishXRef.PublicationId = Publication.Id and PublishFreeze.Id = %s""" % ( from_clause, where_clause, self.sescape(str(self.dataset.group.id)), self.sescape(str(self.dataset.id)))) return self.execute(query) def run(self): """Generates and runs a simple search of a phenotype dataset""" query = self.compile_final_query(where_clause=self.get_where_clause()) return self.execute(query) class GenotypeSearch(DoSearch): """A search within a genotype dataset""" DoSearch.search_types['Geno'] = "GenotypeSearch" base_query = """SELECT Geno.Name, GenoFreeze.createtime as thistable, Geno.Name as Geno_Name, Geno.Source2 as Geno_Source2, Geno.Chr as Geno_Chr, Geno.Mb as Geno_Mb FROM GenoXRef, GenoFreeze, Geno """ search_fields = ('Name', 'Chr') header_fields = ['Index', 'Record', 'Location'] def get_where_clause(self): """Generate clause for part of the WHERE portion of query""" # This adds a clause to the query that matches the search term # against each field in search_fields (above) where_clause = [] if "'" not in self.search_term[0]: self.search_term = "%" + self.search_term[0] + "%" for field in self.search_fields: where_clause.append('''%s LIKE "%s"''' % ("%s.%s" % self.mescape(self.dataset.type, field), self.search_term)) where_clause = "(%s) " % ' OR '.join(where_clause) return where_clause def compile_final_query(self, from_clause='', where_clause=''): """Generates the final query string""" from_clause = self.normalize_spaces(from_clause) if self.search_term[0] == "*": query = (self.base_query + """WHERE Geno.Id = GenoXRef.GenoId and GenoXRef.GenoFreezeId = GenoFreeze.Id and GenoFreeze.Id = %s""" % (self.sescape(str(self.dataset.id)))) else: query = (self.base_query + """WHERE %s and Geno.Id = GenoXRef.GenoId and GenoXRef.GenoFreezeId = GenoFreeze.Id and GenoFreeze.Id = %s""" % (where_clause, self.sescape(str(self.dataset.id)))) return query def run(self): """Generates and runs a simple search of a genotype dataset""" # Todo: Zach will figure out exactly what both these lines mean # and comment here if self.search_term[0] == "*": self.query = self.compile_final_query() else: self.query = self.compile_final_query( where_clause=self.get_where_clause()) return self.execute(self.query) class RifSearch(MrnaAssaySearch): """Searches for traits with a Gene RIF entry including the search term.""" DoSearch.search_types['ProbeSet_RIF'] = "RifSearch" def get_from_clause(self): return f" INNER JOIN GeneRIF_BASIC ON GeneRIF_BASIC.`symbol` = { self.dataset.type }.`symbol` " def get_where_clause(self): where_clause = f"(MATCH (GeneRIF_BASIC.comment) AGAINST ('+{ self.search_term[0] }' IN BOOLEAN MODE)) " return where_clause def run(self): from_clause = self.get_from_clause() where_clause = self.get_where_clause() query = self.compile_final_query(from_clause, where_clause) return self.execute(query) class WikiSearch(MrnaAssaySearch): """Searches GeneWiki for traits other people have annotated""" DoSearch.search_types['ProbeSet_WIKI'] = "WikiSearch" def get_from_clause(self): return ", GeneRIF " def get_where_clause(self): where_clause = """%s.symbol = GeneRIF.symbol and GeneRIF.versionId=0 and GeneRIF.display>0 and (GeneRIF.comment LIKE '%s' or GeneRIF.initial = '%s') """ % (self.dataset.type, "%" + str(self.search_term[0]) + "%", str(self.search_term[0])) return where_clause def run(self): from_clause = self.get_from_clause() where_clause = self.get_where_clause() query = self.compile_final_query(from_clause, where_clause) return self.execute(query) class GoSearch(MrnaAssaySearch): """Searches for synapse-associated genes listed in the Gene Ontology.""" DoSearch.search_types['ProbeSet_GO'] = "GoSearch" def get_from_clause(self): from_clause = """, db_GeneOntology.term as GOterm, db_GeneOntology.association as GOassociation, db_GeneOntology.gene_product as GOgene_product """ return from_clause def get_where_clause(self): field = 'GOterm.acc' go_id = 'GO:' + ('0000000' + self.search_term[0])[-7:] statements = ("""%s.symbol=GOgene_product.symbol and GOassociation.gene_product_id=GOgene_product.id and GOterm.id=GOassociation.term_id""" % ( self.sescape(self.dataset.type))) where_clause = " %s = '%s' and %s " % (field, go_id, statements) return where_clause def run(self): from_clause = self.get_from_clause() where_clause = self.get_where_clause() query = self.compile_final_query(from_clause, where_clause) return self.execute(query) # ZS: Not sure what the best way to deal with LRS searches is class LrsSearch(DoSearch): """Searches for genes with a QTL within the given LRS values LRS searches can take 3 different forms: - LRS > (or <) min/max_LRS - LRS=(min_LRS max_LRS) - LRS=(min_LRS max_LRS chromosome start_Mb end_Mb) where min/max_LRS represent the range of LRS scores and start/end_Mb represent the range in megabases on the given chromosome """ for search_key in ('LRS', 'LOD'): DoSearch.search_types[search_key] = "LrsSearch" def get_from_clause(self): converted_search_term = [] for value in self.search_term: try: converted_search_term.append(float(value)) except: converted_search_term.append(value) self.search_term = converted_search_term from_clause = "" return from_clause def get_where_clause(self): if self.search_operator == "=": assert isinstance(self.search_term, (list, tuple)) lrs_min, lrs_max = self.search_term[:2] if self.search_type == "LOD": lrs_min = lrs_min * 4.61 lrs_max = lrs_max * 4.61 where_clause = """ %sXRef.LRS > %s and %sXRef.LRS < %s """ % self.mescape(self.dataset.type, min(lrs_min, lrs_max), self.dataset.type, max(lrs_min, lrs_max)) if len(self.search_term) > 2: try: chr_num = int(float(self.search_term[2])) except: chr_num = self.search_term[2].lower().replace('chr', '') self.search_term[2] = chr_num where_clause += """ and Geno.Chr = '%s' """ % (chr_num) if len(self.search_term) == 5: mb_low, mb_high = self.search_term[3:] where_clause += """ and Geno.Mb > %s and Geno.Mb < %s """ % self.mescape(min(mb_low, mb_high), max(mb_low, mb_high)) where_clause += """ and %sXRef.Locus = Geno.name and Geno.SpeciesId = %s """ % self.mescape(self.dataset.type, self.species_id) else: # Deal with >, <, >=, and <= lrs_val = self.search_term[0] if self.search_type == "LOD": lrs_val = lrs_val * 4.61 where_clause = """ %sXRef.LRS %s %s """ % self.mescape(self.dataset.type, self.search_operator, self.search_term[0]) return where_clause def run(self): self.from_clause = self.get_from_clause() self.where_clause = self.get_where_clause() self.query = self.compile_final_query( self.from_clause, self.where_clause) return self.execute(self.query) class MrnaLrsSearch(LrsSearch, MrnaAssaySearch): for search_key in ('LRS', 'LOD'): DoSearch.search_types['ProbeSet_' + search_key] = "MrnaLrsSearch" def run(self): self.from_clause = self.get_from_clause() self.where_clause = self.get_where_clause() self.query = self.compile_final_query( from_clause=self.from_clause, where_clause=self.where_clause) return self.execute(self.query) class PhenotypeLrsSearch(LrsSearch, PhenotypeSearch): for search_key in ('LRS', 'LOD'): DoSearch.search_types['Publish_' + search_key] = "PhenotypeLrsSearch" def run(self): self.from_clause = self.get_from_clause() self.where_clause = self.get_where_clause() self.query = self.compile_final_query( from_clause=self.from_clause, where_clause=self.where_clause) return self.execute(self.query) class CisTransLrsSearch(DoSearch): def get_where_clause(self, cis_trans): self.mb_buffer = 5 # default chromosome = None if cis_trans == "cis": the_operator = "<" else: the_operator = ">" if self.search_operator == "=": if len(self.search_term) == 2 or len(self.search_term) == 3: self.search_term = [float(value) for value in self.search_term] if len(self.search_term) == 2: lrs_min, lrs_max = self.search_term #[int(value) for value in self.search_term] elif len(self.search_term) == 3: lrs_min, lrs_max, self.mb_buffer = self.search_term elif len(self.search_term) == 4: lrs_min, lrs_max, self.mb_buffer = [ float(value) for value in self.search_term[:3]] chromosome = self.search_term[3] chr_str = re.match("(^c|^C)[a-z]*", chromosome) if chr_str: chromosome = int(chromosome.replace(chr_str.group(0), '')) else: SomeError if self.search_type == "CISLOD" or self.search_type == "TRANSLOD": lrs_min = lrs_min * 4.61 lrs_max = lrs_max * 4.61 sub_clause = """ %sXRef.LRS > %s and %sXRef.LRS < %s and """ % ( self.sescape(self.dataset.type), self.sescape(str(min(lrs_min, lrs_max))), self.sescape(self.dataset.type), self.sescape(str(max(lrs_min, lrs_max))) ) else: # Deal with >, <, >=, and <= sub_clause = """ %sXRef.LRS %s %s and """ % ( self.sescape(self.dataset.type), self.sescape(self.search_operator), self.sescape(self.search_term[0]) ) if cis_trans == "cis": where_clause = sub_clause + """ ABS(%s.Mb-Geno.Mb) %s %s and %sXRef.Locus = Geno.name and Geno.SpeciesId = %s and %s.Chr = Geno.Chr""" % ( self.sescape(self.dataset.type), the_operator, self.sescape(str(self.mb_buffer)), self.sescape(self.dataset.type), self.sescape(str(self.species_id)), self.sescape(self.dataset.type) ) else: if chromosome: location_clause = """ (%s.Chr = '%s' and %s.Chr = Geno.Chr and ABS(%s.Mb-Geno.Mb) %s %s) or (%s.Chr != Geno.Chr and Geno.Chr = '%s')""" % ( self.sescape(self.dataset.type), chromosome, self.sescape( self.dataset.type), self.sescape( self.dataset.type), the_operator, self.sescape( str(self.mb_buffer)), self.sescape( self.dataset.type), chromosome) else: location_clause = "(ABS(%s.Mb-Geno.Mb) %s %s and %s.Chr = Geno.Chr) or (%s.Chr != Geno.Chr)" % (self.sescape( self.dataset.type), the_operator, self.sescape(str(self.mb_buffer)), self.sescape(self.dataset.type), self.sescape(self.dataset.type)) where_clause = sub_clause + """ %sXRef.Locus = Geno.name and Geno.SpeciesId = %s and (%s)""" % ( self.sescape(self.dataset.type), self.sescape(str(self.species_id)), location_clause ) return where_clause class CisLrsSearch(CisTransLrsSearch, MrnaAssaySearch): """ Searches for genes on a particular chromosome with a cis-eQTL within the given LRS values A cisLRS search can take 3 forms: - cisLRS=(min_LRS max_LRS) - cisLRS=(min_LRS max_LRS mb_buffer) - cisLRS>min_LRS where min/max_LRS represent the range of LRS scores and the mb_buffer is the range around a particular QTL where its eQTL would be considered "cis". If there is no third parameter, mb_buffer will default to 5 megabases. A QTL is a cis-eQTL if a gene's expression is regulated by a QTL in roughly the same area (where the area is determined by the mb_buffer that the user can choose). """ for search_key in ('LRS', 'LOD'): DoSearch.search_types['ProbeSet_CIS' + search_key] = "CisLrsSearch" def get_where_clause(self): return CisTransLrsSearch.get_where_clause(self, "cis") def run(self): self.from_clause = self.get_from_clause() self.where_clause = self.get_where_clause() self.query = self.compile_final_query( self.from_clause, self.where_clause) return self.execute(self.query) class TransLrsSearch(CisTransLrsSearch, MrnaAssaySearch): """Searches for genes on a particular chromosome with a cis-eQTL within the given LRS values A transLRS search can take 3 forms: - transLRS=(min_LRS max_LRS) - transLRS=(min_LRS max_LRS mb_buffer) - transLRS>min_LRS where min/max_LRS represent the range of LRS scores and the mb_buffer is the range around a particular QTL where its eQTL would be considered "cis". If there is no third parameter, mb_buffer will default to 5 megabases. A QTL is a trans-eQTL if a gene's expression is regulated by a QTL in a different location/area (where the area is determined by the mb_buffer that the user can choose). Opposite of cis-eQTL. """ for search_key in ('LRS', 'LOD'): DoSearch.search_types['ProbeSet_TRANS' + search_key] = "TransLrsSearch" def get_where_clause(self): return CisTransLrsSearch.get_where_clause(self, "trans") def run(self): self.from_clause = self.get_from_clause() self.where_clause = self.get_where_clause() self.query = self.compile_final_query( self.from_clause, self.where_clause) return self.execute(self.query) class MeanSearch(MrnaAssaySearch): """Searches for genes expressed within an interval (log2 units) determined by the user""" DoSearch.search_types['ProbeSet_MEAN'] = "MeanSearch" def get_where_clause(self): self.search_term = [float(value) for value in self.search_term] if self.search_operator == "=": assert isinstance(self.search_term, (list, tuple)) self.mean_min, self.mean_max = self.search_term[:2] where_clause = """ %sXRef.mean > %s and %sXRef.mean < %s """ % self.mescape(self.dataset.type, min(self.mean_min, self.mean_max), self.dataset.type, max(self.mean_min, self.mean_max)) else: # Deal with >, <, >=, and <= where_clause = """ %sXRef.mean %s %s """ % self.mescape(self.dataset.type, self.search_operator, self.search_term[0]) return where_clause def run(self): self.where_clause = self.get_where_clause() self.query = self.compile_final_query(where_clause=self.where_clause) return self.execute(self.query) class RangeSearch(MrnaAssaySearch): """Searches for genes with a range of expression varying between two values""" DoSearch.search_types['ProbeSet_RANGE'] = "RangeSearch" def get_where_clause(self): if self.search_operator == "=": assert isinstance(self.search_term, (list, tuple)) self.range_min, self.range_max = self.search_term[:2] where_clause = """ (SELECT Pow(2, max(value) -min(value)) FROM ProbeSetData WHERE ProbeSetData.Id = ProbeSetXRef.dataId) > %s AND (SELECT Pow(2, max(value) -min(value)) FROM ProbeSetData WHERE ProbeSetData.Id = ProbeSetXRef.dataId) < %s """ % self.mescape(min(self.range_min, self.range_max), max(self.range_min, self.range_max)) else: # Deal with >, <, >=, and <= where_clause = """ (SELECT Pow(2, max(value) -min(value)) FROM ProbeSetData WHERE ProbeSetData.Id = ProbeSetXRef.dataId) > %s """ % (self.sescape(self.search_term[0])) return where_clause def run(self): self.where_clause = self.get_where_clause() self.query = self.compile_final_query(where_clause=self.where_clause) return self.execute(self.query) class PositionSearch(DoSearch): """Searches for genes/markers located within a specified range on a specified chromosome""" for search_key in ('POSITION', 'POS', 'MB'): DoSearch.search_types[search_key] = "PositionSearch" def get_where_clause(self): self.search_term = [float(value) if is_number( value) else value for value in self.search_term] chr, self.mb_min, self.mb_max = self.search_term[:3] self.chr = str(chr).lower() self.get_chr() where_clause = """ %s.Chr = '%s' and %s.Mb > %s and %s.Mb < %s """ % self.mescape(self.dataset.type, self.chr, self.dataset.type, min(self.mb_min, self.mb_max), self.dataset.type, max(self.mb_min, self.mb_max)) return where_clause def get_chr(self): try: self.chr = int(float(self.chr)) except: self.chr = self.chr.lower().replace('chr', '') def run(self): self.get_where_clause() self.query = self.compile_final_query(where_clause=self.where_clause) return self.execute(self.query) class MrnaPositionSearch(PositionSearch, MrnaAssaySearch): """Searches for genes located within a specified range on a specified chromosome""" for search_key in ('POSITION', 'POS', 'MB'): DoSearch.search_types['ProbeSet_' + search_key] = "MrnaPositionSearch" def run(self): self.where_clause = self.get_where_clause() self.query = self.compile_final_query(where_clause=self.where_clause) return self.execute(self.query) class GenotypePositionSearch(PositionSearch, GenotypeSearch): """Searches for genes located within a specified range on a specified chromosome""" for search_key in ('POSITION', 'POS', 'MB'): DoSearch.search_types['Geno_' + search_key] = "GenotypePositionSearch" def run(self): self.where_clause = self.get_where_clause() self.query = self.compile_final_query(where_clause=self.where_clause) return self.execute(self.query) class PvalueSearch(MrnaAssaySearch): """Searches for traits with a permutationed p-value between low and high""" DoSearch.search_types['ProbeSet_PVALUE'] = "PvalueSearch" def run(self): self.search_term = [float(value) for value in self.search_term] if self.search_operator == "=": assert isinstance(self.search_term, (list, tuple)) self.pvalue_min, self.pvalue_max = self.search_term[:2] self.where_clause = """ %sXRef.pValue > %s and %sXRef.pValue < %s """ % self.mescape( self.dataset.type, min(self.pvalue_min, self.pvalue_max), self.dataset.type, max(self.pvalue_min, self.pvalue_max)) else: # Deal with >, <, >=, and <= self.where_clause = """ %sXRef.pValue %s %s """ % self.mescape( self.dataset.type, self.search_operator, self.search_term[0]) self.query = self.compile_final_query(where_clause=self.where_clause) return self.execute(self.query) class AuthorSearch(PhenotypeSearch): """Searches for phenotype traits with specified author(s)""" DoSearch.search_types["Publish_NAME"] = "AuthorSearch" def run(self): search_term = "%" + self.search_term[0] + "%" self.where_clause = """ Publication.Authors LIKE "%s" and """ % (search_term) self.query = self.compile_final_query(where_clause=self.where_clause) return self.execute(self.query) def is_number(s): try: float(s) return True except ValueError: return False if __name__ == "__main__": # Usually this will be used as a library, but call it from the command line for testing # And it runs the code below import sys from gn2.base import webqtlConfig from gn2.base.data_set import create_dataset from gn2.utility import webqtlUtil from gn2.db import webqtlDatabaseFunction from gn2.wqflask.database import database_connection with database_connection(get_setting("SQL_URI")) as db_conn: with db_conn.cursor() as cursor: dataset_name = "HC_M2_0606_P" dataset = create_dataset(db_conn, dataset_name) results = PvalueSearch(['0.005'], '<', dataset, cursor, db_conn).run()