diff options
Diffstat (limited to 'gn2/wqflask/search_results.py')
-rw-r--r-- | gn2/wqflask/search_results.py | 321 |
1 files changed, 215 insertions, 106 deletions
diff --git a/gn2/wqflask/search_results.py b/gn2/wqflask/search_results.py index b0f08463..1d89e52a 100644 --- a/gn2/wqflask/search_results.py +++ b/gn2/wqflask/search_results.py @@ -2,12 +2,18 @@ import uuid from math import * import requests import unicodedata +from urllib.parse import urlencode, urljoin import re import json +from pymonad.maybe import Just, Maybe +from pymonad.tools import curry + from flask import g +from gn3.monads import MonadicDict + from gn2.base.data_set import create_dataset from gn2.base.webqtlConfig import PUBMEDLINK_URL from gn2.wqflask import parser @@ -15,11 +21,12 @@ from gn2.wqflask import do_search from gn2.wqflask.database import database_connection -from gn2.utility import hmac from gn2.utility.authentication_tools import check_resource_availability -from gn2.utility.tools import get_setting, GN2_BASE_URL +from gn2.utility.hmac import hmac_creation +from gn2.utility.tools import get_setting, GN2_BASE_URL, GN3_LOCAL_URL from gn2.utility.type_checking import is_str +MAX_SEARCH_RESULTS = 50000 # Max number of search results, passed to Xapian search (this needs to match the value in GN3!) class SearchResultPage: #maxReturn = 3000 @@ -36,6 +43,7 @@ class SearchResultPage: self.uc_id = uuid.uuid4() self.go_term = None + self.search_type = "sql" # Assume it's an SQL search by default, since all searches will work with SQL if kw['search_terms_or']: self.and_or = "or" @@ -89,7 +97,6 @@ class SearchResultPage: """ trait_list = [] - json_trait_list = [] # result_set represents the results for each search term; a search of # "shh grin2b" would have two sets of results, one for each term @@ -105,97 +112,137 @@ class SearchResultPage: if not result: continue - trait_dict = {} - trait_dict['index'] = index + 1 - - trait_dict['dataset'] = self.dataset.name - if self.dataset.type == "ProbeSet": - trait_dict['display_name'] = result[2] - trait_dict['hmac'] = hmac.data_hmac('{}:{}'.format(trait_dict['display_name'], trait_dict['dataset'])) - trait_dict['symbol'] = "N/A" if result[3] is None else result[3].strip() - description_text = "" - if result[4] is not None and str(result[4]) != "": - description_text = unicodedata.normalize("NFKD", result[4].decode('latin1')) - - target_string = result[5].decode('utf-8') if result[5] else "" - description_display = description_text if target_string is None or str(target_string) == "" else description_text + "; " + str(target_string).strip() - trait_dict['description'] = description_display - - trait_dict['location'] = "N/A" - if (result[6] is not None) and (result[6] != "") and (result[6] != "Un") and (result[7] is not None) and (result[7] != 0): - trait_dict['location'] = f"Chr{result[6]}: {float(result[7]):.6f}" - - trait_dict['mean'] = "N/A" if result[8] is None or result[8] == "" else f"{result[8]:.3f}" - trait_dict['additive'] = "N/A" if result[12] is None or result[12] == "" else f"{result[12]:.3f}" - trait_dict['lod_score'] = "N/A" if result[9] is None or result[9] == "" else f"{float(result[9]) / 4.61:.1f}" - trait_dict['lrs_location'] = "N/A" if result[13] is None or result[13] == "" or result[14] is None else f"Chr{result[13]}: {float(result[14]):.6f}" - elif self.dataset.type == "Geno": - trait_dict['display_name'] = str(result[0]) - trait_dict['hmac'] = hmac.data_hmac('{}:{}'.format(trait_dict['display_name'], trait_dict['dataset'])) - trait_dict['location'] = "N/A" - if (result[4] != "NULL" and result[4] != "") and (result[5] != 0): - trait_dict['location'] = f"Chr{result[4]}: {float(result[5]):.6f}" - elif self.dataset.type == "Publish": - # Check permissions on a trait-by-trait basis for phenotype traits - trait_dict['name'] = trait_dict['display_name'] = str(result[0]) - trait_dict['hmac'] = hmac.data_hmac('{}:{}'.format(trait_dict['name'], trait_dict['dataset'])) - permissions = check_resource_availability( - self.dataset, g.user_session.user_id, trait_dict['display_name']) - if not any(x in permissions['data'] for x in ["view", "edit"]): - continue - - if result[10]: - trait_dict['display_name'] = str(result[10]) + "_" + str(result[0]) - trait_dict['description'] = "N/A" - trait_dict['pubmed_id'] = "N/A" - trait_dict['pubmed_link'] = "N/A" - trait_dict['pubmed_text'] = "N/A" - trait_dict['mean'] = "N/A" - trait_dict['additive'] = "N/A" - pre_pub_description = "N/A" if result[1] is None else result[1].strip() - post_pub_description = "N/A" if result[2] is None else result[2].strip() - if result[5] != "NULL" and result[5] != None: - trait_dict['pubmed_id'] = result[5] - trait_dict['pubmed_link'] = PUBMEDLINK_URL % trait_dict['pubmed_id'] - trait_dict['description'] = post_pub_description - else: - trait_dict['description'] = pre_pub_description + if self.search_type == "xapian": + # These four lines are borrowed from gsearch.py; probably need to put them somewhere else to avoid duplicated code + chr_mb = curry(2, lambda chr, mb: f"Chr{chr}: {mb:.6f}") + format3f = lambda x: f"{x:.3f}" + hmac = curry(3, lambda trait_name, dataset, data_hmac: f"{trait_name}:{dataset}:{data_hmac}") + convert_lod = lambda x: x / 4.61 + + trait = MonadicDict(result) + trait["index"] = Just(index) + trait["display_name"] = trait["name"] + trait["location"] = (Maybe.apply(chr_mb) + .to_arguments(trait.pop("chr"), trait.pop("mb"))) + trait["lod_score"] = trait.pop("lrs").map(convert_lod).map(format3f) + trait["additive"] = trait["additive"].map(format3f) + trait["mean"] = trait["mean"].map(format3f) + trait["lrs_location"] = (Maybe.apply(chr_mb) + .to_arguments(trait.pop("geno_chr"), trait.pop("geno_mb"))) + + description_text = trait['description'].maybe("N/A", lambda a: a) + if len(description_text) > 200: + description_text = description_text[:200] + "..." + trait['description'] = Just(description_text) + + if self.dataset.type == "ProbeSet": + trait["hmac"] = (Maybe.apply(hmac) + .to_arguments(trait['name'], trait['dataset'], Just(hmac_creation(f"{trait['name']}:{trait['dataset']}")))) + elif self.dataset.type == "Publish": + inbredsetcode = trait.pop("inbredsetcode") + if inbredsetcode.map(len) == Just(3): + trait["display_name"] = (Maybe.apply( + curry(2, lambda inbredsetcode, name: f"{inbredsetcode}_{name}")) + .to_arguments(inbredsetcode, trait["name"])) + + trait["hmac"] = (Maybe.apply(hmac) + .to_arguments(trait['name'], trait['dataset'], Just(hmac_creation(f"{trait['name']}:{trait['dataset']}")))) + trait["authors"] = trait["authors_display"] = (trait.pop("authors").map( + lambda authors: + ", ".join(authors[:2] + ["et al."] if len(authors) >=2 else authors))) + trait["pubmed_text"] = trait["year"].map(str) + trait_list.append(trait.data) + else: + trait_dict = {} + trait_dict['index'] = index + 1 + trait_dict['dataset'] = self.dataset.name + if self.dataset.type == "ProbeSet": + trait_dict['display_name'] = result[2] + trait_dict['hmac'] = f"{trait_dict['display_name']}:{trait_dict['dataset']}:{hmac_creation('{}:{}'.format(trait_dict['display_name'], trait_dict['dataset']))}" + trait_dict['symbol'] = "N/A" if result[3] is None else result[3].strip() + description_text = "" + if result[4] is not None and str(result[4]) != "": + description_text = unicodedata.normalize("NFKD", result[4].decode('latin1')) + + target_string = result[5].decode('utf-8') if result[5] else "" + description_display = description_text if target_string is None or str(target_string) == "" else description_text + "; " + str(target_string).strip() + trait_dict['description'] = description_display + + trait_dict['location'] = "N/A" + if (result[6] is not None) and (result[6] != "") and (result[6] != "Un") and (result[7] is not None) and (result[7] != 0): + trait_dict['location'] = f"Chr{result[6]}: {float(result[7]):.6f}" + + trait_dict['mean'] = "N/A" if result[8] is None or result[8] == "" else f"{result[8]:.3f}" + trait_dict['additive'] = "N/A" if result[12] is None or result[12] == "" else f"{result[12]:.3f}" + trait_dict['lod_score'] = "N/A" if result[9] is None or result[9] == "" else f"{float(result[9]) / 4.61:.1f}" + trait_dict['lrs_location'] = "N/A" if result[13] is None or result[13] == "" or result[14] is None else f"Chr{result[13]}: {float(result[14]):.6f}" + elif self.dataset.type == "Geno": + trait_dict['display_name'] = str(result[0]) + trait_dict['hmac'] = f"{trait_dict['display_name']}:{trait_dict['dataset']}:{hmac_creation('{}:{}'.format(trait_dict['display_name'], trait_dict['dataset']))}" + trait_dict['location'] = "N/A" + if (result[4] != "NULL" and result[4] != "") and (result[5] != 0): + trait_dict['location'] = f"Chr{result[4]}: {float(result[5]):.6f}" + elif self.dataset.type == "Publish": + # Check permissions on a trait-by-trait basis for phenotype traits + trait_dict['name'] = trait_dict['display_name'] = str(result[0]) + trait_dict['hmac'] = f"{trait_dict['display_name']}:{trait_dict['dataset']}:{hmac_creation('{}:{}'.format(trait_dict['display_name'], trait_dict['dataset']))}" + permissions = check_resource_availability( + self.dataset, g.user_session.user_id, trait_dict['display_name']) + if not any(x in permissions['data'] for x in ["view", "edit"]): + continue + + if result[10]: + trait_dict['display_name'] = str(result[10]) + "_" + str(result[0]) + trait_dict['description'] = "N/A" + trait_dict['pubmed_id'] = "N/A" + trait_dict['pubmed_link'] = "N/A" + trait_dict['pubmed_text'] = "N/A" + trait_dict['mean'] = "N/A" + trait_dict['additive'] = "N/A" + pre_pub_description = "N/A" if result[1] is None else result[1].strip() + post_pub_description = "N/A" if result[2] is None else result[2].strip() + if result[5] != "NULL" and result[5] != None: + trait_dict['pubmed_id'] = result[5] + trait_dict['pubmed_link'] = PUBMEDLINK_URL % trait_dict['pubmed_id'] + trait_dict['description'] = post_pub_description + else: + trait_dict['description'] = pre_pub_description - if result[4].isdigit(): - trait_dict['pubmed_text'] = result[4] + if result[4].isdigit(): + trait_dict['pubmed_text'] = result[4] - trait_dict['authors'] = result[3] - trait_dict['authors_display'] = trait_dict['authors'] - author_list = trait_dict['authors'].split(",") - if len(author_list) >= 2: - trait_dict['authors_display'] = (",").join(author_list[:2]) + ", et al." + trait_dict['authors'] = result[3] + trait_dict['authors_display'] = trait_dict['authors'] + author_list = trait_dict['authors'].split(",") + if len(author_list) >= 2: + trait_dict['authors_display'] = (",").join(author_list[:2]) + ", et al." - if result[6] != "" and result[6] != None: - trait_dict['mean'] = f"{result[6]:.3f}" + if result[6] != "" and result[6] != None: + trait_dict['mean'] = f"{result[6]:.3f}" - try: - trait_dict['lod_score'] = f"{float(result[7]) / 4.61:.1f}" - except: - trait_dict['lod_score'] = "N/A" + try: + trait_dict['lod_score'] = f"{float(result[7]) / 4.61:.1f}" + except: + trait_dict['lod_score'] = "N/A" - try: - trait_dict['lrs_location'] = f"Chr{result[11]}: {float(result[12]):.6f}" - except: - trait_dict['lrs_location'] = "N/A" + try: + trait_dict['lrs_location'] = f"Chr{result[11]}: {float(result[12]):.6f}" + except: + trait_dict['lrs_location'] = "N/A" - trait_dict['additive'] = "N/A" if not result[8] else f"{result[8]:.3f}" + trait_dict['additive'] = "N/A" if not result[8] else f"{result[8]:.3f}" - trait_dict['trait_info_str'] = trait_info_str(trait_dict, self.dataset.type) + trait_dict['trait_info_str'] = trait_info_str(trait_dict, self.dataset.type) - # Convert any bytes in dict to a normal utf-8 string - for key in trait_dict.keys(): - if isinstance(trait_dict[key], bytes): - try: - trait_dict[key] = trait_dict[key].decode('utf-8') - except UnicodeDecodeError: - trait_dict[key] = trait_dict[key].decode('latin-1') + # Convert any bytes in dict to a normal utf-8 string + for key in trait_dict.keys(): + if isinstance(trait_dict[key], bytes): + try: + trait_dict[key] = trait_dict[key].decode('utf-8') + except UnicodeDecodeError: + trait_dict[key] = trait_dict[key].decode('latin-1') - trait_list.append(trait_dict) + trait_list.append(trait_dict) if self.results: self.max_widths = {} @@ -229,6 +276,42 @@ class SearchResultPage: """ self.search_terms = parser.parse(self.search_terms) + # Set of terms compatible with Xapian currently (None is a search without a term) + xapian_terms = ["POSITION", "MEAN", "LRS", "LOD", "RIF", "WIKI"] + + if all([(the_term['key'] in xapian_terms) or (not the_term['key'] and self.dataset.type != "Publish") for the_term in self.search_terms]): + self.search_type = "xapian" + self.results = requests.get(generate_xapian_request(self.dataset, self.search_terms, self.and_or)).json() + if not len(self.results) or 'error' in self.results: + self.results = [] + self.sql_search() + else: + self.sql_search() + + def get_search_ob(self, a_search): + search_term = a_search['search_term'] + search_operator = a_search['separator'] + search_type = {} + search_type['dataset_type'] = self.dataset.type + if a_search['key']: + search_type['key'] = a_search['key'].upper() + else: + search_type['key'] = None + + search_ob = do_search.DoSearch.get_search(search_type) + if search_ob: + search_class = getattr(do_search, search_ob) + the_search = search_class(search_term, + search_operator, + self.dataset, + search_type['key'] + ) + return the_search + else: + return None + + def sql_search(self): + self.search_type = "sql" combined_from_clause = "" combined_where_clause = "" # The same table can't be referenced twice in the from clause @@ -313,27 +396,6 @@ class SearchResultPage: if the_search != None: self.header_fields = the_search.header_fields - def get_search_ob(self, a_search): - search_term = a_search['search_term'] - search_operator = a_search['separator'] - search_type = {} - search_type['dataset_type'] = self.dataset.type - if a_search['key']: - search_type['key'] = a_search['key'].upper() - else: - search_type['key'] = None - - search_ob = do_search.DoSearch.get_search(search_type) - if search_ob: - search_class = getattr(do_search, search_ob) - the_search = search_class(search_term, - search_operator, - self.dataset, - search_type['key'] - ) - return the_search - else: - return None def trait_info_str(trait, dataset_type): """Provide a string representation for given trait""" @@ -431,3 +493,50 @@ def get_alias_terms(symbol, species): alias_terms.append(the_search_term) return alias_terms + +def generate_xapian_request(dataset, search_terms, and_or): + """ Generate the resquest to GN3 which queries Xapian """ + match dataset.type: + case "ProbeSet": + search_type = "gene" + case "Publish": + search_type = "phenotype" + case "Geno": + search_type = "genotype" + case _: # This should never happen + raise ValueError(f"Dataset types should only be ProbeSet, Publish, or Geno, not '{dataset.type}'") + + xapian_terms = f" {and_or.upper()} ".join([create_xapian_term(dataset, term) for term in search_terms]) + + return urljoin(GN3_LOCAL_URL, "/api/search?" + urlencode({"query": xapian_terms, + "type": search_type, + "per_page": MAX_SEARCH_RESULTS})) + +def create_xapian_term(dataset, term): + """ Create Xapian term for each search term """ + search_term = term['search_term'] + xapian_term = f"dataset:{dataset.name.lower()} AND " + match term['key']: + case 'MEAN': + return xapian_term + f"mean:{search_term[0]}..{search_term[1]}" + case 'POSITION': + return xapian_term + f"chr:{search_term[0].lower().replace('chr', '')} AND position:{int(search_term[1])*10**6}..{int(search_term[2])*10**6}" + case 'AUTHOR': + return xapian_term + f"author:{search_term[0]}" + case 'RIF': + return xapian_term + f"rif:{search_term[0]}" + case 'WIKI': + return xapian_term + f"wiki:{search_term[0]}" + case 'LRS': + xapian_term += f"peak:{search_term[0]}..{search_term[1]}" + if len(search_term) == 5: + xapian_term += f" AND peakchr:{search_term[2].lower().replace('chr', '')} AND peakmb:{float(search_term[3])}..{float(search_term[4])}" + return xapian_term + case 'LOD': # Basically just LRS search but all values are multiplied by 4.61 + xapian_term += f"peak:{float(search_term[0]) * 4.61}..{float(search_term[1]) * 4.61}" + if len(search_term) == 5: + xapian_term += f" AND peakchr:{search_term[2].lower().replace('chr', '')}" + xapian_term += f" AND peakmb:{float(search_term[3])}..{float(search_term[4])}" + return xapian_term + case None: + return xapian_term + f"{search_term[0]}" |