Diffstat (limited to 'scripts/index-genenetwork')
-rwxr-xr-x | scripts/index-genenetwork | 251
1 file changed, 221 insertions, 30 deletions
diff --git a/scripts/index-genenetwork b/scripts/index-genenetwork
index 1f649cf..2779abc 100755
--- a/scripts/index-genenetwork
+++ b/scripts/index-genenetwork
@@ -8,21 +8,26 @@ xapian index. This xapian index is later used in providing search
 through the web interface.
 
 """
-
-from collections import deque, namedtuple
+from dataclasses import dataclass
+from collections import deque, namedtuple, Counter
 import contextlib
+import time
+import datetime
 from functools import partial
 import itertools
 import json
 import logging
-from multiprocessing import Lock, Process
+from multiprocessing import Lock, Manager, Process, managers
 import os
 import pathlib
 import resource
+import re
 import shutil
 import sys
+import hashlib
 import tempfile
-from typing import Callable, Generator, Iterable, List
+from typing import Callable, Dict, Generator, Hashable, Iterable, List
+from SPARQLWrapper import SPARQLWrapper, JSON
 
 import MySQLdb
 import click
@@ -33,7 +38,10 @@ import xapian
 from gn3.db_utils import database_connection
 from gn3.monads import query_sql
 
-DOCUMENTS_PER_CHUNK = 100000
+DOCUMENTS_PER_CHUNK = 100_000
+# Running the script in prod consumes ~1GB per process when handling 100_000 documents per chunk.
+# To prevent running out of RAM, we set this as the upper bound for total concurrent processes.
+PROCESS_COUNT_LIMIT = 67
 
 SQLQuery = namedtuple("SQLQuery",
                       ["fields", "tables", "where", "offset", "limit"],
@@ -122,6 +130,38 @@ phenotypes_query = SQLQuery(
         SQLTableClause("LEFT JOIN", "Geno",
                        "PublishXRef.Locus = Geno.Name AND Geno.SpeciesId = Species.Id")])
 
+WIKI_CACHE_QUERY = """
+PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
+PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
+PREFIX gnt: <http://genenetwork.org/term/>
+PREFIX gnc: <http://genenetwork.org/category/>
+
+SELECT ?symbolName ?speciesName GROUP_CONCAT(DISTINCT ?comment ; separator=\"\\n\") AS ?comment WHERE {
+    ?symbol rdfs:comment _:node ;
+            rdfs:label ?symbolName .
+_:node rdf:type gnc:GNWikiEntry ;
+       gnt:belongsToSpecies ?species ;
+       rdfs:comment ?comment .
+?species gnt:shortName ?speciesName .
+} GROUP BY ?speciesName ?symbolName
+"""
+
+RIF_CACHE_QUERY = """
+PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
+PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
+PREFIX gnt: <http://genenetwork.org/term/>
+PREFIX gnc: <http://genenetwork.org/category/>
+
+SELECT ?symbolName ?speciesName GROUP_CONCAT(DISTINCT ?comment ; separator=\"\\n\") AS ?comment WHERE {
+    ?symbol rdfs:comment _:node ;
+            rdfs:label ?symbolName .
+_:node rdf:type gnc:NCBIWikiEntry ;
+       gnt:belongsToSpecies ?species ;
+       rdfs:comment ?comment .
+?species gnt:shortName ?speciesName .
+} GROUP BY ?speciesName ?symbolName
+"""
+
 
 def serialize_sql(query: SQLQuery) -> str:
     """Serialize SQLQuery object to a string."""
@@ -168,6 +208,48 @@ def locked_xapian_writable_database(path: pathlib.Path) -> xapian.WritableDataba
         db.close()
 
 
+def build_rdf_cache(sparql_uri: str, query: str, remove_common_words: bool = False):
+    cache = {}
+    sparql = SPARQLWrapper(sparql_uri)
+    sparql.setReturnFormat(JSON)
+    sparql.setQuery(query)
+    results = sparql.queryAndConvert()
+    if not isinstance(results, dict):
+        raise TypeError(f"Expected results to be a dict but found {type(results)}")
+    bindings = results["results"]["bindings"]
+    count: Counter[str] = Counter()
+    words_regex = re.compile(r"\w+")
+    for entry in bindings :
+        x = (entry["speciesName"]["value"], entry["symbolName"]["value"],)
+        value = entry["comment"]["value"]
+        value = " ".join(words_regex.findall(value)) # remove punctuation
+        cache[x] = value
+        count.update(Counter(value.lower().strip().split()))
+
+    if not remove_common_words:
+        return cache
+
+    words_to_drop = set()
+    for word, cnt in count.most_common(1000):
+        if len(word) < 4 or cnt > 3000:
+            words_to_drop.add(word)
+    smaller_cache = {}
+    for entry, value in cache.items():
+        new_value = set(word for word in value.lower().split() if word not in words_to_drop)
+        smaller_cache[entry] = " ".join(new_value)
+    return smaller_cache
+
+
+def md5hash_ttl_dir(ttl_dir: pathlib.Path) -> str:
+    if not ttl_dir.exists():
+        return "-1"
+    ttl_hash = hashlib.new("md5")
+    for ttl_file in ttl_dir.glob("*.ttl"):
+        with open(ttl_file, encoding="utf-8") as f_:
+            ttl_hash.update(f_.read().encode())
+    return ttl_hash.hexdigest()
+
+
 # pylint: disable=invalid-name
 def write_document(db: xapian.WritableDatabase, identifier: str,
                    doctype: str, doc: xapian.Document) -> None:
@@ -181,15 +263,23 @@ def write_document(db: xapian.WritableDatabase, identifier: str,
 
 termgenerator = xapian.TermGenerator()
 termgenerator.set_stemmer(xapian.Stem("en"))
+termgenerator.set_stopper_strategy(xapian.TermGenerator.STOP_ALL)
+termgenerator.set_stopper(xapian.SimpleStopper())
 
 
 def index_text(text: str) -> None:
     """Index text and increase term position."""
     termgenerator.index_text(text)
     termgenerator.increase_termpos()
 
-# pylint: disable=unnecessary-lambda
-index_text_without_positions = lambda text: termgenerator.index_text_without_positions(text)
+@curry(3)
+def index_from_dictionary(keys: Hashable, prefix: str, dictionary: dict):
+    entry = dictionary.get(keys)
+    if not entry:
+        return
+    termgenerator.index_text_without_positions(entry, 0, prefix)
+
+index_text_without_positions = lambda text: termgenerator.index_text_without_positions(text)
 index_authors = lambda authors: termgenerator.index_text(authors, 0, "A")
 index_species = lambda species: termgenerator.index_text_without_positions(species, 0, "XS")
 index_group = lambda group: termgenerator.index_text_without_positions(group, 0, "XG")
@@ -206,10 +296,17 @@ add_peakmb = lambda doc, peakmb: doc.add_value(3, xapian.sortable_serialise(peak
 add_additive = lambda doc, additive: doc.add_value(4, xapian.sortable_serialise(additive))
 add_year = lambda doc, year: doc.add_value(5, xapian.sortable_serialise(float(year)))
 
+
+
+
 # When a child process is forked, it inherits a copy of the memory of
 # its parent. We use this to pass data retrieved from SQL from parent
 # to child. Specifically, we use this global variable.
-data: Iterable
+# This is copy-on-write so make sure child processes don't modify this data
+mysql_data: Iterable
+rif_cache: Iterable
+wiki_cache: Iterable
+
 # We use this lock to ensure that only one process writes its Xapian
 # index to disk at a time.
 xapian_lock = Lock()
@@ -217,7 +314,7 @@
 def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int) -> None:
     """Index genes data into a Xapian index."""
     with locked_xapian_writable_database(xapian_build_directory / f"genes-{chunk_index:04d}") as db:
-        for trait in data:
+        for trait in mysql_data:
             # pylint: disable=cell-var-from-loop
             doc = xapian.Document()
             termgenerator.set_document(doc)
@@ -230,7 +327,7 @@ def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int) -> None:
             trait["additive"].bind(partial(add_additive, doc))
 
             # Index free text.
-            for key in ["description", "tissue", "dataset_fullname"]:
+            for key in ["description", "tissue", "dataset"]:
                 trait[key].bind(index_text)
             trait.pop("probe_target_description").bind(index_text)
             for key in ["name", "symbol", "species", "group"]:
@@ -242,11 +339,23 @@ def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int) -> None:
             trait["species"].bind(index_species)
             trait["group"].bind(index_group)
             trait["tissue"].bind(index_tissue)
-            trait["dataset_fullname"].bind(index_dataset)
+            trait["dataset"].bind(index_dataset)
             trait["symbol"].bind(index_symbol)
             trait["chr"].bind(index_chr)
             trait["geno_chr"].bind(index_peakchr)
+
+            Maybe.apply(index_from_dictionary).to_arguments(
+                Just((trait["species"].value, trait["symbol"].value)),
+                Just("XRF"),
+                Just(rif_cache)
+            )
+
+            Maybe.apply(index_from_dictionary).to_arguments(
+                Just((trait["species"].value, trait["symbol"].value)),
+                Just("XWK"),
+                Just(wiki_cache)
+            )
+
 
             doc.set_data(json.dumps(trait.data))
             (Maybe.apply(curry(2, lambda name, dataset: f"{name}:{dataset}"))
              .to_arguments(trait["name"], trait["dataset"])
@@ -257,7 +366,8 @@ def index_phenotypes(xapian_build_directory: pathlib.Path, chunk_index: int) ->
     """Index phenotypes data into a Xapian index."""
     with locked_xapian_writable_database(
             xapian_build_directory / f"phenotypes-{chunk_index:04d}") as db:
-        for trait in data:
+
+        for trait in mysql_data:
             # pylint: disable=cell-var-from-loop
             doc = xapian.Document()
             termgenerator.set_document(doc)
@@ -270,7 +380,7 @@ def index_phenotypes(xapian_build_directory: pathlib.Path, chunk_index: int) ->
             trait["year"].bind(partial(add_year, doc))
 
             # Index free text.
-            for key in ["description", "authors", "dataset_fullname"]:
+            for key in ["description", "authors", "dataset"]:
                 trait[key].bind(index_text)
             for key in ["Abstract", "Title"]:
                 trait.pop(key).bind(index_text)
@@ -284,7 +394,7 @@ def index_phenotypes(xapian_build_directory: pathlib.Path, chunk_index: int) ->
             trait["group"].bind(index_group)
             trait["authors"].bind(index_authors)
             trait["geno_chr"].bind(index_peakchr)
-            trait["dataset_fullname"].bind(index_dataset)
+            trait["dataset"].bind(index_dataset)
 
             # Convert name from integer to string.
             trait["name"] = trait["name"].map(str)
@@ -320,12 +430,16 @@ def worker_queue(number_of_workers: int = os.cpu_count() or 1) -> Generator:
         process.join()
 
 
-def index_query(index_function: Callable, query: SQLQuery,
-                xapian_build_directory: pathlib.Path, sql_uri: str, start: int = 0) -> None:
+def index_query(index_function: Callable[[pathlib.Path, int], None], query: SQLQuery,
+                xapian_build_directory: pathlib.Path, sql_uri: str,
+                sparql_uri: str, start: int = 0) -> None:
     """Run SQL query, and index its results for Xapian."""
     i = start
+    default_no_of_workers = os.cpu_count() or 1
+    no_of_workers = min(default_no_of_workers, PROCESS_COUNT_LIMIT)
+
     try:
-        with worker_queue() as spawn_worker:
+        with worker_queue(no_of_workers) as spawn_worker:
             with database_connection(sql_uri) as conn:
                 for chunk in group(query_sql(conn, serialize_sql(
                         # KLUDGE: MariaDB does not allow an offset
@@ -335,9 +449,8 @@ def index_query(index_function: Callable, query: SQLQuery,
                                        offset=start*DOCUMENTS_PER_CHUNK)),
                         server_side=True),
                                    DOCUMENTS_PER_CHUNK):
-                    # pylint: disable=global-statement
-                    global data
-                    data = chunk
+                    global mysql_data
+                    mysql_data = chunk
                     spawn_worker(index_function, (xapian_build_directory, i))
                     logging.debug("Spawned worker process on chunk %s", i)
                     i += 1
@@ -347,7 +460,7 @@ def index_query(index_function: Callable, query: SQLQuery,
     except MySQLdb._exceptions.OperationalError:
        logging.warning("Reopening connection to recover from SQL operational error",
                        exc_info=True)
-        index_query(index_function, query, xapian_build_directory, sql_uri, i)
+        index_query(index_function, query, xapian_build_directory, sql_uri, sparql_uri, i)
 
 
 @contextlib.contextmanager
@@ -357,12 +470,33 @@ def temporary_directory(prefix: str, parent_directory: str) -> Generator:
         yield pathlib.Path(tmpdirname)
 
 
+def parallel_xapian_compact(combined_index: pathlib.Path, indices: List[pathlib.Path]) -> None:
+    # We found that compacting 50 files of ~600MB has decent performance
+    no_of_workers = 20
+    file_groupings = 50
+    with temporary_directory("parallel_combine", str(combined_index)) as parallel_combine:
+        parallel_combine.mkdir(parents=True, exist_ok=True)
+        with worker_queue(no_of_workers) as spawn_worker:
+            i = 0
+            while i < len(indices):
+                end_index = (i + file_groupings)
+                files = indices[i:end_index]
+                last_item_idx = i + len(files)
+                spawn_worker(xapian_compact, (parallel_combine / f"{i}_{last_item_idx}", files))
+                logging.debug("Spawned worker to compact files from %s to %s", i, last_item_idx)
+                i = end_index
+        logging.debug("Completed parallel xapian compacts")
+        xapian_compact(combined_index, list(parallel_combine.iterdir()))
+
+
 def xapian_compact(combined_index: pathlib.Path, indices: List[pathlib.Path]) -> None:
     """Compact and combine several Xapian indices."""
     # xapian-compact opens all indices simultaneously. So, raise the limit on
     # the number of open files.
     soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
     resource.setrlimit(resource.RLIMIT_NOFILE, (max(soft, min(10*len(indices), hard)), hard))
+    combined_index.mkdir(parents=True, exist_ok=True)
+    start = time.monotonic()
     db = xapian.Database()
     try:
         for index in indices:
@@ -370,32 +504,73 @@ def xapian_compact(combined_index: pathlib.Path, indices: List[pathlib.Path]) ->
         db.compact(str(combined_index), xapian.DBCOMPACT_MULTIPASS | xapian.Compactor.FULLER)
     finally:
         db.close()
+    logging.debug("Removing databases that were compacted into %s", combined_index.name)
+    for folder in indices:
+        shutil.rmtree(folder)
+    logging.debug("Completed xapian-compact %s; handled %s files in %s minutes", combined_index.name, len(indices), (time.monotonic() - start) / 60)
+
+
+@click.command(help="Verify checksums and return True when the data has been changed.")
+@click.argument("xapian_directory")
+@click.argument("sql_uri")
+@click.argument("sparql_uri")
+def is_data_modified(xapian_directory: str,
                      sql_uri: str,
                      sparql_uri: str) -> None:
+    dir_ = pathlib.Path(xapian_directory)
+    with locked_xapian_writable_database(dir_) as db, database_connection(sql_uri) as conn:
+        checksums = "-1"
+        if db.get_metadata('tables'):
+            checksums = " ".join([
+                str(result["Checksum"].value)
+                for result in query_sql(
+                    conn,
+                    f"CHECKSUM TABLE {', '.join(db.get_metadata('tables').decode().split())}")
+            ])
+        # Return a zero exit status code when the data has changed;
+        # otherwise exit with a 1 exit status code.
+        generif = pathlib.Path("/var/lib/data/")
+        if (db.get_metadata("generif-checksum").decode() == md5hash_ttl_dir(generif) and
+                db.get_metadata("checksums").decode() == checksums):
+            sys.exit(1)
+        sys.exit(0)
 
 
 @click.command(help="Index GeneNetwork data and build Xapian search index in XAPIAN_DIRECTORY.")
 @click.argument("xapian_directory")
 @click.argument("sql_uri")
+@click.argument("sparql_uri")
 # pylint: disable=missing-function-docstring
-def main(xapian_directory: str, sql_uri: str) -> None:
+def create_xapian_index(xapian_directory: str, sql_uri: str,
+                        sparql_uri: str) -> None:
     logging.basicConfig(level=os.environ.get("LOGLEVEL", "DEBUG"),
-                        format='%(relativeCreated)s: %(levelname)s: %(message)s')
+                        format='%(asctime)s %(levelname)s: %(message)s',
+                        datefmt='%Y-%m-%d %H:%M:%S %Z')
+    if not pathlib.Path(xapian_directory).exists():
+        pathlib.Path(xapian_directory).mkdir()
 
     # Ensure no other build process is running.
-    if pathlib.Path(xapian_directory).exists():
-        logging.error("Build directory %s already exists; "
+    if any(pathlib.Path(xapian_directory).iterdir()):
+        logging.error("Build directory %s has build files; "
                       "perhaps another build process is running.",
                       xapian_directory)
         sys.exit(1)
 
-    pathlib.Path(xapian_directory).mkdir()
+    start_time = time.perf_counter()
     with temporary_directory("combined", xapian_directory) as combined_index:
         with temporary_directory("build", xapian_directory) as xapian_build_directory:
+            global rif_cache
+            global wiki_cache
+            logging.info("Building wiki cache")
+            wiki_cache = build_rdf_cache(sparql_uri, WIKI_CACHE_QUERY, remove_common_words=True)
+            logging.info("Building rif cache")
+            rif_cache = build_rdf_cache(sparql_uri, RIF_CACHE_QUERY, remove_common_words=True)
             logging.info("Indexing genes")
-            index_query(index_genes, genes_query, xapian_build_directory, sql_uri)
+            index_query(index_genes, genes_query, xapian_build_directory, sql_uri, sparql_uri)
             logging.info("Indexing phenotypes")
-            index_query(index_phenotypes, phenotypes_query, xapian_build_directory, sql_uri)
+            index_query(index_phenotypes, phenotypes_query, xapian_build_directory, sql_uri, sparql_uri)
             logging.info("Combining and compacting indices")
-            xapian_compact(combined_index, list(xapian_build_directory.iterdir()))
+            parallel_xapian_compact(combined_index, list(xapian_build_directory.iterdir()))
             logging.info("Writing table checksums into index")
            with locked_xapian_writable_database(combined_index) as db:
                # Build a (deduplicated) set of all tables referenced in
@@ -409,11 +584,27 @@ def main(xapian_directory: str, sql_uri: str) -> None:
                 ]
                 db.set_metadata("tables", " ".join(tables))
                 db.set_metadata("checksums", " ".join(checksums))
+                logging.info("Writing generif checksums into index")
+                db.set_metadata(
+                    "generif-checksum",
+                    md5hash_ttl_dir(pathlib.Path("/var/lib/data/")).encode())
     for child in combined_index.iterdir():
         shutil.move(child, xapian_directory)
     logging.info("Index built")
+    end_time = time.perf_counter()
+    index_time = datetime.timedelta(seconds=end_time - start_time)
+    logging.info(f"Time to Index: {index_time}")
+
+
+@click.group()
+def cli():
+    pass
+
+
+cli.add_command(is_data_modified)
+cli.add_command(create_xapian_index)
 
 
 if __name__ == "__main__":
     # pylint: disable=no-value-for-parameter
-    main()
+    cli()
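With this change the script exposes its two entry points through the cli click group instead of a single main command. A minimal invocation sketch follows; the Xapian directory, SQL URI and SPARQL endpoint shown are placeholders, and the dashed command names assume click's default mapping of function-name underscores to dashes:

    # Build a fresh index from MariaDB and the RDF store (placeholder paths/URIs)
    ./scripts/index-genenetwork create-xapian-index /export/xapian "mysql://user:pass@localhost/db_webqtl" "http://localhost:8892/sparql/"

    # Exits 0 when the table or generif checksums differ from those stored in the index, 1 otherwise
    ./scripts/index-genenetwork is-data-modified /export/xapian "mysql://user:pass@localhost/db_webqtl" "http://localhost:8892/sparql/"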