Diffstat (limited to 'scripts')
-rwxr-xr-x  scripts/index-genenetwork  374
1 file changed, 374 insertions, 0 deletions
diff --git a/scripts/index-genenetwork b/scripts/index-genenetwork
new file mode 100755
index 0000000..5d231ad
--- /dev/null
+++ b/scripts/index-genenetwork
@@ -0,0 +1,374 @@
+#! /usr/bin/env python3
+
+# pylint: disable=invalid-name
+
+"""This script must be run each time the database is updated. It runs
+queries against the SQL database, indexes the results, and builds a
+Xapian index. This index is later used to provide search through the
+web interface.
+
+"""
+
+from collections import deque, namedtuple
+import contextlib
+from functools import partial
+import itertools
+import json
+import logging
+from multiprocessing import Lock, Process
+import os
+import pathlib
+import resource
+import shutil
+import tempfile
+
+import MySQLdb
+import click
+from pymonad.maybe import Just, Maybe, Nothing
+from pymonad.tools import curry
+import xapian
+
+from gn3.db_utils import database_connection
+from gn3.monads import query_sql
+
+DOCUMENTS_PER_CHUNK = 100000
+
+SQLQuery = namedtuple("SQLQuery",
+                      ["fields", "tables", "where", "offset", "limit"],
+                      defaults=[Nothing, 0, Nothing])
+
+# FIXME: Some Max LRS values in the DB are wrongly listed as 0.000,
+# but shouldn't be displayed. Make them NULLs in the database.
+genes_query = SQLQuery(
+    ["ProbeSet.Name AS name",
+     "ProbeSet.Symbol AS symbol",
+     "ProbeSet.description AS description",
+     "ProbeSet.Chr AS chr",
+     "ProbeSet.Mb as mb",
+     "ProbeSet.alias AS alias",
+     "ProbeSet.GenbankId AS genbankid",
+     "ProbeSet.UniGeneId AS unigeneid",
+     "ProbeSet.Probe_Target_Description AS probe_target_description",
+     "ProbeSetFreeze.Name AS dataset",
+     "ProbeSetFreeze.FullName AS dataset_fullname",
+     "Species.Name AS species",
+     "InbredSet.Name AS `group`",
+     "Tissue.Name AS tissue",
+     "ProbeSetXRef.Mean AS mean",
+     "ProbeSetXRef.LRS AS lrs",
+     "ProbeSetXRef.additive AS additive",
+     "Geno.Chr AS geno_chr",
+     "Geno.Mb as geno_mb"],
+    ["Species",
+     "INNER JOIN InbredSet ON InbredSet.SpeciesId = Species.Id",
+     "INNER JOIN ProbeFreeze ON ProbeFreeze.InbredSetId = InbredSet.Id",
+     "INNER JOIN Tissue ON ProbeFreeze.TissueId = Tissue.Id",
+     "INNER JOIN ProbeSetFreeze ON ProbeSetFreeze.ProbeFreezeId = ProbeFreeze.Id",
+     "INNER JOIN ProbeSetXRef ON ProbeSetXRef.ProbeSetFreezeId = ProbeSetFreeze.Id",
+     "INNER JOIN ProbeSet ON ProbeSet.Id = ProbeSetXRef.ProbeSetId",
+     """LEFT JOIN Geno ON ProbeSetXRef.Locus = Geno.Name
+        AND Geno.SpeciesId = Species.Id"""],
+    Just("ProbeSetFreeze.confidentiality < 1 AND ProbeSetFreeze.public > 0"))
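Note (not part of the commit): to make the SQLQuery/Maybe plumbing concrete, here is a sketch of what the serialize_sql helper defined further down produces for a small, invented query. Nothing fields are simply skipped, and OFFSET is only emitted when it is non-zero.

    # Illustrative only; the table and field names are made up.
    toy = SQLQuery(fields=["Species.Name"],
                   tables=["Species"],
                   where=Just("Species.Id > 5"),
                   offset=200,
                   limit=Just(100))
    assert serialize_sql(toy) == (
        "SELECT Species.Name FROM Species"
        " WHERE Species.Id > 5 LIMIT 100 OFFSET 200")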
+
+# FIXME: Some years are blank strings or strings that contain text
+# other than the year. These should be fixed in the database and the
+# year field must be made an integer.
+phenotypes_query = SQLQuery(
+    ["Species.Name AS species",
+     "InbredSet.Name AS `group`",
+     "PublishFreeze.Name AS dataset",
+     "PublishFreeze.FullName AS dataset_fullname",
+     "PublishXRef.Id AS name",
+     """COALESCE(Phenotype.Post_publication_abbreviation,
+                 Phenotype.Pre_publication_abbreviation)
+        AS abbreviation""",
+     """COALESCE(Phenotype.Post_publication_description,
+                 Phenotype.Pre_publication_description)
+        AS description""",
+     "Phenotype.Lab_code",
+     "Publication.Abstract",
+     "Publication.Title",
+     "Publication.Authors AS authors",
+     """IF(CONVERT(Publication.Year, UNSIGNED)=0,
+        NULL, CONVERT(Publication.Year, UNSIGNED)) AS year""",
+     "Publication.PubMed_ID AS pubmed_id",
+     "PublishXRef.LRS as lrs",
+     "PublishXRef.additive",
+     "InbredSet.InbredSetCode AS inbredsetcode",
+     "PublishXRef.mean",
+     "Geno.Chr as geno_chr",
+     "Geno.Mb as geno_mb"],
+    ["Species",
+     "INNER JOIN InbredSet ON InbredSet.SpeciesId = Species.Id",
+     "INNER JOIN PublishFreeze ON PublishFreeze.InbredSetId = InbredSet.Id",
+     "INNER JOIN PublishXRef ON PublishXRef.InbredSetId = InbredSet.Id",
+     "INNER JOIN Phenotype ON PublishXRef.PhenotypeId = Phenotype.Id",
+     "INNER JOIN Publication ON PublishXRef.PublicationId = Publication.Id",
+     "LEFT JOIN Geno ON PublishXRef.Locus = Geno.Name AND Geno.SpeciesId = Species.Id"])
+
+
+def serialize_sql(query):
+    """Serialize SQLQuery object to a string."""
+    sql = f"SELECT {', '.join(query.fields)} FROM {' '.join(query.tables)}"
+    def append_to_sql(appendee):
+        nonlocal sql
+        sql += appendee
+
+    query.where.bind(lambda where: append_to_sql(f" WHERE {where}"))
+    query.limit.bind(lambda limit: append_to_sql(f" LIMIT {limit}"))
+    if query.offset != 0:
+        sql += f" OFFSET {query.offset}"
+    return sql
+
+
+@contextlib.contextmanager
+def locked_xapian_writable_database(path):
+    """Open xapian database for writing.
+
+    When a process is writing to a xapian database opened by this
+    function, no other process may do so. This avoids I/O contention
+    between processes.
+    """
+    # pylint: disable-next=invalid-name
+    db = xapian.WritableDatabase(str(path))
+    db.begin_transaction()
+    try:
+        yield db
+    except Exception as exception:
+        db.cancel_transaction()
+        raise exception
+    else:
+        xapian_lock.acquire()
+        try:
+            db.commit_transaction()
+        finally:
+            xapian_lock.release()
+    finally:
+        db.close()
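Note (not part of the commit): a minimal usage sketch of the context manager above; the index path is invented. All writes happen inside a transaction that is committed while holding xapian_lock (defined further down), so only one process commits to disk at a time.

    # Illustrative only; assumes the module has been loaded.
    with locked_xapian_writable_database(pathlib.Path("/tmp/example-index")) as db:
        doc = xapian.Document()
        doc.set_data("example payload")
        db.add_document(doc)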
+
+
+# pylint: disable=invalid-name
+def write_document(db, identifier, doctype, doc):
+    """Write document into xapian database."""
+    # We use the XT and Q prefixes to indicate the type and idterm
+    # respectively.
+    idterm = f"Q{doctype}:{identifier.lower()}"
+    doc.add_boolean_term(f"XT{doctype}")
+    doc.add_boolean_term(idterm)
+    db.replace_document(idterm, doc)
+
+termgenerator = xapian.TermGenerator()
+termgenerator.set_stemmer(xapian.Stem("en"))
+
+def index_text(text):
+    """Index text and increase term position."""
+    termgenerator.index_text(text)
+    termgenerator.increase_termpos()
+
+# pylint: disable=unnecessary-lambda
+index_text_without_positions = lambda text: termgenerator.index_text_without_positions(text)
+
+index_authors = lambda authors: termgenerator.index_text(authors, 0, "A")
+index_species = lambda species: termgenerator.index_text_without_positions(species, 0, "XS")
+index_group = lambda group: termgenerator.index_text_without_positions(group, 0, "XG")
+index_tissue = lambda tissue: termgenerator.index_text(tissue, 0, "XI")
+index_dataset = lambda dataset: termgenerator.index_text(dataset, 0, "XDS")
+index_symbol = lambda symbol: termgenerator.index_text_without_positions(symbol, 0, "XY")
+index_chr = lambda chr: termgenerator.index_text_without_positions(chr, 0, "XC")
+index_peakchr = lambda peakchr: termgenerator.index_text_without_positions(peakchr, 0, "XPC")
+
+add_mean = lambda doc, mean: doc.add_value(0, xapian.sortable_serialise(mean))
+add_peak = lambda doc, peak: doc.add_value(1, xapian.sortable_serialise(peak))
+add_mb = lambda doc, mb: doc.add_value(2, xapian.sortable_serialise(mb))
+add_peakmb = lambda doc, peakmb: doc.add_value(3, xapian.sortable_serialise(peakmb))
+add_additive = lambda doc, additive: doc.add_value(4, xapian.sortable_serialise(additive))
+add_year = lambda doc, year: doc.add_value(5, xapian.sortable_serialise(float(year)))
+
+# When a child process is forked, it inherits a copy of the memory of
+# its parent. We use this to pass data retrieved from SQL from parent
+# to child. Specifically, we use this global variable.
+data = None
+# We use this lock to ensure that only one process writes its Xapian
+# index to disk at a time.
+xapian_lock = Lock()
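Note (not part of the commit): the prefixes and value slots above only pay off at search time. A hypothetical query-side counterpart might map user-facing field names onto them like this; none of this is in the diff.

    queryparser = xapian.QueryParser()
    queryparser.set_stemmer(xapian.Stem("en"))
    queryparser.add_boolean_prefix("species", "XS")
    queryparser.add_boolean_prefix("group", "XG")
    queryparser.add_prefix("author", "A")
    query = queryparser.parse_query("shh species:mouse")
    # Values stored with sortable_serialise are recovered with
    # xapian.sortable_unserialise, e.g. slot 1 holds the LRS peak:
    # peak = xapian.sortable_unserialise(document.get_value(1))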
+
+def index_genes(xapian_build_directory, chunk_index):
+    """Index genes data into a Xapian index."""
+    with locked_xapian_writable_database(xapian_build_directory / f"genes-{chunk_index:04d}") as db:
+        for trait in data:
+            # pylint: disable=cell-var-from-loop
+            doc = xapian.Document()
+            termgenerator.set_document(doc)
+
+            # Add values.
+            trait["mean"].bind(partial(add_mean, doc))
+            trait["lrs"].bind(partial(add_peak, doc))
+            trait["mb"].bind(partial(add_mb, doc))
+            trait["geno_mb"].bind(partial(add_peakmb, doc))
+            trait["additive"].bind(partial(add_additive, doc))
+
+            # Index free text.
+            for key in ["description", "tissue", "dataset_fullname"]:
+                trait[key].bind(index_text)
+            trait.pop("probe_target_description").bind(index_text)
+            for key in ["name", "symbol", "species", "group"]:
+                trait[key].bind(index_text_without_positions)
+            for key in ["alias", "genbankid", "unigeneid"]:
+                trait.pop(key).bind(index_text_without_positions)
+
+            # Index text with prefixes.
+            trait["species"].bind(index_species)
+            trait["group"].bind(index_group)
+            trait["tissue"].bind(index_tissue)
+            trait["dataset_fullname"].bind(index_dataset)
+            trait["symbol"].bind(index_symbol)
+            trait["chr"].bind(index_chr)
+            trait["geno_chr"].bind(index_peakchr)
+
+            doc.set_data(json.dumps(trait.data))
+            (Maybe.apply(curry(2, lambda name, dataset: f"{name}:{dataset}"))
+             .to_arguments(trait["name"], trait["dataset"])
+             .bind(lambda idterm: write_document(db, idterm, "gene", doc)))
+
+
+def index_phenotypes(xapian_build_directory, chunk_index):
+    """Index phenotypes data into a Xapian index."""
+    with locked_xapian_writable_database(
+            xapian_build_directory / f"phenotypes-{chunk_index:04d}") as db:
+        for trait in data:
+            # pylint: disable=cell-var-from-loop
+            doc = xapian.Document()
+            termgenerator.set_document(doc)
+
+            # Add values.
+            trait["mean"].bind(partial(add_mean, doc))
+            trait["lrs"].bind(partial(add_peak, doc))
+            trait["geno_mb"].bind(partial(add_peakmb, doc))
+            trait["additive"].bind(partial(add_additive, doc))
+            trait["year"].bind(partial(add_year, doc))
+
+            # Index free text.
+            for key in ["description", "authors", "dataset_fullname"]:
+                trait[key].bind(index_text)
+            for key in ["Abstract", "Title"]:
+                trait.pop(key).bind(index_text)
+            for key in ["species", "group", "inbredsetcode"]:
+                trait[key].bind(index_text_without_positions)
+            for key in ["abbreviation", "Lab_code"]:
+                trait.pop(key).bind(index_text_without_positions)
+
+            # Index text with prefixes.
+            trait["species"].bind(index_species)
+            trait["group"].bind(index_group)
+            trait["authors"].bind(index_authors)
+            trait["geno_chr"].bind(index_peakchr)
+            trait["dataset_fullname"].bind(index_dataset)
+
+            # Convert name from integer to string.
+            trait["name"] = trait["name"].map(str)
+            # Split comma-separated authors into a list.
+            trait["authors"] = trait["authors"].map(
+                lambda s: [author.strip() for author in s.split(",")])
+
+            doc.set_data(json.dumps(trait.data))
+            (Maybe.apply(curry(2, lambda name, dataset: f"{name}:{dataset}"))
+             .to_arguments(trait["name"], trait["dataset"])
+             .bind(lambda idterm: write_document(db, idterm, "phenotype", doc)))
+
+
+def group(generator, chunk_size):
+    """Group elements of generator into chunks."""
+    return iter(lambda: tuple(itertools.islice(generator, chunk_size)), ())
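Note (not part of the commit): group() relies on itertools.islice resuming where it left off, so its argument must be a true iterator, not a re-iterable object like a range or a list. A quick, invented example:

    chunks = list(group(iter(range(7)), 3))
    assert chunks == [(0, 1, 2), (3, 4, 5), (6,)]  # final chunk holds the remainder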
+
+
+@contextlib.contextmanager
+def worker_queue(number_of_workers=os.cpu_count()):
+    """Manage a pool of worker processes, returning a function to spawn them."""
+    processes = deque()
+
+    def spawn(target, args):
+        if len(processes) == number_of_workers:
+            processes.popleft().join()
+        process = Process(target=target, args=args)
+        process.start()
+        processes.append(process)
+
+    yield spawn
+    for process in processes:
+        process.join()
+
+
+def index_query(index_function, query, xapian_build_directory, start=0):
+    """Run SQL query, and index its results for Xapian."""
+    i = start
+    try:
+        with worker_queue() as spawn_worker:
+            with database_connection() as conn:
+                for chunk in group(query_sql(conn, serialize_sql(
+                        # KLUDGE: MariaDB does not allow an offset
+                        # without a limit. So, set limit to a "high"
+                        # value.
+                        query._replace(limit=Just(2**64 - 1),
+                                       offset=start*DOCUMENTS_PER_CHUNK)),
+                                             server_side=True),
+                                   DOCUMENTS_PER_CHUNK):
+                    # pylint: disable=global-statement
+                    global data
+                    data = chunk
+                    spawn_worker(index_function, (xapian_build_directory, i))
+                    logging.debug("Spawned worker process on chunk %s", i)
+                    i += 1
+    # In the event of an operational error, open a new connection and
+    # resume indexing.
+    # pylint: disable=protected-access
+    except MySQLdb._exceptions.OperationalError:
+        logging.warning("Reopening connection to recover from SQL operational error",
+                        exc_info=True)
+        index_query(index_function, query, xapian_build_directory, i)
+
+
+@contextlib.contextmanager
+def temporary_directory(prefix, parent_directory):
+    """Create temporary directory, returning it as a PosixPath."""
+    with tempfile.TemporaryDirectory(prefix=prefix, dir=parent_directory) as tmpdirname:
+        yield pathlib.Path(tmpdirname)
+
+
+def xapian_compact(combined_index, indices):
+    """Compact and combine several Xapian indices."""
+    # xapian-compact opens all indices simultaneously. So, raise the limit on
+    # the number of open files.
+    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
+    resource.setrlimit(resource.RLIMIT_NOFILE, (max(soft, min(10*len(indices), hard)), hard))
+    db = xapian.Database()
+    try:
+        for index in indices:
+            db.add_database(xapian.Database(str(index)))
+        db.compact(str(combined_index), xapian.DBCOMPACT_MULTIPASS | xapian.Compactor.FULLER)
+    finally:
+        db.close()
+
+
+@click.command(help="Index GeneNetwork data and build Xapian search index in XAPIAN_DIRECTORY.")
+@click.argument("xapian_directory")
+# pylint: disable=missing-function-docstring
+def main(xapian_directory):
+    logging.basicConfig(level=os.environ.get("LOGLEVEL", "DEBUG"),
+                        format='%(relativeCreated)s: %(levelname)s: %(message)s')
+    pathlib.Path(xapian_directory).mkdir(exist_ok=True)
+    with temporary_directory("combined", xapian_directory) as combined_index:
+        with temporary_directory("build", xapian_directory) as xapian_build_directory:
+            logging.info("Indexing genes")
+            index_query(index_genes, genes_query, xapian_build_directory)
+            logging.info("Indexing phenotypes")
+            index_query(index_phenotypes, phenotypes_query, xapian_build_directory)
+            logging.info("Combining and compacting indices")
+            xapian_compact(combined_index, list(xapian_build_directory.iterdir()))
+        for child in combined_index.iterdir():
+            shutil.move(child, xapian_directory)
+    logging.info("Index built")
+
+
+if __name__ == "__main__":
+    # pylint: disable=no-value-for-parameter
+    main()
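Usage note: main is a click command taking a single XAPIAN_DIRECTORY argument, with verbosity controlled by the LOGLEVEL environment variable (default DEBUG). A typical invocation, with an invented path, would be:

    LOGLEVEL=INFO ./scripts/index-genenetwork /var/genenetwork/xapian

Per-chunk indices are built under a temporary "build*" subdirectory of XAPIAN_DIRECTORY, compacted into a temporary "combined*" subdirectory, and the result is moved into XAPIAN_DIRECTORY itself before the temporary directories are cleaned up.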