From f4eb5d21f33d0dc8449d9d326f01d0687472cf4d Mon Sep 17 00:00:00 2001 From: Arun Isaac Date: Wed, 18 Jan 2023 01:04:24 +0000 Subject: Implement synteny search. * gn3/api/search.py: Import gzip, Path from pathlib and curry from pymonad.tools. (IntervalLiftoverFunction): New variable. (query_subqueries, query_terms, field_processor_or, liftover, liftover_interval, parse_synteny_field, is_synteny_on, remove_synteny_field): New functions. (parse_location_field): Generalize to support synteny searches. (parse_query): Support synteny search queries. (search_results): Pass synteny files directory to parse_query. --- gn3/api/search.py | 246 +++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 207 insertions(+), 39 deletions(-) (limited to 'gn3/api') diff --git a/gn3/api/search.py b/gn3/api/search.py index 83951a1..650208b 100644 --- a/gn3/api/search.py +++ b/gn3/api/search.py @@ -1,13 +1,16 @@ """Search using Xapian index.""" from collections import namedtuple +import gzip import json from functools import partial, reduce +from pathlib import Path from typing import Callable import urllib.parse from flask import abort, Blueprint, current_app, jsonify, request from pymonad.maybe import Just, Maybe, Nothing +from pymonad.tools import curry import xapian from gn3.monads import MonadicDict @@ -17,6 +20,7 @@ search = Blueprint("search", __name__) ChromosomalPosition = namedtuple("ChromosomalPosition", "chromosome position") ChromosomalInterval = namedtuple("ChromosomalInterval", "chromosome start end") +IntervalLiftoverFunction = Callable[[ChromosomalInterval], Maybe[ChromosomalInterval]] FieldProcessorFunction = Callable[[str], xapian.Query] @@ -35,6 +39,28 @@ def combine_queries(operator: int, *queries: xapian.Query) -> xapian.Query: return reduce(partial(xapian.Query, operator), queries) +def query_subqueries(query: xapian.Query) -> list[xapian.Query]: + """Return list of child queries in query.""" + return [query.get_subquery(i) for i in range(query.get_num_subqueries())] + + +def query_terms(query: xapian.Query) -> list[str]: + """Return list of terms in query.""" + # Unfortunately, the TermIterator from python xapian bindings seems + # buggy. So, we resort to traversing the query tree. + # python xapian bindings do not expose xapian.Query.LEAF_TERM. This is + # most likely a bug. + leaf_type = 100 + if query.get_type() == leaf_type: + # We have no choice but to access the protected _get_terms_begin method. + # pylint: disable=protected-access + return [query._get_terms_begin().get_term().decode("utf-8")] + else: + return reduce(lambda result, subquery: result + query_terms(subquery), + query_subqueries(query), + []) + + class FieldProcessor(xapian.FieldProcessor): """ Field processor for use in a xapian query parser. @@ -52,6 +78,68 @@ class FieldProcessor(xapian.FieldProcessor): return self.proc(query) +def field_processor_or(*field_processors: FieldProcessorFunction) -> FieldProcessorFunction: + """Combine field processors using the OR operator.""" + return (lambda query: + combine_queries(xapian.Query.OP_OR, + *[field_processor(query) + for field_processor in field_processors])) + + +def liftover(chain_file: Path, position: ChromosomalPosition) -> Maybe[ChromosomalPosition]: + """Liftover chromosomal position using chain file.""" + # The chain file format is described at + # https://genome.ucsc.edu/goldenPath/help/chain.html + def split_chain_line(line: str) -> tuple[ChromosomalInterval, ChromosomalInterval, int, str]: + (_, _, target_chromosome, _, _, target_start, target_end, + query_chromosome, query_size, query_strand, query_start, query_end, _) = line.split() + return (ChromosomalInterval(target_chromosome.removeprefix("chr"), + int(target_start), int(target_end)), + ChromosomalInterval(query_chromosome.removeprefix("chr"), + int(query_start), int(query_end)), + int(query_size), query_strand) + + with gzip.open(chain_file, "rt") as file: + for line in file: + if line.startswith('chain'): + (target, query, query_size, query_strand) = split_chain_line(line) + if (target.chromosome == position.chromosome + and target.start <= position.position < target.end): + transformed_position = query.start + position.position - target.start + if query_strand == '-': + transformed_position = query_size - 1 - transformed_position + return Just(ChromosomalPosition(query.chromosome, transformed_position)) + return Nothing + + +def liftover_interval(chain_file: str, interval: ChromosomalInterval) -> ChromosomalInterval: + """ + Liftover interval using chain file. + + This function lifts over the interval by merely lifting the start and end + points. This is simplistic and not strictly correct, but will do for the + purposes of synteny in search. + """ + point_liftover = partial(liftover, chain_file) + return (Maybe.apply(curry(2, + lambda start, end: + ChromosomalInterval(start.chromosome, + Just(start.position), + Just(end.position)))) + .to_arguments(interval_start(interval).bind(point_liftover), + interval_end(interval).bind(point_liftover)) + # In '-' strands, the order of the interval may be reversed. Work + # around. + # KLUDGE: Inspecting the value contained in a monad is usually bad + # practice. But, in this case, there doesn't seem to be a way + # around. + .map(lambda interval: ChromosomalInterval(interval.chromosome, + Just(min(interval.start.value, interval.end.value)), + Just(max(interval.start.value, interval.end.value))) + if interval.start.is_just() and interval.end.is_just() + else interval)) + + def parse_range(range_string: str) -> tuple[Maybe[str], Maybe[str]]: """Parse xapian range strings such as start..end.""" start, end = range_string.split("..") @@ -65,8 +153,9 @@ def apply_si_suffix(location: str) -> int: return int(float(location[:-1])*10**suffixes.get(location[-1].lower(), 0)) -def parse_location_field(species: str, species_prefix: str, +def parse_location_field(lifted_species: str, species_prefix: str, chromosome_prefix: str, location_slot: int, + liftover_function: IntervalLiftoverFunction, query: bytes) -> xapian.Query: """Parse location shorthands and return a xapian query. @@ -82,51 +171,130 @@ def parse_location_field(species: str, species_prefix: str, *[location.map(apply_si_suffix) for location in parse_range(location)]) + def make_query(species: str, interval: ChromosomalInterval) -> xapian.Query: + # TODO: Convert the xapian index to use bases instead of megabases. + to_megabases = lambda x: str(float(x)/1e6) + return combine_queries(xapian.Query.OP_AND, + xapian.Query(species_prefix + species), + xapian.Query(chromosome_prefix + interval.chromosome), + xapian.NumberRangeProcessor(location_slot) + (interval.start.maybe("", to_megabases), + interval.end.maybe("", to_megabases))) try: interval = split_query(query.decode("utf-8")) except ValueError: return xapian.Query(xapian.Query.OP_INVALID) - return combine_queries(xapian.Query.OP_AND, - xapian.Query(species_prefix + species), - xapian.Query(chromosome_prefix + interval.chromosome), - xapian.NumberRangeProcessor(location_slot) - (interval.start.maybe("", str), - interval.end.maybe("", str))) + return (liftover_function(interval) + .maybe(xapian.Query.MatchNothing, + partial(make_query, lifted_species))) + + +def parse_synteny_field(synteny_prefix: str, query: bytes) -> xapian.Query: + """Parse synteny field and return a xapian query.""" + if query.decode("utf-8") in ["on", "off"]: + return xapian.Query(synteny_prefix + query.decode("utf-8")) + else: + return xapian.Query(xapian.Query.OP_INVALID) + +def is_synteny_on(synteny_prefix: str, query: xapian.Query) -> bool: + """Check if synteny search is requested in query.""" + return synteny_prefix + "on" in query_terms(query) -def parse_query(query: str): + +def remove_synteny_field(synteny_prefix: str, query: xapian.Query, + parent_operator: int = xapian.Query.OP_AND) -> xapian.Query: + """Return a new query with the synteny field removed.""" + # Note that this function only supports queries that exclusively use the + # AND, OR, FILTER, RANGE and INVALID operators. + # python xapian bindings do not expose xapian.Query.LEAF_TERM. This is + # most likely a bug. + leaf_type = 100 + # Handle leaf node. + if query.get_type() == leaf_type: + if not any(term.startswith(synteny_prefix) for term in query_terms(query)): + return query + elif parent_operator in [xapian.Query.OP_AND, xapian.Query.OP_FILTER]: + return xapian.Query.MatchAll + elif parent_operator == xapian.Query.OP_OR: + return xapian.Query.MatchNothing + else: + raise ValueError("Unexpected operator in query", query.get_type()) + # Recurse on non-leaf nodes with the AND, OR or FILTER operators as root. + elif query.get_type() in (xapian.Query.OP_AND, xapian.Query.OP_OR, xapian.Query.OP_FILTER): + return combine_queries(query.get_type(), + *[remove_synteny_field(synteny_prefix, subquery, query.get_type()) + for subquery in query_subqueries(query)]) + # Return other supported non-leaf nodes verbatim. + elif query.get_type() in [xapian.Query.OP_VALUE_RANGE, xapian.Query.OP_INVALID]: + return query + # Raise an exception on unsupported non-leaf nodes. + else: + raise ValueError("Unexpected operator in query", query.get_type()) + + +def parse_query(synteny_files_directory: Path, query: str): """Parse search query using GeneNetwork specific field processors.""" - queryparser = xapian.QueryParser() - queryparser.set_stemmer(xapian.Stem("en")) - queryparser.set_stemming_strategy(queryparser.STEM_SOME) - species_prefix = "XS" - chromosome_prefix = "XC" - queryparser.add_boolean_prefix("author", "A") - queryparser.add_boolean_prefix("species", species_prefix) - queryparser.add_boolean_prefix("group", "XG") - queryparser.add_boolean_prefix("tissue", "XI") - queryparser.add_boolean_prefix("dataset", "XDS") - queryparser.add_boolean_prefix("symbol", "XY") - queryparser.add_boolean_prefix("chr", chromosome_prefix) - queryparser.add_boolean_prefix("peakchr", "XPC") - queryparser.add_prefix("description", "XD") - range_prefixes = ["mean", "peak", "mb", "peakmb", "additive", "year"] - for i, prefix in enumerate(range_prefixes): - queryparser.add_rangeprocessor(xapian.NumberRangeProcessor(i, prefix + ":")) - - # Add field processors for location shorthands. - species_shorthands = {"Hs": "human", - "Mm": "mouse", - "Rn": "rat"} - for shorthand, species in species_shorthands.items(): - queryparser.add_boolean_prefix( - shorthand, FieldProcessor(partial(parse_location_field, - species, - species_prefix, - chromosome_prefix, - range_prefixes.index("mb")))) - return queryparser.parse_query(query) + synteny_prefix = "XSYN" + + def make_query_parser(synteny: bool) -> xapian.QueryParser: + queryparser = xapian.QueryParser() + queryparser.set_stemmer(xapian.Stem("en")) + queryparser.set_stemming_strategy(queryparser.STEM_SOME) + species_prefix = "XS" + chromosome_prefix = "XC" + queryparser.add_boolean_prefix("author", "A") + queryparser.add_boolean_prefix("species", species_prefix) + queryparser.add_boolean_prefix("group", "XG") + queryparser.add_boolean_prefix("tissue", "XI") + queryparser.add_boolean_prefix("dataset", "XDS") + queryparser.add_boolean_prefix("symbol", "XY") + queryparser.add_boolean_prefix("chr", chromosome_prefix) + queryparser.add_boolean_prefix("peakchr", "XPC") + queryparser.add_prefix("description", "XD") + queryparser.add_prefix("synteny", FieldProcessor(partial(parse_synteny_field, + synteny_prefix))) + range_prefixes = ["mean", "peak", "mb", "peakmb", "additive", "year"] + for i, prefix in enumerate(range_prefixes): + queryparser.add_rangeprocessor(xapian.NumberRangeProcessor(i, prefix + ":")) + + # Add field processors for location shorthands. + species_shorthands = {"Hs": "human", + "Mm": "mouse"} + for shorthand, species in species_shorthands.items(): + field_processors = [partial(parse_location_field, + species, + species_prefix, + chromosome_prefix, + range_prefixes.index("mb"), + Just)] + # If human and synteny is requested, add liftover. + # With synteny search, we search for the same gene sequences + # across different species. But, the same gene sequences may be + # present in very different chromosomal positions in different + # species. So, we first liftover. + if shorthand == "Hs" and synteny: + chain_files = {"mouse": "hg19ToMm10-chains.over.chain.gz"} + for lifted_species, chain_file in chain_files.items(): + field_processors.append( + partial(parse_location_field, + lifted_species, + species_prefix, + chromosome_prefix, + range_prefixes.index("mb"), + partial(liftover_interval, + synteny_files_directory / chain_file))) + queryparser.add_boolean_prefix( + shorthand, + FieldProcessor(field_processor_or(*field_processors))) + return queryparser + + return remove_synteny_field( + synteny_prefix, + make_query_parser(is_synteny_on(synteny_prefix, + make_query_parser(False).parse_query(query))) + .parse_query(query)) @search.route("/") @@ -143,7 +311,7 @@ def search_results(): if results_per_page > maximum_results_per_page: abort(400, description="Requested too many search results") - query = parse_query(querystring) + query = parse_query(Path(current_app.config["DATA_DIR"]) / "synteny", querystring) traits = [] # pylint: disable=invalid-name with xapian_database(current_app.config["XAPIAN_DB_PATH"]) as db: -- cgit v1.2.3