aboutsummaryrefslogtreecommitdiff
path: root/gn2/wqflask
diff options
context:
space:
mode:
Diffstat (limited to 'gn2/wqflask')
-rw-r--r--gn2/wqflask/search_results.py242
1 files changed, 151 insertions, 91 deletions
diff --git a/gn2/wqflask/search_results.py b/gn2/wqflask/search_results.py
index 67cebf56..9558ffa8 100644
--- a/gn2/wqflask/search_results.py
+++ b/gn2/wqflask/search_results.py
@@ -7,8 +7,13 @@ import re
import json
+from pymonad.maybe import Just, Maybe
+from pymonad.tools import curry
+
from flask import g
+from gn3.monads import MonadicDict
+
from gn2.base.data_set import create_dataset
from gn2.base.webqtlConfig import PUBMEDLINK_URL
from gn2.wqflask import parser
@@ -16,11 +21,13 @@ from gn2.wqflask import do_search
from gn2.wqflask.database import database_connection
-from gn2.utility import hmac
from gn2.utility.authentication_tools import check_resource_availability
+from gn2.utility.hmac import hmac_creation
from gn2.utility.tools import get_setting, GN2_BASE_URL, GN3_LOCAL_URL
from gn2.utility.type_checking import is_str
+MAX_SEARCH_RESULTS = 20000 # Max number of search results, passed to Xapian search
+
class SearchResultPage:
#maxReturn = 3000
@@ -83,14 +90,13 @@ class SearchResultPage:
else:
self.gen_search_result()
- def gen_search_result(self):
+ def gen_search_result(self, search_type="sql"):
"""
Get the info displayed in the search result table from the set of results computed in
the "search" function
"""
trait_list = []
- json_trait_list = []
# result_set represents the results for each search term; a search of
# "shh grin2b" would have two sets of results, one for each term
@@ -106,97 +112,150 @@ class SearchResultPage:
if not result:
continue
- trait_dict = {}
- trait_dict['index'] = index + 1
-
- trait_dict['dataset'] = self.dataset.name
- if self.dataset.type == "ProbeSet":
- trait_dict['display_name'] = result[2]
- trait_dict['hmac'] = hmac.data_hmac('{}:{}'.format(trait_dict['display_name'], trait_dict['dataset']))
- trait_dict['symbol'] = "N/A" if result[3] is None else result[3].strip()
- description_text = ""
- if result[4] is not None and str(result[4]) != "":
- description_text = unicodedata.normalize("NFKD", result[4].decode('latin1'))
-
- target_string = result[5].decode('utf-8') if result[5] else ""
- description_display = description_text if target_string is None or str(target_string) == "" else description_text + "; " + str(target_string).strip()
- trait_dict['description'] = description_display
-
- trait_dict['location'] = "N/A"
- if (result[6] is not None) and (result[6] != "") and (result[6] != "Un") and (result[7] is not None) and (result[7] != 0):
- trait_dict['location'] = f"Chr{result[6]}: {float(result[7]):.6f}"
-
- trait_dict['mean'] = "N/A" if result[8] is None or result[8] == "" else f"{result[8]:.3f}"
- trait_dict['additive'] = "N/A" if result[12] is None or result[12] == "" else f"{result[12]:.3f}"
- trait_dict['lod_score'] = "N/A" if result[9] is None or result[9] == "" else f"{float(result[9]) / 4.61:.1f}"
- trait_dict['lrs_location'] = "N/A" if result[13] is None or result[13] == "" or result[14] is None else f"Chr{result[13]}: {float(result[14]):.6f}"
- elif self.dataset.type == "Geno":
- trait_dict['display_name'] = str(result[0])
- trait_dict['hmac'] = hmac.data_hmac('{}:{}'.format(trait_dict['display_name'], trait_dict['dataset']))
- trait_dict['location'] = "N/A"
- if (result[4] != "NULL" and result[4] != "") and (result[5] != 0):
- trait_dict['location'] = f"Chr{result[4]}: {float(result[5]):.6f}"
- elif self.dataset.type == "Publish":
- # Check permissions on a trait-by-trait basis for phenotype traits
- trait_dict['name'] = trait_dict['display_name'] = str(result[0])
- trait_dict['hmac'] = hmac.data_hmac('{}:{}'.format(trait_dict['name'], trait_dict['dataset']))
- permissions = check_resource_availability(
- self.dataset, g.user_session.user_id, trait_dict['display_name'])
- if not any(x in permissions['data'] for x in ["view", "edit"]):
- continue
-
- if result[10]:
- trait_dict['display_name'] = str(result[10]) + "_" + str(result[0])
- trait_dict['description'] = "N/A"
- trait_dict['pubmed_id'] = "N/A"
- trait_dict['pubmed_link'] = "N/A"
- trait_dict['pubmed_text'] = "N/A"
- trait_dict['mean'] = "N/A"
- trait_dict['additive'] = "N/A"
- pre_pub_description = "N/A" if result[1] is None else result[1].strip()
- post_pub_description = "N/A" if result[2] is None else result[2].strip()
- if result[5] != "NULL" and result[5] != None:
- trait_dict['pubmed_id'] = result[5]
- trait_dict['pubmed_link'] = PUBMEDLINK_URL % trait_dict['pubmed_id']
- trait_dict['description'] = post_pub_description
- else:
- trait_dict['description'] = pre_pub_description
+ if self.search_type == "xapian":
+ # These four lines are borrowed from gsearch.py; probably need to put them somewhere else to avoid duplicated code
+ chr_mb = curry(2, lambda chr, mb: f"Chr{chr}: {mb:.6f}")
+ format3f = lambda x: f"{x:.3f}"
+ hmac = curry(3, lambda trait_name, dataset, data_hmac: f"{trait_name}:{dataset}:{data_hmac}")
+ convert_lod = lambda x: x / 4.61
+
+ trait = MonadicDict(result)
+ trait["index"] = Just(index)
+ trait["display_name"] = trait["name"]
+ trait["location"] = (Maybe.apply(chr_mb)
+ .to_arguments(trait.pop("chr"), trait.pop("mb")))
+ trait["lod_score"] = trait.pop("lrs").map(convert_lod).map(format3f)
+ trait["additive"] = trait["additive"].map(format3f)
+ trait["mean"] = trait["mean"].map(format3f)
+ trait["lrs_location"] = (Maybe.apply(chr_mb)
+ .to_arguments(trait.pop("geno_chr"), trait.pop("geno_mb")))
+ if self.dataset.type == "ProbeSet":
+ trait["hmac"] = (Maybe.apply(hmac)
+ .to_arguments(trait['name'], trait['dataset'], Just(hmac_creation(f"{trait['name']}:{trait['dataset']}"))))
+ elif self.dataset.type == "Publish":
+ inbredsetcode = trait.pop("inbredsetcode")
+ if inbredsetcode.map(len) == Just(3):
+ trait["display_name"] = (Maybe.apply(
+ curry(2, lambda inbredsetcode, name: f"{inbredsetcode}_{name}"))
+ .to_arguments(inbredsetcode, trait["name"]))
+
+ trait["hmac"] = (Maybe.apply(hmac)
+ .to_arguments(trait['name'], trait['dataset'], Just(hmac_creation(f"{trait['name']}:{trait['dataset']}"))))
+ trait["authors_display"] = (trait.pop("authors").map(
+ lambda authors:
+ ", ".join(authors[:2] + ["et al."] if len(authors) >=2 else authors)))
+ trait["pubmed_text"] = trait["year"].map(str)
+ trait_list.append(trait.data)
+# if self.dataset.type == "ProbeSet":
+# trait_dict['display_name'] = result['name']
+# trait_dict['hmac'] = hmac.data_hmac('{}:{}'.format(trait_dict['display_name'], trait_dict['dataset']))
+# trait_dict['symbol'] = "N/A" if not result['symbol'] else result['symbol'].strip()
+# description_text = ""
+# if result['description']:
+# description_text = unicodedata.normalize("NFKD", result['description'].decode('latin1'))
+#
+# target_string = None # Will change this one the probe_target_description values are indexed
+#
+# description_display = description_text if target_string is None or str(target_string) == "" else description_text + "; " + str(target_string).strip()
+# trait_dict['description'] = description_display
+#
+# trait_dict['location'] = "N/A"
+# if result['chr'] and result['chr'] != "" and result['chr'] != "Un" and result['mb'] and result['mb'] != 0:
+# trait_dict['location'] = f"Chr{result['chr']}: {float(result['mb']):.6f}"
+#
+# trait_dict['mean'] = "N/A" if not result['mean'] else f"{result['mean']:.3f}"
+# trait_dict['additive'] = "N/A" if not result['additive']
+ else:
+ trait_dict = {}
+ trait_dict['index'] = index + 1
+ trait_dict['dataset'] = self.dataset.name
+ if self.dataset.type == "ProbeSet":
+ trait_dict['display_name'] = result[2]
+ trait_dict['hmac'] = hmac.data_hmac('{}:{}'.format(trait_dict['display_name'], trait_dict['dataset']))
+ trait_dict['symbol'] = "N/A" if result[3] is None else result[3].strip()
+ description_text = ""
+ if result[4] is not None and str(result[4]) != "":
+ description_text = unicodedata.normalize("NFKD", result[4].decode('latin1'))
+
+ target_string = result[5].decode('utf-8') if result[5] else ""
+ description_display = description_text if target_string is None or str(target_string) == "" else description_text + "; " + str(target_string).strip()
+ trait_dict['description'] = description_display
+
+ trait_dict['location'] = "N/A"
+ if (result[6] is not None) and (result[6] != "") and (result[6] != "Un") and (result[7] is not None) and (result[7] != 0):
+ trait_dict['location'] = f"Chr{result[6]}: {float(result[7]):.6f}"
+
+ trait_dict['mean'] = "N/A" if result[8] is None or result[8] == "" else f"{result[8]:.3f}"
+ trait_dict['additive'] = "N/A" if result[12] is None or result[12] == "" else f"{result[12]:.3f}"
+ trait_dict['lod_score'] = "N/A" if result[9] is None or result[9] == "" else f"{float(result[9]) / 4.61:.1f}"
+ trait_dict['lrs_location'] = "N/A" if result[13] is None or result[13] == "" or result[14] is None else f"Chr{result[13]}: {float(result[14]):.6f}"
+ elif self.dataset.type == "Geno":
+ trait_dict['display_name'] = str(result[0])
+ trait_dict['hmac'] = hmac.data_hmac('{}:{}'.format(trait_dict['display_name'], trait_dict['dataset']))
+ trait_dict['location'] = "N/A"
+ if (result[4] != "NULL" and result[4] != "") and (result[5] != 0):
+ trait_dict['location'] = f"Chr{result[4]}: {float(result[5]):.6f}"
+ elif self.dataset.type == "Publish":
+ # Check permissions on a trait-by-trait basis for phenotype traits
+ trait_dict['name'] = trait_dict['display_name'] = str(result[0])
+ trait_dict['hmac'] = hmac.data_hmac('{}:{}'.format(trait_dict['name'], trait_dict['dataset']))
+ permissions = check_resource_availability(
+ self.dataset, g.user_session.user_id, trait_dict['display_name'])
+ if not any(x in permissions['data'] for x in ["view", "edit"]):
+ continue
+
+ if result[10]:
+ trait_dict['display_name'] = str(result[10]) + "_" + str(result[0])
+ trait_dict['description'] = "N/A"
+ trait_dict['pubmed_id'] = "N/A"
+ trait_dict['pubmed_link'] = "N/A"
+ trait_dict['pubmed_text'] = "N/A"
+ trait_dict['mean'] = "N/A"
+ trait_dict['additive'] = "N/A"
+ pre_pub_description = "N/A" if result[1] is None else result[1].strip()
+ post_pub_description = "N/A" if result[2] is None else result[2].strip()
+ if result[5] != "NULL" and result[5] != None:
+ trait_dict['pubmed_id'] = result[5]
+ trait_dict['pubmed_link'] = PUBMEDLINK_URL % trait_dict['pubmed_id']
+ trait_dict['description'] = post_pub_description
+ else:
+ trait_dict['description'] = pre_pub_description
- if result[4].isdigit():
- trait_dict['pubmed_text'] = result[4]
+ if result[4].isdigit():
+ trait_dict['pubmed_text'] = result[4]
- trait_dict['authors'] = result[3]
- trait_dict['authors_display'] = trait_dict['authors']
- author_list = trait_dict['authors'].split(",")
- if len(author_list) >= 2:
- trait_dict['authors_display'] = (",").join(author_list[:2]) + ", et al."
+ trait_dict['authors'] = result[3]
+ trait_dict['authors_display'] = trait_dict['authors']
+ author_list = trait_dict['authors'].split(",")
+ if len(author_list) >= 2:
+ trait_dict['authors_display'] = (",").join(author_list[:2]) + ", et al."
- if result[6] != "" and result[6] != None:
- trait_dict['mean'] = f"{result[6]:.3f}"
+ if result[6] != "" and result[6] != None:
+ trait_dict['mean'] = f"{result[6]:.3f}"
- try:
- trait_dict['lod_score'] = f"{float(result[7]) / 4.61:.1f}"
- except:
- trait_dict['lod_score'] = "N/A"
+ try:
+ trait_dict['lod_score'] = f"{float(result[7]) / 4.61:.1f}"
+ except:
+ trait_dict['lod_score'] = "N/A"
- try:
- trait_dict['lrs_location'] = f"Chr{result[11]}: {float(result[12]):.6f}"
- except:
- trait_dict['lrs_location'] = "N/A"
+ try:
+ trait_dict['lrs_location'] = f"Chr{result[11]}: {float(result[12]):.6f}"
+ except:
+ trait_dict['lrs_location'] = "N/A"
- trait_dict['additive'] = "N/A" if not result[8] else f"{result[8]:.3f}"
+ trait_dict['additive'] = "N/A" if not result[8] else f"{result[8]:.3f}"
- trait_dict['trait_info_str'] = trait_info_str(trait_dict, self.dataset.type)
+ trait_dict['trait_info_str'] = trait_info_str(trait_dict, self.dataset.type)
- # Convert any bytes in dict to a normal utf-8 string
- for key in trait_dict.keys():
- if isinstance(trait_dict[key], bytes):
- try:
- trait_dict[key] = trait_dict[key].decode('utf-8')
- except UnicodeDecodeError:
- trait_dict[key] = trait_dict[key].decode('latin-1')
+ # Convert any bytes in dict to a normal utf-8 string
+ for key in trait_dict.keys():
+ if isinstance(trait_dict[key], bytes):
+ try:
+ trait_dict[key] = trait_dict[key].decode('utf-8')
+ except UnicodeDecodeError:
+ trait_dict[key] = trait_dict[key].decode('latin-1')
- trait_list.append(trait_dict)
+ trait_list.append(trait_dict)
if self.results:
self.max_widths = {}
@@ -448,26 +507,27 @@ def generate_xapian_request(dataset, search_terms, and_or):
case _: # This should never happen
raise ValueError(f"Dataset types should only be ProbeSet, Publish, or Geno, not '{dataset.type}'")
- xapian_terms = and_or.join([create_xapian_term(dataset, term) for term in search_terms])
+ xapian_terms = f" {and_or.upper()} ".join([create_xapian_term(dataset, term) for term in search_terms])
return urljoin(GN3_LOCAL_URL, "/api/search?" + urlencode({"query": xapian_terms,
- "type": search_type}))
+ "type": search_type,
+ "per_page": MAX_SEARCH_RESULTS}))
def create_xapian_term(dataset, term):
""" Create Xapian term for each search term """
search_term = term['search_term']
- xapian_term = f"dataset:{dataset.fullname} AND "
+ xapian_term = f"dataset:{dataset.name.lower()} AND "
match term['key']:
case 'MEAN':
return xapian_term + f"mean:{search_term[0]}..{search_term[1]}"
case 'POSITION':
- return xapian_term + f"chr:{search_term[0].lower().replace('chr', '')} AND position:{search_term[1]}..{search_term[2]}"
+ return xapian_term + f"chr:{search_term[0].lower().replace('chr', '')} AND position:{int(search_term[1])*10**6}..{int(search_term[2])*10**6}"
case 'AUTHOR':
return xapian_term + f"author:{search_term[0]}"
case 'LRS':
xapian_term += f"peak:{search_term[0]}..{search_term[1]}"
if len(term) == 5:
- xapian_term += f"peakchr:{search_term[2].lower().replace('chr', '')} AND peakmb:{search_term[3]}..{search_term[4]}"
+ xapian_term += f"peakchr:{search_term[2].lower().replace('chr', '')} AND peakmb:{float(search_term[3])*10**6}..{float(search_term[4])*10**6}"
return xapian_term
case 'LOD': # Basically just LRS search but all values are multiplied by 4.61
xapian_term += f"peak:{float(search_term[0]) * 4.61}..{float(search_term[1]) * 4.61}"
@@ -476,4 +536,4 @@ def create_xapian_term(dataset, term):
xapian_term += f"peakmb:{float(search_term[3]) * 4.61}..{float(search_term[4]) * 4.61}"
return xapian_term
case None:
- return xapian_term + f"{search_term[0]}" \ No newline at end of file
+ return xapian_term + f"{search_term[0]}"