1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
|
"""Search using Xapian index."""
from collections import namedtuple
import json
from functools import partial, reduce
from typing import Callable
import urllib.parse
from flask import abort, Blueprint, current_app, jsonify, request
from pymonad.maybe import Just, Maybe, Nothing
import xapian
from gn3.monads import MonadicDict
from gn3.db_utils import xapian_database
# Flask blueprint on which the search routes of this module are registered.
search = Blueprint("search", __name__)

# A single position on a chromosome.
ChromosomalPosition = namedtuple("ChromosomalPosition",
                                 ["chromosome", "position"])

# A span on a chromosome. The helpers in this module call .map/.maybe on
# start and end, i.e. they are handled as Maybe values.
ChromosomalInterval = namedtuple("ChromosomalInterval",
                                 ["chromosome", "start", "end"])

# Signature of the functions that FieldProcessor wraps: they take the text
# of a field and return the xapian query for it.
FieldProcessorFunction = Callable[[str], xapian.Query]
def interval_start(interval: ChromosomalInterval) -> Maybe[ChromosomalPosition]:
    """Return the start of interval as a ChromosomalPosition.

    The position is built on the interval's chromosome by mapping over
    interval.start, so the result is wrapped the same way interval.start is.
    """
    def on_chromosome(start):
        return ChromosomalPosition(interval.chromosome, start)

    return interval.start.map(on_chromosome)
def interval_end(interval: ChromosomalInterval) -> Maybe[ChromosomalPosition]:
    """Return the end of interval as a ChromosomalPosition.

    The position is built on the interval's chromosome by mapping over
    interval.end, so the result is wrapped the same way interval.end is.
    """
    def on_chromosome(end):
        return ChromosomalPosition(interval.chromosome, end)

    return interval.end.map(on_chromosome)
def combine_queries(operator: int, *queries: xapian.Query) -> xapian.Query:
    """Fold queries, left to right, into one query joined by operator.

    A single query is returned unchanged. Calling this without any query
    raises TypeError, as reduce has no initial value to fall back on.
    """
    def join(left, right):
        return xapian.Query(operator, left, right)

    return reduce(join, queries)
class FieldProcessor(xapian.FieldProcessor):
    """
    Generic field processor for use in a xapian query parser.

    Instead of writing one xapian.FieldProcessor subclass per field, wrap
    the function that builds the query for that field in this class. The
    function may be a closure; any additional state it requires lives in
    the lexical environment of the closure rather than on this object.
    """

    def __init__(self, proc: FieldProcessorFunction) -> None:
        """Wrap proc, the function that builds the query for a field."""
        super().__init__()
        self.proc = proc

    def __call__(self, query: str) -> xapian.Query:
        """Delegate to the wrapped function and return its query."""
        # NOTE(review): parse_location_field in this module decodes its
        # query argument from bytes, which suggests the value received here
        # is bytes rather than str -- confirm and adjust the annotations.
        build_query = self.proc
        return build_query(query)
def parse_range(range_string: str) -> tuple[Maybe[str], Maybe[str]]:
    """Split a xapian range string such as start..end into its two ends.

    An end that is left out (as in "..end" or "start..") is returned as
    Nothing; an end that is present is returned wrapped in Just. Raises
    ValueError unless range_string contains ".." exactly once.
    """
    def as_maybe(endpoint: str):
        return Just(endpoint) if endpoint else Nothing

    lower, upper = range_string.split("..")
    return (as_maybe(lower), as_maybe(upper))
def apply_si_suffix(location: str) -> int:
    """Apply SI suffixes kilo, mega, giga and convert to bases.

    A trailing k, m or g (in either case) scales the number in front of it
    by 10**3, 10**6 or 10**9 respectively, e.g. "1.2M" is 1200000. A
    location without one of these suffixes is converted as it stands, e.g.
    "1000" is 1000. Fractions of a base are truncated.

    Raises ValueError if the numeric part is not a valid number; this
    includes the empty string.
    """
    suffixes = {"k": 3, "m": 6, "g": 9}
    # Slice rather than index so that an empty string does not raise
    # IndexError here but falls through to float("") below and raises the
    # same ValueError as any other malformed number.
    exponent = suffixes.get(location[-1:].lower())
    if exponent is None:
        # No SI suffix: the last character is part of the number and must
        # not be stripped off.
        return int(float(location))
    return int(float(location[:-1]) * 10**exponent)
def parse_location_field(species: str, species_prefix: str,
                         chromosome_prefix: str, location_slot: int,
                         query: bytes) -> xapian.Query:
    """Translate a location shorthand into a xapian query.

    A location shorthand packs species, chromosome and position into a
    single field, e.g., Hs:chr2:1M..1.2M. The species is implied by the
    field name and is passed in as species; query is the UTF-8 encoded
    remainder (chr2:1M..1.2M in the example).

    The result is the AND of the species term, the chromosome term and a
    number range on value slot location_slot. If query cannot be parsed
    (any ValueError, which includes invalid UTF-8), an OP_INVALID query is
    returned instead.
    """
    def split_query(text: str) -> ChromosomalInterval:
        """Split text of the form chrN:start..end into an interval."""
        chromosome, location = text.lower().split(":")
        if not chromosome.startswith("chr"):
            raise ValueError
        start, end = (bound.map(apply_si_suffix)
                      for bound in parse_range(location))
        return ChromosomalInterval(chromosome.removeprefix("chr"), start, end)

    try:
        interval = split_query(query.decode("utf-8"))
    except ValueError:
        return xapian.Query(xapian.Query.OP_INVALID)

    species_query = xapian.Query(species_prefix + species)
    chromosome_query = xapian.Query(chromosome_prefix + interval.chromosome)
    # An open end of the interval is passed to the range processor as the
    # empty string.
    location_range = xapian.NumberRangeProcessor(location_slot)
    location_query = location_range(interval.start.maybe("", str),
                                    interval.end.maybe("", str))
    return combine_queries(xapian.Query.OP_AND,
                           species_query,
                           chromosome_query,
                           location_query)
def parse_query(query: str):
    """Parse a search query with the GeneNetwork specific field processors.

    Sets up a xapian query parser with English stemming, the boolean and
    free-text field prefixes, one number range processor per range field
    and the species location shorthands (Hs, Mm, Rn), then returns the
    result of parsing query with it.
    """
    species_prefix = "XS"
    chromosome_prefix = "XC"

    queryparser = xapian.QueryParser()
    queryparser.set_stemmer(xapian.Stem("en"))
    queryparser.set_stemming_strategy(queryparser.STEM_SOME)

    # Field name -> term prefix for the boolean (filter) fields.
    boolean_prefixes = (
        ("author", "A"),
        ("species", species_prefix),
        ("group", "XG"),
        ("tissue", "XI"),
        ("dataset", "XDS"),
        ("symbol", "XY"),
        ("chr", chromosome_prefix),
        ("peakchr", "XPC"),
    )
    for field, term_prefix in boolean_prefixes:
        queryparser.add_boolean_prefix(field, term_prefix)
    queryparser.add_prefix("description", "XD")

    # The position of a field in this list is the value slot its range
    # processor is registered on.
    range_prefixes = ["mean", "peak", "mb", "peakmb", "additive", "year"]
    for slot, field in enumerate(range_prefixes):
        queryparser.add_rangeprocessor(
            xapian.NumberRangeProcessor(slot, f"{field}:"))

    # Add field processors for location shorthands.
    location_slot = range_prefixes.index("mb")
    species_shorthands = {"Hs": "human",
                          "Mm": "mouse",
                          "Rn": "rat"}
    for shorthand, species in species_shorthands.items():
        queryparser.add_boolean_prefix(
            shorthand,
            FieldProcessor(partial(parse_location_field,
                                   species,
                                   species_prefix,
                                   chromosome_prefix,
                                   location_slot)))
    return queryparser.parse_query(query)
@search.route("/")
def search_results():
    """Search Xapian index and return a list of results.

    Query string parameters:
      type: kind of document to search for (default "gene").
      query: the search query (default "").
      page: 1-based page number (default 1).
      per_page: number of results per page (default 100, at most 10000).

    Aborts with 404 if page is less than 1, and with 400 if per_page is
    negative or larger than the maximum. Otherwise returns the requested
    page of results as a JSON list. Results of type "phenotype"
    additionally carry a "pubmed_link" derived from their "pubmed_id".
    """
    args = request.args
    search_type = args.get("type", default="gene")
    querystring = args.get("query", default="")
    page = args.get("page", default=1, type=int)
    if page < 1:
        abort(404, description="Requested page does not exist")
    results_per_page = args.get("per_page", default=100, type=int)
    # A negative count cannot be passed on to xapian; reject it here as a
    # client error instead of letting get_mset fail with a server error.
    if results_per_page < 0:
        abort(400, description="Requested a negative number of search results")
    maximum_results_per_page = 10000
    if results_per_page > maximum_results_per_page:
        abort(400, description="Requested too many search results")
    query = parse_query(querystring)
    traits = []
    # pylint: disable=invalid-name
    with xapian_database(current_app.config["XAPIAN_DB_PATH"]) as db:
        enquire = xapian.Enquire(db)
        # Filter documents by type.
        enquire.set_query(xapian.Query(xapian.Query.OP_FILTER,
                                       query,
                                       xapian.Query(f"XT{search_type}")))
        first_result = (page - 1) * results_per_page
        for xapian_match in enquire.get_mset(first_result, results_per_page):
            trait = MonadicDict(json.loads(xapian_match.document.get_data()))
            # Add PubMed link to phenotype search results.
            if search_type == "phenotype":
                trait["pubmed_link"] = trait["pubmed_id"].map(
                    lambda pubmed_id: "http://www.ncbi.nlm.nih.gov/entrez/query.fcgi?"
                    + urllib.parse.urlencode({"cmd": "Retrieve",
                                              "db": "PubMed",
                                              "list_uids": pubmed_id,
                                              "dopt": "Abstract"}))
            traits.append(trait.data)
    return jsonify(traits)
|