diff options
Diffstat (limited to 'wqflask/wqflask/do_search.py')
-rw-r--r-- | wqflask/wqflask/do_search.py | 965 |
1 files changed, 0 insertions, 965 deletions
diff --git a/wqflask/wqflask/do_search.py b/wqflask/wqflask/do_search.py deleted file mode 100644 index b6f540fa..00000000 --- a/wqflask/wqflask/do_search.py +++ /dev/null @@ -1,965 +0,0 @@ -import json -import re -import requests -import string - -from wqflask.database import database_connection - -import sys - -from db import webqtlDatabaseFunction -from utility.tools import get_setting, GN2_BASE_URL - - -class DoSearch: - """Parent class containing parameters/functions used for all searches""" - - # Used to translate search phrases into classes - search_types = dict() - - def __init__(self, search_term, search_operator=None, dataset=None, search_type=None): - self.search_term = search_term - # Make sure search_operator is something we expect - assert search_operator in ( - None, "=", "<", ">", "<=", ">="), "Bad search operator" - self.search_operator = search_operator - self.dataset = dataset - self.search_type = search_type - - if self.dataset: - # Get group information for dataset and the species id - self.species_id = webqtlDatabaseFunction.retrieve_species_id( - self.dataset.group.name) - - def execute(self, query): - """Executes query and returns results""" - query = self.normalize_spaces(query) - with database_connection(get_setting("SQL_URI")) as conn, conn.cursor() as cursor: - cursor.execute(query) - return cursor.fetchall() - - def handle_wildcard(self, str): - keyword = str.strip() - keyword = keyword.replace("*", ".*") - keyword = keyword.replace("?", ".") - - return keyword - - def sescape(self, item): - """Single escape""" - from utility.tools import get_setting - with database_connection(get_setting("SQL_URI")) as conn: - escaped = conn.escape_string(str(item)).decode() - return escaped - - def mescape(self, *items): - """Multiple escape""" - from utility.tools import get_setting - escaped = [] - with database_connection(get_setting("SQL_URI")) as conn: - escaped = [conn.escape_string(str(item)).decode() for item in items] - return tuple(escaped) - - def normalize_spaces(self, stringy): - """Strips out newlines/extra spaces and replaces them with just spaces""" - step_one = " ".join(stringy.split()) - return step_one - - @classmethod - def get_search(cls, search_type): - search_type_string = search_type['dataset_type'] - if 'key' in search_type and search_type['key'] != None: - search_type_string += '_' + search_type['key'] - - if search_type_string in cls.search_types: - return cls.search_types[search_type_string] - else: - return None - - -class MrnaAssaySearch(DoSearch): - """A search within an expression dataset, including mRNA, protein, SNP, but not phenotype or metabolites""" - - DoSearch.search_types['ProbeSet'] = "MrnaAssaySearch" - - base_query = """ - SELECT DISTINCT - ProbeSetFreeze.`Name`, - ProbeSetFreeze.`FullName`, - ProbeSet.`Name`, - ProbeSet.`Symbol`, - CAST(ProbeSet.`description` AS BINARY), - CAST(ProbeSet.`Probe_Target_Description` AS BINARY), - ProbeSet.`Chr`, - ProbeSet.`Mb`, - ProbeSetXRef.`Mean`, - ProbeSetXRef.`LRS`, - ProbeSetXRef.`Locus`, - ProbeSetXRef.`pValue`, - ProbeSetXRef.`additive`, - Geno.`Chr` as geno_chr, - Geno.`Mb` as geno_mb - FROM Species - INNER JOIN InbredSet ON InbredSet.`SpeciesId`= Species.`Id` - INNER JOIN ProbeFreeze ON ProbeFreeze.`InbredSetId` = InbredSet.`Id` - INNER JOIN Tissue ON ProbeFreeze.`TissueId` = Tissue.`Id` - INNER JOIN ProbeSetFreeze ON ProbeSetFreeze.`ProbeFreezeId` = ProbeFreeze.`Id` - INNER JOIN ProbeSetXRef ON ProbeSetXRef.`ProbeSetFreezeId` = ProbeSetFreeze.`Id` - INNER JOIN ProbeSet ON ProbeSet.`Id` = ProbeSetXRef.`ProbeSetId` - LEFT JOIN Geno ON ProbeSetXRef.`Locus` = Geno.`Name` AND Geno.`SpeciesId` = Species.`Id` """ - - header_fields = ['Index', - 'Record', - 'Symbol', - 'Description', - 'Location', - 'Mean', - 'Max LRS', - 'Max LRS Location', - 'Additive Effect'] - - def get_alias_where_clause(self): - search_string = self.sescape(self.search_term[0]) - - if self.search_term[0] != "*": - match_clause = """((MATCH (ProbeSet.symbol) AGAINST ('%s' IN BOOLEAN MODE))) and """ % ( - search_string) - else: - match_clause = "" - - where_clause = (match_clause - + """ProbeSet.Id = ProbeSetXRef.ProbeSetId - and ProbeSetXRef.ProbeSetFreezeId = %s - """ % (self.sescape(str(self.dataset.id)))) - - return where_clause - - def get_where_clause(self): - search_string = self.sescape(self.search_term[0]) - - if self.search_term[0] != "*": - if re.search("\w{1,2}\-\w+|\w+\-\w{1,2}", self.search_term[0]): - search_string = f'"{search_string}*"' - - match_clause = f"""((MATCH (ProbeSet.Name, - ProbeSet.description, - ProbeSet.symbol, - alias, - GenbankId, - UniGeneId, - Probe_Target_Description) - AGAINST ('{search_string}' IN BOOLEAN MODE))) AND """ - else: - match_clause = "" - - where_clause = (match_clause - + """ProbeSet.Id = ProbeSetXRef.ProbeSetId - and ProbeSetXRef.ProbeSetFreezeId = %s - """ % (self.sescape(str(self.dataset.id)))) - - return where_clause - - def compile_final_query(self, from_clause='', where_clause=''): - """Generates the final query string""" - - from_clause = self.normalize_spaces(from_clause) - - query = (self.base_query + - """%s - WHERE %s - and ProbeSet.Id = ProbeSetXRef.ProbeSetId - and ProbeSetXRef.ProbeSetFreezeId = %s - ORDER BY ProbeSet.symbol ASC - """ % (self.sescape(from_clause), - where_clause, - self.sescape(str(self.dataset.id)))) - return query - - def run_combined(self, from_clause='', where_clause=''): - """Generates and runs a combined search of an mRNA expression dataset""" - #query = self.base_query + from_clause + " WHERE " + where_clause - - from_clause = self.normalize_spaces(from_clause) - - query = (self.base_query + - """%s - WHERE %s - and ProbeSet.Id = ProbeSetXRef.ProbeSetId - and ProbeSetXRef.ProbeSetFreezeId = %s - ORDER BY ProbeSet.symbol ASC - """ % (self.sescape(from_clause), - where_clause, - self.sescape(str(self.dataset.id)))) - - return self.execute(query) - - def run(self): - """Generates and runs a simple search of an mRNA expression dataset""" - where_clause = self.get_where_clause() - query = self.base_query + "WHERE " + where_clause + "ORDER BY ProbeSet.symbol ASC" - return self.execute(query) - - -class PhenotypeSearch(DoSearch): - """A search within a phenotype dataset""" - - DoSearch.search_types['Publish'] = "PhenotypeSearch" - - base_query = """SELECT PublishXRef.Id, - CAST(Phenotype.`Pre_publication_description` AS BINARY), - CAST(Phenotype.`Post_publication_description` AS BINARY), - Publication.`Authors`, - Publication.`Year`, - Publication.`PubMed_ID`, - PublishXRef.`mean`, - PublishXRef.`LRS`, - PublishXRef.`additive`, - PublishXRef.`Locus`, - InbredSet.`InbredSetCode`, - Geno.`Chr`, - Geno.`Mb` - FROM Species - INNER JOIN InbredSet ON InbredSet.`SpeciesId` = Species.`Id` - INNER JOIN PublishXRef ON PublishXRef.`InbredSetId` = InbredSet.`Id` - INNER JOIN PublishFreeze ON PublishFreeze.`InbredSetId` = InbredSet.`Id` - INNER JOIN Publication ON Publication.`Id` = PublishXRef.`PublicationId` - INNER JOIN Phenotype ON Phenotype.`Id` = PublishXRef.`PhenotypeId` - LEFT JOIN Geno ON PublishXRef.Locus = Geno.Name AND Geno.SpeciesId = Species.Id """ - - search_fields = ('Phenotype.Post_publication_description', - 'Phenotype.Pre_publication_description', - 'Phenotype.Pre_publication_abbreviation', - 'Phenotype.Post_publication_abbreviation', - 'Phenotype.Lab_code', - 'Publication.PubMed_ID', - 'Publication.Abstract', - 'Publication.Title', - 'Publication.Authors', - 'PublishXRef.Id') - - header_fields = ['Index', - 'Record', - 'Description', - 'Mean', - 'Authors', - 'Year', - 'Max LRS', - 'Max LRS Location', - 'Additive Effect'] - - def get_where_clause(self): - """Generate clause for WHERE portion of query""" - - # Todo: Zach will figure out exactly what both these lines mean - # and comment here - - # if "'" not in self.search_term[0]: - search_term = "%" + \ - self.handle_wildcard(self.search_term[0]) + "%" - if "_" in self.search_term[0]: - if len(self.search_term[0].split("_")[0]) == 3: - search_term = "%" + self.handle_wildcard( - self.search_term[0].split("_")[1]) + "%" - - # This adds a clause to the query that matches the search term - # against each field in the search_fields tuple - where_clause_list = [] - for field in self.search_fields: - where_clause_list.append('''%s LIKE "%s"''' % - (field, search_term)) - where_clause = "(%s) " % ' OR '.join(where_clause_list) - - return where_clause - - def compile_final_query(self, from_clause='', where_clause=''): - """Generates the final query string""" - - from_clause = self.normalize_spaces(from_clause) - - if self.search_term[0] == "*": - query = (self.base_query + - """%s - WHERE PublishXRef.InbredSetId = %s - and PublishXRef.PhenotypeId = Phenotype.Id - and PublishXRef.PublicationId = Publication.Id - and PublishFreeze.Id = %s - ORDER BY PublishXRef.Id""" % ( - from_clause, - self.sescape(str(self.dataset.group.id)), - self.sescape(str(self.dataset.id)))) - else: - query = (self.base_query + - """%s - WHERE %s - and PublishXRef.InbredSetId = %s - and PublishXRef.PhenotypeId = Phenotype.Id - and PublishXRef.PublicationId = Publication.Id - and PublishFreeze.Id = %s - ORDER BY PublishXRef.Id""" % ( - from_clause, - where_clause, - self.sescape(str(self.dataset.group.id)), - self.sescape(str(self.dataset.id)))) - - return query - - def run_combined(self, from_clause, where_clause): - """Generates and runs a combined search of an phenotype dataset""" - from_clause = self.normalize_spaces(from_clause) - - query = (self.base_query + - """%s - WHERE %s - PublishXRef.InbredSetId = %s and - PublishXRef.PhenotypeId = Phenotype.Id and - PublishXRef.PublicationId = Publication.Id and - PublishFreeze.Id = %s""" % ( - from_clause, - where_clause, - self.sescape(str(self.dataset.group.id)), - self.sescape(str(self.dataset.id)))) - - return self.execute(query) - - def run(self): - """Generates and runs a simple search of a phenotype dataset""" - - query = self.compile_final_query(where_clause=self.get_where_clause()) - - return self.execute(query) - - -class GenotypeSearch(DoSearch): - """A search within a genotype dataset""" - - DoSearch.search_types['Geno'] = "GenotypeSearch" - - base_query = """SELECT Geno.Name, - GenoFreeze.createtime as thistable, - Geno.Name as Geno_Name, - Geno.Source2 as Geno_Source2, - Geno.Chr as Geno_Chr, - Geno.Mb as Geno_Mb - FROM GenoXRef, GenoFreeze, Geno """ - - search_fields = ('Name', 'Chr') - - header_fields = ['Index', - 'Record', - 'Location'] - - def get_where_clause(self): - """Generate clause for part of the WHERE portion of query""" - - # This adds a clause to the query that matches the search term - # against each field in search_fields (above) - where_clause = [] - - if "'" not in self.search_term[0]: - self.search_term = "%" + self.search_term[0] + "%" - - for field in self.search_fields: - where_clause.append('''%s LIKE "%s"''' % ("%s.%s" % self.mescape(self.dataset.type, - field), - self.search_term)) - where_clause = "(%s) " % ' OR '.join(where_clause) - - return where_clause - - def compile_final_query(self, from_clause='', where_clause=''): - """Generates the final query string""" - - from_clause = self.normalize_spaces(from_clause) - - if self.search_term[0] == "*": - query = (self.base_query - + """WHERE Geno.Id = GenoXRef.GenoId - and GenoXRef.GenoFreezeId = GenoFreeze.Id - and GenoFreeze.Id = %s""" % (self.sescape(str(self.dataset.id)))) - else: - query = (self.base_query + - """WHERE %s - and Geno.Id = GenoXRef.GenoId - and GenoXRef.GenoFreezeId = GenoFreeze.Id - and GenoFreeze.Id = %s""" % (where_clause, - self.sescape(str(self.dataset.id)))) - - return query - - def run(self): - """Generates and runs a simple search of a genotype dataset""" - # Todo: Zach will figure out exactly what both these lines mean - # and comment here - - if self.search_term[0] == "*": - self.query = self.compile_final_query() - else: - self.query = self.compile_final_query( - where_clause=self.get_where_clause()) - - return self.execute(self.query) - - -class RifSearch(MrnaAssaySearch): - """Searches for traits with a Gene RIF entry including the search term.""" - - DoSearch.search_types['ProbeSet_RIF'] = "RifSearch" - - def get_from_clause(self): - return f" INNER JOIN GeneRIF_BASIC ON GeneRIF_BASIC.`symbol` = { self.dataset.type }.`symbol` " - - def get_where_clause(self): - where_clause = f"(MATCH (GeneRIF_BASIC.comment) AGAINST ('+{ self.search_term[0] }' IN BOOLEAN MODE)) " - - return where_clause - - def run(self): - from_clause = self.get_from_clause() - where_clause = self.get_where_clause() - - query = self.compile_final_query(from_clause, where_clause) - - return self.execute(query) - - -class WikiSearch(MrnaAssaySearch): - """Searches GeneWiki for traits other people have annotated""" - - DoSearch.search_types['ProbeSet_WIKI'] = "WikiSearch" - - def get_from_clause(self): - return ", GeneRIF " - - def get_where_clause(self): - where_clause = """%s.symbol = GeneRIF.symbol - and GeneRIF.versionId=0 and GeneRIF.display>0 - and (GeneRIF.comment LIKE '%s' or GeneRIF.initial = '%s') - """ % (self.dataset.type, - "%" + str(self.search_term[0]) + "%", - str(self.search_term[0])) - return where_clause - - def run(self): - from_clause = self.get_from_clause() - where_clause = self.get_where_clause() - - query = self.compile_final_query(from_clause, where_clause) - - return self.execute(query) - - -class GoSearch(MrnaAssaySearch): - """Searches for synapse-associated genes listed in the Gene Ontology.""" - - DoSearch.search_types['ProbeSet_GO'] = "GoSearch" - - def get_from_clause(self): - from_clause = """, db_GeneOntology.term as GOterm, - db_GeneOntology.association as GOassociation, - db_GeneOntology.gene_product as GOgene_product """ - - return from_clause - - def get_where_clause(self): - field = 'GOterm.acc' - go_id = 'GO:' + ('0000000' + self.search_term[0])[-7:] - - statements = ("""%s.symbol=GOgene_product.symbol and - GOassociation.gene_product_id=GOgene_product.id and - GOterm.id=GOassociation.term_id""" % ( - self.sescape(self.dataset.type))) - - where_clause = " %s = '%s' and %s " % (field, go_id, statements) - - return where_clause - - def run(self): - from_clause = self.get_from_clause() - where_clause = self.get_where_clause() - - query = self.compile_final_query(from_clause, where_clause) - - return self.execute(query) - -# ZS: Not sure what the best way to deal with LRS searches is - - -class LrsSearch(DoSearch): - """Searches for genes with a QTL within the given LRS values - - LRS searches can take 3 different forms: - - LRS > (or <) min/max_LRS - - LRS=(min_LRS max_LRS) - - LRS=(min_LRS max_LRS chromosome start_Mb end_Mb) - where min/max_LRS represent the range of LRS scores and start/end_Mb represent - the range in megabases on the given chromosome - - """ - - for search_key in ('LRS', 'LOD'): - DoSearch.search_types[search_key] = "LrsSearch" - - def get_from_clause(self): - converted_search_term = [] - for value in self.search_term: - try: - converted_search_term.append(float(value)) - except: - converted_search_term.append(value) - - self.search_term = converted_search_term - - from_clause = "" - - return from_clause - - def get_where_clause(self): - if self.search_operator == "=": - assert isinstance(self.search_term, (list, tuple)) - lrs_min, lrs_max = self.search_term[:2] - if self.search_type == "LOD": - lrs_min = lrs_min * 4.61 - lrs_max = lrs_max * 4.61 - - where_clause = """ %sXRef.LRS > %s and - %sXRef.LRS < %s """ % self.mescape(self.dataset.type, - min(lrs_min, - lrs_max), - self.dataset.type, - max(lrs_min, lrs_max)) - - if len(self.search_term) > 2: - try: - chr_num = int(float(self.search_term[2])) - except: - chr_num = self.search_term[2].lower().replace('chr', '') - self.search_term[2] = chr_num - - where_clause += """ and Geno.Chr = '%s' """ % (chr_num) - if len(self.search_term) == 5: - mb_low, mb_high = self.search_term[3:] - where_clause += """ and Geno.Mb > %s and - Geno.Mb < %s - """ % self.mescape(min(mb_low, mb_high), - max(mb_low, mb_high)) - - where_clause += """ and %sXRef.Locus = Geno.name and - Geno.SpeciesId = %s - """ % self.mescape(self.dataset.type, - self.species_id) - else: - # Deal with >, <, >=, and <= - lrs_val = self.search_term[0] - if self.search_type == "LOD": - lrs_val = lrs_val * 4.61 - - where_clause = """ %sXRef.LRS %s %s """ % self.mescape(self.dataset.type, - self.search_operator, - self.search_term[0]) - - return where_clause - - def run(self): - - self.from_clause = self.get_from_clause() - self.where_clause = self.get_where_clause() - - self.query = self.compile_final_query( - self.from_clause, self.where_clause) - - return self.execute(self.query) - - -class MrnaLrsSearch(LrsSearch, MrnaAssaySearch): - - for search_key in ('LRS', 'LOD'): - DoSearch.search_types['ProbeSet_' + search_key] = "MrnaLrsSearch" - - def run(self): - self.from_clause = self.get_from_clause() - self.where_clause = self.get_where_clause() - - self.query = self.compile_final_query( - from_clause=self.from_clause, where_clause=self.where_clause) - - return self.execute(self.query) - - -class PhenotypeLrsSearch(LrsSearch, PhenotypeSearch): - - for search_key in ('LRS', 'LOD'): - DoSearch.search_types['Publish_' + search_key] = "PhenotypeLrsSearch" - - def run(self): - - self.from_clause = self.get_from_clause() - self.where_clause = self.get_where_clause() - - self.query = self.compile_final_query( - from_clause=self.from_clause, where_clause=self.where_clause) - - return self.execute(self.query) - - -class CisTransLrsSearch(DoSearch): - - def get_where_clause(self, cis_trans): - self.mb_buffer = 5 # default - chromosome = None - if cis_trans == "cis": - the_operator = "<" - else: - the_operator = ">" - - if self.search_operator == "=": - if len(self.search_term) == 2 or len(self.search_term) == 3: - self.search_term = [float(value) for value in self.search_term] - if len(self.search_term) == 2: - lrs_min, lrs_max = self.search_term - #[int(value) for value in self.search_term] - elif len(self.search_term) == 3: - lrs_min, lrs_max, self.mb_buffer = self.search_term - elif len(self.search_term) == 4: - lrs_min, lrs_max, self.mb_buffer = [ - float(value) for value in self.search_term[:3]] - chromosome = self.search_term[3] - chr_str = re.match("(^c|^C)[a-z]*", chromosome) - if chr_str: - chromosome = int(chromosome.replace(chr_str.group(0), '')) - else: - SomeError - - if self.search_type == "CISLOD" or self.search_type == "TRANSLOD": - lrs_min = lrs_min * 4.61 - lrs_max = lrs_max * 4.61 - - sub_clause = """ %sXRef.LRS > %s and - %sXRef.LRS < %s and """ % ( - self.sescape(self.dataset.type), - self.sescape(str(min(lrs_min, lrs_max))), - self.sescape(self.dataset.type), - self.sescape(str(max(lrs_min, lrs_max))) - ) - else: - # Deal with >, <, >=, and <= - sub_clause = """ %sXRef.LRS %s %s and """ % ( - self.sescape(self.dataset.type), - self.sescape(self.search_operator), - self.sescape(self.search_term[0]) - ) - - if cis_trans == "cis": - where_clause = sub_clause + """ - ABS(%s.Mb-Geno.Mb) %s %s and - %sXRef.Locus = Geno.name and - Geno.SpeciesId = %s and - %s.Chr = Geno.Chr""" % ( - self.sescape(self.dataset.type), - the_operator, - self.sescape(str(self.mb_buffer)), - self.sescape(self.dataset.type), - self.sescape(str(self.species_id)), - self.sescape(self.dataset.type) - ) - else: - if chromosome: - location_clause = """ - (%s.Chr = '%s' and %s.Chr = Geno.Chr and ABS(%s.Mb-Geno.Mb) %s %s) - or (%s.Chr != Geno.Chr and Geno.Chr = '%s')""" % ( - self.sescape(self.dataset.type), - chromosome, - self.sescape( - self.dataset.type), - self.sescape( - self.dataset.type), - the_operator, - self.sescape( - str(self.mb_buffer)), - self.sescape( - self.dataset.type), - chromosome) - else: - location_clause = "(ABS(%s.Mb-Geno.Mb) %s %s and %s.Chr = Geno.Chr) or (%s.Chr != Geno.Chr)" % (self.sescape( - self.dataset.type), the_operator, self.sescape(str(self.mb_buffer)), self.sescape(self.dataset.type), self.sescape(self.dataset.type)) - where_clause = sub_clause + """ - %sXRef.Locus = Geno.name and - Geno.SpeciesId = %s and - (%s)""" % ( - self.sescape(self.dataset.type), - self.sescape(str(self.species_id)), - location_clause - ) - - return where_clause - - -class CisLrsSearch(CisTransLrsSearch, MrnaAssaySearch): - """ - Searches for genes on a particular chromosome with a cis-eQTL within the given LRS values - - A cisLRS search can take 3 forms: - - cisLRS=(min_LRS max_LRS) - - cisLRS=(min_LRS max_LRS mb_buffer) - - cisLRS>min_LRS - where min/max_LRS represent the range of LRS scores and the mb_buffer is the range around - a particular QTL where its eQTL would be considered "cis". If there is no third parameter, - mb_buffer will default to 5 megabases. - - A QTL is a cis-eQTL if a gene's expression is regulated by a QTL in roughly the same area - (where the area is determined by the mb_buffer that the user can choose). - - """ - - for search_key in ('LRS', 'LOD'): - DoSearch.search_types['ProbeSet_CIS' + search_key] = "CisLrsSearch" - - def get_where_clause(self): - return CisTransLrsSearch.get_where_clause(self, "cis") - - def run(self): - self.from_clause = self.get_from_clause() - self.where_clause = self.get_where_clause() - - self.query = self.compile_final_query( - self.from_clause, self.where_clause) - - return self.execute(self.query) - - -class TransLrsSearch(CisTransLrsSearch, MrnaAssaySearch): - """Searches for genes on a particular chromosome with a cis-eQTL within the given LRS values - - A transLRS search can take 3 forms: - - transLRS=(min_LRS max_LRS) - - transLRS=(min_LRS max_LRS mb_buffer) - - transLRS>min_LRS - where min/max_LRS represent the range of LRS scores and the mb_buffer is the range around - a particular QTL where its eQTL would be considered "cis". If there is no third parameter, - mb_buffer will default to 5 megabases. - - A QTL is a trans-eQTL if a gene's expression is regulated by a QTL in a different location/area - (where the area is determined by the mb_buffer that the user can choose). Opposite of cis-eQTL. - - """ - - for search_key in ('LRS', 'LOD'): - DoSearch.search_types['ProbeSet_TRANS' + search_key] = "TransLrsSearch" - - def get_where_clause(self): - return CisTransLrsSearch.get_where_clause(self, "trans") - - def run(self): - self.from_clause = self.get_from_clause() - self.where_clause = self.get_where_clause() - - self.query = self.compile_final_query( - self.from_clause, self.where_clause) - - return self.execute(self.query) - - -class MeanSearch(MrnaAssaySearch): - """Searches for genes expressed within an interval (log2 units) determined by the user""" - - DoSearch.search_types['ProbeSet_MEAN'] = "MeanSearch" - - def get_where_clause(self): - self.search_term = [float(value) for value in self.search_term] - - if self.search_operator == "=": - assert isinstance(self.search_term, (list, tuple)) - self.mean_min, self.mean_max = self.search_term[:2] - - where_clause = """ %sXRef.mean > %s and - %sXRef.mean < %s """ % self.mescape(self.dataset.type, - min(self.mean_min, - self.mean_max), - self.dataset.type, - max(self.mean_min, self.mean_max)) - else: - # Deal with >, <, >=, and <= - where_clause = """ %sXRef.mean %s %s """ % self.mescape(self.dataset.type, - self.search_operator, - self.search_term[0]) - - return where_clause - - def run(self): - self.where_clause = self.get_where_clause() - - self.query = self.compile_final_query(where_clause=self.where_clause) - - return self.execute(self.query) - - -class RangeSearch(MrnaAssaySearch): - """Searches for genes with a range of expression varying between two values""" - - DoSearch.search_types['ProbeSet_RANGE'] = "RangeSearch" - - def get_where_clause(self): - if self.search_operator == "=": - assert isinstance(self.search_term, (list, tuple)) - self.range_min, self.range_max = self.search_term[:2] - where_clause = """ (SELECT Pow(2, max(value) -min(value)) - FROM ProbeSetData - WHERE ProbeSetData.Id = ProbeSetXRef.dataId) > %s AND - (SELECT Pow(2, max(value) -min(value)) - FROM ProbeSetData - WHERE ProbeSetData.Id = ProbeSetXRef.dataId) < %s - """ % self.mescape(min(self.range_min, self.range_max), - max(self.range_min, self.range_max)) - else: - # Deal with >, <, >=, and <= - where_clause = """ (SELECT Pow(2, max(value) -min(value)) - FROM ProbeSetData - WHERE ProbeSetData.Id = ProbeSetXRef.dataId) > %s - """ % (self.sescape(self.search_term[0])) - return where_clause - - def run(self): - self.where_clause = self.get_where_clause() - - self.query = self.compile_final_query(where_clause=self.where_clause) - - return self.execute(self.query) - - -class PositionSearch(DoSearch): - """Searches for genes/markers located within a specified range on a specified chromosome""" - - for search_key in ('POSITION', 'POS', 'MB'): - DoSearch.search_types[search_key] = "PositionSearch" - - def get_where_clause(self): - self.search_term = [float(value) if is_number( - value) else value for value in self.search_term] - chr, self.mb_min, self.mb_max = self.search_term[:3] - self.chr = str(chr).lower() - self.get_chr() - - where_clause = """ %s.Chr = '%s' and - %s.Mb > %s and - %s.Mb < %s """ % self.mescape(self.dataset.type, - self.chr, - self.dataset.type, - min(self.mb_min, - self.mb_max), - self.dataset.type, - max(self.mb_min, self.mb_max)) - - return where_clause - - def get_chr(self): - try: - self.chr = int(float(self.chr)) - except: - self.chr = self.chr.lower().replace('chr', '') - - def run(self): - - self.get_where_clause() - self.query = self.compile_final_query(where_clause=self.where_clause) - - return self.execute(self.query) - - -class MrnaPositionSearch(PositionSearch, MrnaAssaySearch): - """Searches for genes located within a specified range on a specified chromosome""" - - for search_key in ('POSITION', 'POS', 'MB'): - DoSearch.search_types['ProbeSet_' + search_key] = "MrnaPositionSearch" - - def run(self): - - self.where_clause = self.get_where_clause() - self.query = self.compile_final_query(where_clause=self.where_clause) - - return self.execute(self.query) - - -class GenotypePositionSearch(PositionSearch, GenotypeSearch): - """Searches for genes located within a specified range on a specified chromosome""" - - for search_key in ('POSITION', 'POS', 'MB'): - DoSearch.search_types['Geno_' + search_key] = "GenotypePositionSearch" - - def run(self): - - self.where_clause = self.get_where_clause() - self.query = self.compile_final_query(where_clause=self.where_clause) - - return self.execute(self.query) - - -class PvalueSearch(MrnaAssaySearch): - """Searches for traits with a permutationed p-value between low and high""" - - DoSearch.search_types['ProbeSet_PVALUE'] = "PvalueSearch" - - def run(self): - - self.search_term = [float(value) for value in self.search_term] - - if self.search_operator == "=": - assert isinstance(self.search_term, (list, tuple)) - self.pvalue_min, self.pvalue_max = self.search_term[:2] - self.where_clause = """ %sXRef.pValue > %s and %sXRef.pValue < %s - """ % self.mescape( - self.dataset.type, - min(self.pvalue_min, self.pvalue_max), - self.dataset.type, - max(self.pvalue_min, self.pvalue_max)) - else: - # Deal with >, <, >=, and <= - self.where_clause = """ %sXRef.pValue %s %s - """ % self.mescape( - self.dataset.type, - self.search_operator, - self.search_term[0]) - - self.query = self.compile_final_query(where_clause=self.where_clause) - return self.execute(self.query) - - -class AuthorSearch(PhenotypeSearch): - """Searches for phenotype traits with specified author(s)""" - - DoSearch.search_types["Publish_NAME"] = "AuthorSearch" - - def run(self): - search_term = "%" + self.search_term[0] + "%" - self.where_clause = """ Publication.Authors LIKE "%s" and - """ % (search_term) - - self.query = self.compile_final_query(where_clause=self.where_clause) - - return self.execute(self.query) - - -def is_number(s): - try: - float(s) - return True - except ValueError: - return False - - -if __name__ == "__main__": - # Usually this will be used as a library, but call it from the command line for testing - # And it runs the code below - import sys - - from base import webqtlConfig - from base.data_set import create_dataset - from utility import webqtlUtil - from db import webqtlDatabaseFunction - - from wqflask.database import database_connection - - with database_connection(get_setting("SQL_URI")) as db_conn: - with db_conn.cursor() as cursor: - dataset_name = "HC_M2_0606_P" - dataset = create_dataset(db_conn, dataset_name) - - results = PvalueSearch(['0.005'], '<', dataset, cursor, db_conn).run() |