From 975067f13c6491cb464d069413ff72f1cd41958c Mon Sep 17 00:00:00 2001 From: zsloan Date: Thu, 20 Jun 2024 20:00:10 +0000 Subject: Create trait objects from Xapian search results to pass to DataTables This means the result table should be displayed now. The only issue is that this involves a significant amount of duplicated code from gsearch.py; I'm not sure how best to avoid this, but this is okay for now. --- gn2/wqflask/search_results.py | 242 ++++++++++++++++++++++++++---------------- 1 file changed, 151 insertions(+), 91 deletions(-) diff --git a/gn2/wqflask/search_results.py b/gn2/wqflask/search_results.py index 67cebf56..9558ffa8 100644 --- a/gn2/wqflask/search_results.py +++ b/gn2/wqflask/search_results.py @@ -7,8 +7,13 @@ import re import json +from pymonad.maybe import Just, Maybe +from pymonad.tools import curry + from flask import g +from gn3.monads import MonadicDict + from gn2.base.data_set import create_dataset from gn2.base.webqtlConfig import PUBMEDLINK_URL from gn2.wqflask import parser @@ -16,11 +21,13 @@ from gn2.wqflask import do_search from gn2.wqflask.database import database_connection -from gn2.utility import hmac from gn2.utility.authentication_tools import check_resource_availability +from gn2.utility.hmac import hmac_creation from gn2.utility.tools import get_setting, GN2_BASE_URL, GN3_LOCAL_URL from gn2.utility.type_checking import is_str +MAX_SEARCH_RESULTS = 20000 # Max number of search results, passed to Xapian search + class SearchResultPage: #maxReturn = 3000 @@ -83,14 +90,13 @@ class SearchResultPage: else: self.gen_search_result() - def gen_search_result(self): + def gen_search_result(self, search_type="sql"): """ Get the info displayed in the search result table from the set of results computed in the "search" function """ trait_list = [] - json_trait_list = [] # result_set represents the results for each search term; a search of # "shh grin2b" would have two sets of results, one for each term @@ -106,97 +112,150 @@ class SearchResultPage: if not result: continue - trait_dict = {} - trait_dict['index'] = index + 1 - - trait_dict['dataset'] = self.dataset.name - if self.dataset.type == "ProbeSet": - trait_dict['display_name'] = result[2] - trait_dict['hmac'] = hmac.data_hmac('{}:{}'.format(trait_dict['display_name'], trait_dict['dataset'])) - trait_dict['symbol'] = "N/A" if result[3] is None else result[3].strip() - description_text = "" - if result[4] is not None and str(result[4]) != "": - description_text = unicodedata.normalize("NFKD", result[4].decode('latin1')) - - target_string = result[5].decode('utf-8') if result[5] else "" - description_display = description_text if target_string is None or str(target_string) == "" else description_text + "; " + str(target_string).strip() - trait_dict['description'] = description_display - - trait_dict['location'] = "N/A" - if (result[6] is not None) and (result[6] != "") and (result[6] != "Un") and (result[7] is not None) and (result[7] != 0): - trait_dict['location'] = f"Chr{result[6]}: {float(result[7]):.6f}" - - trait_dict['mean'] = "N/A" if result[8] is None or result[8] == "" else f"{result[8]:.3f}" - trait_dict['additive'] = "N/A" if result[12] is None or result[12] == "" else f"{result[12]:.3f}" - trait_dict['lod_score'] = "N/A" if result[9] is None or result[9] == "" else f"{float(result[9]) / 4.61:.1f}" - trait_dict['lrs_location'] = "N/A" if result[13] is None or result[13] == "" or result[14] is None else f"Chr{result[13]}: {float(result[14]):.6f}" - elif self.dataset.type == "Geno": - trait_dict['display_name'] = str(result[0]) - trait_dict['hmac'] = hmac.data_hmac('{}:{}'.format(trait_dict['display_name'], trait_dict['dataset'])) - trait_dict['location'] = "N/A" - if (result[4] != "NULL" and result[4] != "") and (result[5] != 0): - trait_dict['location'] = f"Chr{result[4]}: {float(result[5]):.6f}" - elif self.dataset.type == "Publish": - # Check permissions on a trait-by-trait basis for phenotype traits - trait_dict['name'] = trait_dict['display_name'] = str(result[0]) - trait_dict['hmac'] = hmac.data_hmac('{}:{}'.format(trait_dict['name'], trait_dict['dataset'])) - permissions = check_resource_availability( - self.dataset, g.user_session.user_id, trait_dict['display_name']) - if not any(x in permissions['data'] for x in ["view", "edit"]): - continue - - if result[10]: - trait_dict['display_name'] = str(result[10]) + "_" + str(result[0]) - trait_dict['description'] = "N/A" - trait_dict['pubmed_id'] = "N/A" - trait_dict['pubmed_link'] = "N/A" - trait_dict['pubmed_text'] = "N/A" - trait_dict['mean'] = "N/A" - trait_dict['additive'] = "N/A" - pre_pub_description = "N/A" if result[1] is None else result[1].strip() - post_pub_description = "N/A" if result[2] is None else result[2].strip() - if result[5] != "NULL" and result[5] != None: - trait_dict['pubmed_id'] = result[5] - trait_dict['pubmed_link'] = PUBMEDLINK_URL % trait_dict['pubmed_id'] - trait_dict['description'] = post_pub_description - else: - trait_dict['description'] = pre_pub_description + if self.search_type == "xapian": + # These four lines are borrowed from gsearch.py; probably need to put them somewhere else to avoid duplicated code + chr_mb = curry(2, lambda chr, mb: f"Chr{chr}: {mb:.6f}") + format3f = lambda x: f"{x:.3f}" + hmac = curry(3, lambda trait_name, dataset, data_hmac: f"{trait_name}:{dataset}:{data_hmac}") + convert_lod = lambda x: x / 4.61 + + trait = MonadicDict(result) + trait["index"] = Just(index) + trait["display_name"] = trait["name"] + trait["location"] = (Maybe.apply(chr_mb) + .to_arguments(trait.pop("chr"), trait.pop("mb"))) + trait["lod_score"] = trait.pop("lrs").map(convert_lod).map(format3f) + trait["additive"] = trait["additive"].map(format3f) + trait["mean"] = trait["mean"].map(format3f) + trait["lrs_location"] = (Maybe.apply(chr_mb) + .to_arguments(trait.pop("geno_chr"), trait.pop("geno_mb"))) + if self.dataset.type == "ProbeSet": + trait["hmac"] = (Maybe.apply(hmac) + .to_arguments(trait['name'], trait['dataset'], Just(hmac_creation(f"{trait['name']}:{trait['dataset']}")))) + elif self.dataset.type == "Publish": + inbredsetcode = trait.pop("inbredsetcode") + if inbredsetcode.map(len) == Just(3): + trait["display_name"] = (Maybe.apply( + curry(2, lambda inbredsetcode, name: f"{inbredsetcode}_{name}")) + .to_arguments(inbredsetcode, trait["name"])) + + trait["hmac"] = (Maybe.apply(hmac) + .to_arguments(trait['name'], trait['dataset'], Just(hmac_creation(f"{trait['name']}:{trait['dataset']}")))) + trait["authors_display"] = (trait.pop("authors").map( + lambda authors: + ", ".join(authors[:2] + ["et al."] if len(authors) >=2 else authors))) + trait["pubmed_text"] = trait["year"].map(str) + trait_list.append(trait.data) +# if self.dataset.type == "ProbeSet": +# trait_dict['display_name'] = result['name'] +# trait_dict['hmac'] = hmac.data_hmac('{}:{}'.format(trait_dict['display_name'], trait_dict['dataset'])) +# trait_dict['symbol'] = "N/A" if not result['symbol'] else result['symbol'].strip() +# description_text = "" +# if result['description']: +# description_text = unicodedata.normalize("NFKD", result['description'].decode('latin1')) +# +# target_string = None # Will change this one the probe_target_description values are indexed +# +# description_display = description_text if target_string is None or str(target_string) == "" else description_text + "; " + str(target_string).strip() +# trait_dict['description'] = description_display +# +# trait_dict['location'] = "N/A" +# if result['chr'] and result['chr'] != "" and result['chr'] != "Un" and result['mb'] and result['mb'] != 0: +# trait_dict['location'] = f"Chr{result['chr']}: {float(result['mb']):.6f}" +# +# trait_dict['mean'] = "N/A" if not result['mean'] else f"{result['mean']:.3f}" +# trait_dict['additive'] = "N/A" if not result['additive'] + else: + trait_dict = {} + trait_dict['index'] = index + 1 + trait_dict['dataset'] = self.dataset.name + if self.dataset.type == "ProbeSet": + trait_dict['display_name'] = result[2] + trait_dict['hmac'] = hmac.data_hmac('{}:{}'.format(trait_dict['display_name'], trait_dict['dataset'])) + trait_dict['symbol'] = "N/A" if result[3] is None else result[3].strip() + description_text = "" + if result[4] is not None and str(result[4]) != "": + description_text = unicodedata.normalize("NFKD", result[4].decode('latin1')) + + target_string = result[5].decode('utf-8') if result[5] else "" + description_display = description_text if target_string is None or str(target_string) == "" else description_text + "; " + str(target_string).strip() + trait_dict['description'] = description_display + + trait_dict['location'] = "N/A" + if (result[6] is not None) and (result[6] != "") and (result[6] != "Un") and (result[7] is not None) and (result[7] != 0): + trait_dict['location'] = f"Chr{result[6]}: {float(result[7]):.6f}" + + trait_dict['mean'] = "N/A" if result[8] is None or result[8] == "" else f"{result[8]:.3f}" + trait_dict['additive'] = "N/A" if result[12] is None or result[12] == "" else f"{result[12]:.3f}" + trait_dict['lod_score'] = "N/A" if result[9] is None or result[9] == "" else f"{float(result[9]) / 4.61:.1f}" + trait_dict['lrs_location'] = "N/A" if result[13] is None or result[13] == "" or result[14] is None else f"Chr{result[13]}: {float(result[14]):.6f}" + elif self.dataset.type == "Geno": + trait_dict['display_name'] = str(result[0]) + trait_dict['hmac'] = hmac.data_hmac('{}:{}'.format(trait_dict['display_name'], trait_dict['dataset'])) + trait_dict['location'] = "N/A" + if (result[4] != "NULL" and result[4] != "") and (result[5] != 0): + trait_dict['location'] = f"Chr{result[4]}: {float(result[5]):.6f}" + elif self.dataset.type == "Publish": + # Check permissions on a trait-by-trait basis for phenotype traits + trait_dict['name'] = trait_dict['display_name'] = str(result[0]) + trait_dict['hmac'] = hmac.data_hmac('{}:{}'.format(trait_dict['name'], trait_dict['dataset'])) + permissions = check_resource_availability( + self.dataset, g.user_session.user_id, trait_dict['display_name']) + if not any(x in permissions['data'] for x in ["view", "edit"]): + continue + + if result[10]: + trait_dict['display_name'] = str(result[10]) + "_" + str(result[0]) + trait_dict['description'] = "N/A" + trait_dict['pubmed_id'] = "N/A" + trait_dict['pubmed_link'] = "N/A" + trait_dict['pubmed_text'] = "N/A" + trait_dict['mean'] = "N/A" + trait_dict['additive'] = "N/A" + pre_pub_description = "N/A" if result[1] is None else result[1].strip() + post_pub_description = "N/A" if result[2] is None else result[2].strip() + if result[5] != "NULL" and result[5] != None: + trait_dict['pubmed_id'] = result[5] + trait_dict['pubmed_link'] = PUBMEDLINK_URL % trait_dict['pubmed_id'] + trait_dict['description'] = post_pub_description + else: + trait_dict['description'] = pre_pub_description - if result[4].isdigit(): - trait_dict['pubmed_text'] = result[4] + if result[4].isdigit(): + trait_dict['pubmed_text'] = result[4] - trait_dict['authors'] = result[3] - trait_dict['authors_display'] = trait_dict['authors'] - author_list = trait_dict['authors'].split(",") - if len(author_list) >= 2: - trait_dict['authors_display'] = (",").join(author_list[:2]) + ", et al." + trait_dict['authors'] = result[3] + trait_dict['authors_display'] = trait_dict['authors'] + author_list = trait_dict['authors'].split(",") + if len(author_list) >= 2: + trait_dict['authors_display'] = (",").join(author_list[:2]) + ", et al." - if result[6] != "" and result[6] != None: - trait_dict['mean'] = f"{result[6]:.3f}" + if result[6] != "" and result[6] != None: + trait_dict['mean'] = f"{result[6]:.3f}" - try: - trait_dict['lod_score'] = f"{float(result[7]) / 4.61:.1f}" - except: - trait_dict['lod_score'] = "N/A" + try: + trait_dict['lod_score'] = f"{float(result[7]) / 4.61:.1f}" + except: + trait_dict['lod_score'] = "N/A" - try: - trait_dict['lrs_location'] = f"Chr{result[11]}: {float(result[12]):.6f}" - except: - trait_dict['lrs_location'] = "N/A" + try: + trait_dict['lrs_location'] = f"Chr{result[11]}: {float(result[12]):.6f}" + except: + trait_dict['lrs_location'] = "N/A" - trait_dict['additive'] = "N/A" if not result[8] else f"{result[8]:.3f}" + trait_dict['additive'] = "N/A" if not result[8] else f"{result[8]:.3f}" - trait_dict['trait_info_str'] = trait_info_str(trait_dict, self.dataset.type) + trait_dict['trait_info_str'] = trait_info_str(trait_dict, self.dataset.type) - # Convert any bytes in dict to a normal utf-8 string - for key in trait_dict.keys(): - if isinstance(trait_dict[key], bytes): - try: - trait_dict[key] = trait_dict[key].decode('utf-8') - except UnicodeDecodeError: - trait_dict[key] = trait_dict[key].decode('latin-1') + # Convert any bytes in dict to a normal utf-8 string + for key in trait_dict.keys(): + if isinstance(trait_dict[key], bytes): + try: + trait_dict[key] = trait_dict[key].decode('utf-8') + except UnicodeDecodeError: + trait_dict[key] = trait_dict[key].decode('latin-1') - trait_list.append(trait_dict) + trait_list.append(trait_dict) if self.results: self.max_widths = {} @@ -448,26 +507,27 @@ def generate_xapian_request(dataset, search_terms, and_or): case _: # This should never happen raise ValueError(f"Dataset types should only be ProbeSet, Publish, or Geno, not '{dataset.type}'") - xapian_terms = and_or.join([create_xapian_term(dataset, term) for term in search_terms]) + xapian_terms = f" {and_or.upper()} ".join([create_xapian_term(dataset, term) for term in search_terms]) return urljoin(GN3_LOCAL_URL, "/api/search?" + urlencode({"query": xapian_terms, - "type": search_type})) + "type": search_type, + "per_page": MAX_SEARCH_RESULTS})) def create_xapian_term(dataset, term): """ Create Xapian term for each search term """ search_term = term['search_term'] - xapian_term = f"dataset:{dataset.fullname} AND " + xapian_term = f"dataset:{dataset.name.lower()} AND " match term['key']: case 'MEAN': return xapian_term + f"mean:{search_term[0]}..{search_term[1]}" case 'POSITION': - return xapian_term + f"chr:{search_term[0].lower().replace('chr', '')} AND position:{search_term[1]}..{search_term[2]}" + return xapian_term + f"chr:{search_term[0].lower().replace('chr', '')} AND position:{int(search_term[1])*10**6}..{int(search_term[2])*10**6}" case 'AUTHOR': return xapian_term + f"author:{search_term[0]}" case 'LRS': xapian_term += f"peak:{search_term[0]}..{search_term[1]}" if len(term) == 5: - xapian_term += f"peakchr:{search_term[2].lower().replace('chr', '')} AND peakmb:{search_term[3]}..{search_term[4]}" + xapian_term += f"peakchr:{search_term[2].lower().replace('chr', '')} AND peakmb:{float(search_term[3])*10**6}..{float(search_term[4])*10**6}" return xapian_term case 'LOD': # Basically just LRS search but all values are multiplied by 4.61 xapian_term += f"peak:{float(search_term[0]) * 4.61}..{float(search_term[1]) * 4.61}" @@ -476,4 +536,4 @@ def create_xapian_term(dataset, term): xapian_term += f"peakmb:{float(search_term[3]) * 4.61}..{float(search_term[4]) * 4.61}" return xapian_term case None: - return xapian_term + f"{search_term[0]}" \ No newline at end of file + return xapian_term + f"{search_term[0]}" -- cgit v1.2.3