import uuid from math import * import requests import unicodedata from urllib.parse import urlencode, urljoin import re import json from pymonad.maybe import Just, Maybe from pymonad.tools import curry from flask import g from gn3.monads import MonadicDict from gn2.base.data_set import create_dataset from gn2.base.webqtlConfig import PUBMEDLINK_URL from gn2.wqflask import parser from gn2.wqflask import do_search from gn2.wqflask.database import database_connection from gn2.utility.authentication_tools import check_resource_availability from gn2.utility.hmac import hmac_creation from gn2.utility.tools import get_setting, GN2_BASE_URL, GN3_LOCAL_URL from gn2.utility.type_checking import is_str MAX_SEARCH_RESULTS = 50000 # Max number of search results, passed to Xapian search (this needs to match the value in GN3!) class SearchResultPage: #maxReturn = 3000 def __init__(self, kw): """ This class gets invoked after hitting submit on the main menu (in views.py). """ ########################################### # Names and IDs of group / F2 set ########################################### self.uc_id = uuid.uuid4() self.go_term = None self.search_type = "sql" # Assume it's an SQL search by default, since all searches will work with SQL if kw['search_terms_or']: self.and_or = "or" self.search_terms = kw['search_terms_or'] else: self.and_or = "and" self.search_terms = kw['search_terms_and'] search = self.search_terms self.original_search_string = self.search_terms # check for dodgy search terms rx = re.compile( r'.*\W(href|http|sql|select|update)\W.*', re.IGNORECASE) if rx.match(search): self.search_term_exists = False return else: self.search_term_exists = True self.results = [] max_result_count = 100000 # max number of results to display type = kw.get('type') if type == "Phenotypes": # split datatype on type field max_result_count = 50000 dataset_type = "Publish" elif type == "Genotypes": dataset_type = "Geno" else: dataset_type = "ProbeSet" # ProbeSet is default assert(is_str(kw.get('dataset'))) self.dataset = create_dataset(kw['dataset'], dataset_type) # I don't like using try/except, but it seems like the easiest way to account for all possible bad searches here try: self.search() except: self.search_term_exists = False self.too_many_results = False if self.search_term_exists: if len(self.results) > max_result_count: self.trait_list = [] self.too_many_results = True else: self.gen_search_result() def gen_search_result(self): """ Get the info displayed in the search result table from the set of results computed in the "search" function """ trait_list = [] # result_set represents the results for each search term; a search of # "shh grin2b" would have two sets of results, one for each term if self.dataset.type == "ProbeSet": self.header_data_names = ['index', 'display_name', 'symbol', 'description', 'location', 'mean', 'lrs_score', 'lrs_location', 'additive'] elif self.dataset.type == "Publish": self.header_data_names = ['index', 'display_name', 'description', 'mean', 'authors', 'pubmed_text', 'lrs_score', 'lrs_location', 'additive'] elif self.dataset.type == "Geno": self.header_data_names = ['index', 'display_name', 'location'] for index, result in enumerate(self.results): if not result: continue if self.search_type == "xapian": # These four lines are borrowed from gsearch.py; probably need to put them somewhere else to avoid duplicated code chr_mb = curry(2, lambda chr, mb: f"Chr{chr}: {mb:.6f}") format3f = lambda x: f"{x:.3f}" hmac = curry(3, lambda trait_name, dataset, data_hmac: f"{trait_name}:{dataset}:{data_hmac}") convert_lod = lambda x: x / 4.61 trait = MonadicDict(result) trait["index"] = Just(index) trait["display_name"] = trait["name"] trait["location"] = (Maybe.apply(chr_mb) .to_arguments(trait.pop("chr"), trait.pop("mb"))) trait["lod_score"] = trait.pop("lrs").map(convert_lod).map(format3f) trait["additive"] = trait["additive"].map(format3f) trait["mean"] = trait["mean"].map(format3f) trait["lrs_location"] = (Maybe.apply(chr_mb) .to_arguments(trait.pop("geno_chr"), trait.pop("geno_mb"))) description_text = trait['description'].maybe("N/A", lambda a: a) if len(description_text) > 200: description_text = description_text[:200] + "..." trait['description'] = Just(description_text) if self.dataset.type == "ProbeSet": trait["hmac"] = (Maybe.apply(hmac) .to_arguments(trait['name'], trait['dataset'], Just(hmac_creation(f"{trait['name']}:{trait['dataset']}")))) elif self.dataset.type == "Publish": inbredsetcode = trait.pop("inbredsetcode") if inbredsetcode.map(len) == Just(3): trait["display_name"] = (Maybe.apply( curry(2, lambda inbredsetcode, name: f"{inbredsetcode}_{name}")) .to_arguments(inbredsetcode, trait["name"])) trait["hmac"] = (Maybe.apply(hmac) .to_arguments(trait['name'], trait['dataset'], Just(hmac_creation(f"{trait['name']}:{trait['dataset']}")))) trait["authors"] = trait["authors_display"] = (trait.pop("authors").map( lambda authors: ", ".join(authors[:2] + ["et al."] if len(authors) >=2 else authors))) trait["pubmed_text"] = trait["year"].map(str) trait_list.append(trait.data) else: trait_dict = {} trait_dict['index'] = index + 1 trait_dict['dataset'] = self.dataset.name if self.dataset.type == "ProbeSet": trait_dict['display_name'] = result[2] trait_dict['hmac'] = hmac_creation('{}:{}'.format(trait_dict['display_name'], trait_dict['dataset'])) trait_dict['symbol'] = "N/A" if result[3] is None else result[3].strip() description_text = "" if result[4] is not None and str(result[4]) != "": description_text = unicodedata.normalize("NFKD", result[4].decode('latin1')) target_string = result[5].decode('utf-8') if result[5] else "" description_display = description_text if target_string is None or str(target_string) == "" else description_text + "; " + str(target_string).strip() trait_dict['description'] = description_display trait_dict['location'] = "N/A" if (result[6] is not None) and (result[6] != "") and (result[6] != "Un") and (result[7] is not None) and (result[7] != 0): trait_dict['location'] = f"Chr{result[6]}: {float(result[7]):.6f}" trait_dict['mean'] = "N/A" if result[8] is None or result[8] == "" else f"{result[8]:.3f}" trait_dict['additive'] = "N/A" if result[12] is None or result[12] == "" else f"{result[12]:.3f}" trait_dict['lod_score'] = "N/A" if result[9] is None or result[9] == "" else f"{float(result[9]) / 4.61:.1f}" trait_dict['lrs_location'] = "N/A" if result[13] is None or result[13] == "" or result[14] is None else f"Chr{result[13]}: {float(result[14]):.6f}" elif self.dataset.type == "Geno": trait_dict['display_name'] = str(result[0]) trait_dict['hmac'] = hmac.data_hmac('{}:{}'.format(trait_dict['display_name'], trait_dict['dataset'])) trait_dict['location'] = "N/A" if (result[4] != "NULL" and result[4] != "") and (result[5] != 0): trait_dict['location'] = f"Chr{result[4]}: {float(result[5]):.6f}" elif self.dataset.type == "Publish": # Check permissions on a trait-by-trait basis for phenotype traits trait_dict['name'] = trait_dict['display_name'] = str(result[0]) trait_dict['hmac'] = hmac_creation('{}:{}'.format(trait_dict['name'], trait_dict['dataset'])) permissions = check_resource_availability( self.dataset, g.user_session.user_id, trait_dict['display_name']) if not any(x in permissions['data'] for x in ["view", "edit"]): continue if result[10]: trait_dict['display_name'] = str(result[10]) + "_" + str(result[0]) trait_dict['description'] = "N/A" trait_dict['pubmed_id'] = "N/A" trait_dict['pubmed_link'] = "N/A" trait_dict['pubmed_text'] = "N/A" trait_dict['mean'] = "N/A" trait_dict['additive'] = "N/A" pre_pub_description = "N/A" if result[1] is None else result[1].strip() post_pub_description = "N/A" if result[2] is None else result[2].strip() if result[5] != "NULL" and result[5] != None: trait_dict['pubmed_id'] = result[5] trait_dict['pubmed_link'] = PUBMEDLINK_URL % trait_dict['pubmed_id'] trait_dict['description'] = post_pub_description else: trait_dict['description'] = pre_pub_description if result[4].isdigit(): trait_dict['pubmed_text'] = result[4] trait_dict['authors'] = result[3] trait_dict['authors_display'] = trait_dict['authors'] author_list = trait_dict['authors'].split(",") if len(author_list) >= 2: trait_dict['authors_display'] = (",").join(author_list[:2]) + ", et al." if result[6] != "" and result[6] != None: trait_dict['mean'] = f"{result[6]:.3f}" try: trait_dict['lod_score'] = f"{float(result[7]) / 4.61:.1f}" except: trait_dict['lod_score'] = "N/A" try: trait_dict['lrs_location'] = f"Chr{result[11]}: {float(result[12]):.6f}" except: trait_dict['lrs_location'] = "N/A" trait_dict['additive'] = "N/A" if not result[8] else f"{result[8]:.3f}" trait_dict['trait_info_str'] = trait_info_str(trait_dict, self.dataset.type) # Convert any bytes in dict to a normal utf-8 string for key in trait_dict.keys(): if isinstance(trait_dict[key], bytes): try: trait_dict[key] = trait_dict[key].decode('utf-8') except UnicodeDecodeError: trait_dict[key] = trait_dict[key].decode('latin-1') trait_list.append(trait_dict) if self.results: self.max_widths = {} for i, trait in enumerate(trait_list): for key in trait.keys(): if key == "authors": authors_string = ",".join(str(trait[key]).split(",")[:2]) + ", et al." self.max_widths[key] = max(len(authors_string), self.max_widths[key]) if key in self.max_widths else len(str(authors_string)) elif key == "symbol": self.max_widths[key] = len(trait[key]) if len(trait[key]) > 20: self.max_widths[key] = 20 else: self.max_widths[key] = max(len(str(trait[key])), self.max_widths[key]) if key in self.max_widths else len(str(trait[key])) self.wide_columns_exist = False if self.dataset.type == "Publish": if (self.max_widths['display_name'] > 25 or self.max_widths['description'] > 100 or self.max_widths['authors']> 80): self.wide_columns_exist = True if self.dataset.type == "ProbeSet": if (self.max_widths['display_name'] > 25 or self.max_widths['symbol'] > 25 or self.max_widths['description'] > 100): self.wide_columns_exist = True self.trait_list = trait_list def search(self): """ This function sets up the actual search query in the form of a SQL statement and executes """ self.search_terms = parser.parse(self.search_terms) # Set of terms compatible with Xapian currently (None is a search without a term) xapian_terms = ["POSITION", "MEAN", "LRS", "LOD"] if all([(the_term['key'] in xapian_terms) or (not the_term['key'] and self.dataset.type != "Publish") for the_term in self.search_terms]): self.search_type = "xapian" self.results = requests.get(generate_xapian_request(self.dataset, self.search_terms, self.and_or)).json() if not len(self.results) or 'error' in self.results: self.results = [] self.sql_search() else: self.sql_search() def get_search_ob(self, a_search): search_term = a_search['search_term'] search_operator = a_search['separator'] search_type = {} search_type['dataset_type'] = self.dataset.type if a_search['key']: search_type['key'] = a_search['key'].upper() else: search_type['key'] = None search_ob = do_search.DoSearch.get_search(search_type) if search_ob: search_class = getattr(do_search, search_ob) the_search = search_class(search_term, search_operator, self.dataset, search_type['key'] ) return the_search else: return None def sql_search(self): self.search_type = "sql" combined_from_clause = "" combined_where_clause = "" # The same table can't be referenced twice in the from clause previous_from_clauses = [] for i, a_search in enumerate(self.search_terms): if a_search['key'] == "GO": self.go_term = a_search['search_term'][0] gene_list = get_GO_symbols(a_search) self.search_terms += gene_list continue else: the_search = self.get_search_ob(a_search) if the_search != None: if a_search['key'] == None and self.dataset.type == "ProbeSet": alias_terms = get_alias_terms(a_search['search_term'][0], self.dataset.group.species) alias_where_clauses = [] for alias_search in alias_terms: alias_search_ob = self.get_search_ob(alias_search) if alias_search_ob != None: get_from_clause = getattr( alias_search_ob, "get_from_clause", None) if callable(get_from_clause): from_clause = alias_search_ob.get_from_clause() if from_clause in previous_from_clauses: pass else: previous_from_clauses.append(from_clause) combined_from_clause += from_clause where_clause = alias_search_ob.get_alias_where_clause() alias_where_clauses.append(where_clause) get_from_clause = getattr( the_search, "get_from_clause", None) if callable(get_from_clause): from_clause = the_search.get_from_clause() if from_clause in previous_from_clauses: pass else: previous_from_clauses.append(from_clause) combined_from_clause += from_clause where_clause = the_search.get_where_clause() alias_where_clauses.append(where_clause) combined_where_clause += "(" + " OR ".join(alias_where_clauses) + ")" if (i + 1) < len(self.search_terms): if self.and_or == "and": combined_where_clause += "AND" else: combined_where_clause += "OR" else: get_from_clause = getattr( the_search, "get_from_clause", None) if callable(get_from_clause): from_clause = the_search.get_from_clause() if from_clause in previous_from_clauses: pass else: previous_from_clauses.append(from_clause) combined_from_clause += from_clause where_clause = the_search.get_where_clause() combined_where_clause += "(" + where_clause + ")" if (i + 1) < len(self.search_terms): if self.and_or == "and": combined_where_clause += "AND" else: combined_where_clause += "OR" else: self.search_term_exists = False if self.search_term_exists: combined_where_clause = "(" + combined_where_clause + ")" final_query = the_search.compile_final_query( combined_from_clause, combined_where_clause) results = the_search.execute(final_query) self.results.extend(results) if self.search_term_exists: if the_search != None: self.header_fields = the_search.header_fields def trait_info_str(trait, dataset_type): """Provide a string representation for given trait""" def __trait_desc(trt): if dataset_type == "Geno": return f"Marker: {trait['display_name']}" return trait['description'] or "N/A" def __symbol(trt): if dataset_type == "ProbeSet": return (trait['symbol'] or "N/A")[:20] def __lrs(trt): if dataset_type == "Geno": return 0 else: if trait['lod_score'] != "N/A": return ( f"{float(trait['lod_score']):0.3f}" if float(trait['lod_score']) > 0 else f"{trait['lod_score']}") else: return "N/A" def __lrs_location(trt): if 'lrs_location' in trait: return trait['lrs_location'] else: return "N/A" def __location(trt): if 'location' in trait: return trait['location'] else: return None def __mean(trt): if 'mean' in trait: return trait['mean'] else: return 0 return "{}|||{}|||{}|||{}|||{}|||{}|||{}|||{}".format( trait['display_name'], trait['dataset'], __trait_desc(trait), __symbol(trait), __location(trait), __mean(trait), __lrs(trait), __lrs_location(trait)) def get_GO_symbols(a_search): gene_list = None with database_connection(get_setting("SQL_URI")) as conn, conn.cursor() as cursor: cursor.execute("SELECT genes FROM GORef WHERE goterm=%s", (f"{a_search['key']}:{a_search['search_term'][0]}",)) gene_list = cursor.fetchone()[0].strip().split() new_terms = [] for gene in gene_list: new_terms.append(dict(key=None, separator=None, search_term=[gene])) return new_terms def insert_newlines(string, every=64): """ This is because it is seemingly impossible to change the width of the description column, so I'm just manually adding line breaks """ lines = [] for i in range(0, len(string), every): lines.append(string[i:i + every]) return '\n'.join(lines) def get_alias_terms(symbol, species): if species == "mouse": symbol_string = symbol.capitalize() elif species == "human": symbol_string = symbol.upper() else: return [] filtered_aliases = [] response = requests.get( GN2_BASE_URL + "/gn3/gene/aliases/" + symbol_string) if response: alias_list = json.loads(response.content) seen = set() for item in alias_list: if item in seen: continue else: filtered_aliases.append(item) seen.add(item) alias_terms = [] for alias in filtered_aliases: the_search_term = {'key': None, 'search_term': [alias], 'separator': None} alias_terms.append(the_search_term) return alias_terms def generate_xapian_request(dataset, search_terms, and_or): """ Generate the resquest to GN3 which queries Xapian """ match dataset.type: case "ProbeSet": search_type = "gene" case "Publish": search_type = "phenotype" case "Geno": search_type = "genotype" case _: # This should never happen raise ValueError(f"Dataset types should only be ProbeSet, Publish, or Geno, not '{dataset.type}'") xapian_terms = f" {and_or.upper()} ".join([create_xapian_term(dataset, term) for term in search_terms]) return urljoin(GN3_LOCAL_URL, "/api/search?" + urlencode({"query": xapian_terms, "type": search_type, "per_page": MAX_SEARCH_RESULTS})) def create_xapian_term(dataset, term): """ Create Xapian term for each search term """ search_term = term['search_term'] xapian_term = f"dataset:{dataset.name.lower()} AND " match term['key']: case 'MEAN': return xapian_term + f"mean:{search_term[0]}..{search_term[1]}" case 'POSITION': return xapian_term + f"chr:{search_term[0].lower().replace('chr', '')} AND position:{int(search_term[1])*10**6}..{int(search_term[2])*10**6}" case 'AUTHOR': return xapian_term + f"author:{search_term[0]}" case 'LRS': xapian_term += f"peak:{search_term[0]}..{search_term[1]}" if len(search_term) == 5: xapian_term += f" AND peakchr:{search_term[2].lower().replace('chr', '')} AND peakmb:{float(search_term[3])}..{float(search_term[4])}" return xapian_term case 'LOD': # Basically just LRS search but all values are multiplied by 4.61 xapian_term += f"peak:{float(search_term[0]) * 4.61}..{float(search_term[1]) * 4.61}" if len(search_term) == 5: xapian_term += f" AND peakchr:{search_term[2].lower().replace('chr', '')}" xapian_term += f" AND peakmb:{float(search_term[3])}..{float(search_term[4])}" return xapian_term case None: return xapian_term + f"{search_term[0]}"