Diffstat (limited to 'scripts/index-genenetwork')
-rwxr-xr-x  scripts/index-genenetwork  374
1 files changed, 374 insertions, 0 deletions
diff --git a/scripts/index-genenetwork b/scripts/index-genenetwork
new file mode 100755
index 0000000..5d231ad
--- /dev/null
+++ b/scripts/index-genenetwork
@@ -0,0 +1,374 @@
+#! /usr/bin/env python3
+
+# pylint: disable=invalid-name
+
+"""This script must be run each time the database is updated. It runs
+queries against the SQL database, indexes the results and builds a
+xapian index. This xapian index is later used in providing search
+through the web interface.
+
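+A typical invocation, with an illustrative target directory, is:
+
+    ./scripts/index-genenetwork /path/to/xapian-index
+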
+"""
+
+from collections import deque, namedtuple
+import contextlib
+from functools import partial
+import itertools
+import json
+import logging
+from multiprocessing import Lock, Process
+import os
+import pathlib
+import resource
+import shutil
+import tempfile
+
+import MySQLdb
+import click
+from pymonad.maybe import Just, Maybe, Nothing
+from pymonad.tools import curry
+import xapian
+
+from gn3.db_utils import database_connection
+from gn3.monads import query_sql
+
+DOCUMENTS_PER_CHUNK = 100000
+
+SQLQuery = namedtuple("SQLQuery",
+ ["fields", "tables", "where", "offset", "limit"],
+ defaults=[Nothing, 0, Nothing])
+
+# FIXME: Some Max LRS values in the DB are wrongly listed as 0.000,
+# but shouldn't be displayed. Make them NULLs in the database.
+genes_query = SQLQuery(
+ ["ProbeSet.Name AS name",
+ "ProbeSet.Symbol AS symbol",
+ "ProbeSet.description AS description",
+ "ProbeSet.Chr AS chr",
+ "ProbeSet.Mb as mb",
+ "ProbeSet.alias AS alias",
+ "ProbeSet.GenbankId AS genbankid",
+ "ProbeSet.UniGeneId AS unigeneid",
+ "ProbeSet.Probe_Target_Description AS probe_target_description",
+ "ProbeSetFreeze.Name AS dataset",
+ "ProbeSetFreeze.FullName AS dataset_fullname",
+ "Species.Name AS species",
+ "InbredSet.Name AS `group`",
+ "Tissue.Name AS tissue",
+ "ProbeSetXRef.Mean AS mean",
+ "ProbeSetXRef.LRS AS lrs",
+ "ProbeSetXRef.additive AS additive",
+ "Geno.Chr AS geno_chr",
+ "Geno.Mb as geno_mb"],
+ ["Species",
+ "INNER JOIN InbredSet ON InbredSet.SpeciesId = Species.Id",
+ "INNER JOIN ProbeFreeze ON ProbeFreeze.InbredSetId = InbredSet.Id",
+ "INNER JOIN Tissue ON ProbeFreeze.TissueId = Tissue.Id",
+ "INNER JOIN ProbeSetFreeze ON ProbeSetFreeze.ProbeFreezeId = ProbeFreeze.Id",
+ "INNER JOIN ProbeSetXRef ON ProbeSetXRef.ProbeSetFreezeId = ProbeSetFreeze.Id",
+ "INNER JOIN ProbeSet ON ProbeSet.Id = ProbeSetXRef.ProbeSetId",
+ """LEFT JOIN Geno ON ProbeSetXRef.Locus = Geno.Name
+ AND Geno.SpeciesId = Species.Id"""],
+ Just("ProbeSetFreeze.confidentiality < 1 AND ProbeSetFreeze.public > 0"))
+
+# FIXME: Some years are blank strings or strings that contain text
+# other than the year. These should be fixed in the database and the
+# year field must be made an integer.
+phenotypes_query = SQLQuery(
+ ["Species.Name AS species",
+ "InbredSet.Name AS `group`",
+ "PublishFreeze.Name AS dataset",
+ "PublishFreeze.FullName AS dataset_fullname",
+ "PublishXRef.Id AS name",
+ """COALESCE(Phenotype.Post_publication_abbreviation,
+ Phenotype.Pre_publication_abbreviation)
+ AS abbreviation""",
+ """COALESCE(Phenotype.Post_publication_description,
+ Phenotype.Pre_publication_description)
+ AS description""",
+ "Phenotype.Lab_code",
+ "Publication.Abstract",
+ "Publication.Title",
+ "Publication.Authors AS authors",
+ """IF(CONVERT(Publication.Year, UNSIGNED)=0,
+ NULL, CONVERT(Publication.Year, UNSIGNED)) AS year""",
+ "Publication.PubMed_ID AS pubmed_id",
+ "PublishXRef.LRS as lrs",
+ "PublishXRef.additive",
+ "InbredSet.InbredSetCode AS inbredsetcode",
+ "PublishXRef.mean",
+ "Geno.Chr as geno_chr",
+ "Geno.Mb as geno_mb"],
+ ["Species",
+ "INNER JOIN InbredSet ON InbredSet.SpeciesId = Species.Id",
+ "INNER JOIN PublishFreeze ON PublishFreeze.InbredSetId = InbredSet.Id",
+ "INNER JOIN PublishXRef ON PublishXRef.InbredSetId = InbredSet.Id",
+ "INNER JOIN Phenotype ON PublishXRef.PhenotypeId = Phenotype.Id",
+ "INNER JOIN Publication ON PublishXRef.PublicationId = Publication.Id",
+ "LEFT JOIN Geno ON PublishXRef.Locus = Geno.Name AND Geno.SpeciesId = Species.Id"])
+
+
+def serialize_sql(query):
+ """Serialize SQLQuery object to a string."""
+ sql = f"SELECT {', '.join(query.fields)} FROM {' '.join(query.tables)}"
+ def append_to_sql(appendee):
+ nonlocal sql
+ sql += appendee
+
+ query.where.bind(lambda where: append_to_sql(f" WHERE {where}"))
+ query.limit.bind(lambda limit: append_to_sql(f" LIMIT {limit}"))
+ if query.offset != 0:
+ sql += f" OFFSET {query.offset}"
+ return sql
+
+
+@contextlib.contextmanager
+def locked_xapian_writable_database(path):
+ """Open xapian database for writing.
+
+ When a process is writing to a xapian database opened by this
+ function, no other process may do so. This avoids I/O contention
+ between processes.
+ """
+ # pylint: disable-next=invalid-name
+ db = xapian.WritableDatabase(str(path))
+ db.begin_transaction()
+ try:
+ yield db
+ except Exception as exception:
+ db.cancel_transaction()
+ raise exception
+ else:
+ xapian_lock.acquire()
+ try:
+ db.commit_transaction()
+ finally:
+ xapian_lock.release()
+ finally:
+ db.close()
+
+
+# pylint: disable=invalid-name
+def write_document(db, identifier, doctype, doc):
+ """Write document into xapian database."""
+ # We use the XT and Q prefixes to indicate the type and idterm
+ # respectively.
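+    # For example, a gene trait with the illustrative identifier
+    # "mytrait:mydataset" (name:dataset, lowercased) gets the idterm
+    # "Qgene:mytrait:mydataset" and the boolean type term "XTgene".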
+ idterm = f"Q{doctype}:{identifier.lower()}"
+ doc.add_boolean_term(f"XT{doctype}")
+ doc.add_boolean_term(idterm)
+ db.replace_document(idterm, doc)
+
+termgenerator = xapian.TermGenerator()
+termgenerator.set_stemmer(xapian.Stem("en"))
+
+def index_text(text):
+ """Index text and increase term position."""
+ termgenerator.index_text(text)
+ termgenerator.increase_termpos()
+
+# pylint: disable=unnecessary-lambda
+index_text_without_positions = lambda text: termgenerator.index_text_without_positions(text)
+
+index_authors = lambda authors: termgenerator.index_text(authors, 0, "A")
+index_species = lambda species: termgenerator.index_text_without_positions(species, 0, "XS")
+index_group = lambda group: termgenerator.index_text_without_positions(group, 0, "XG")
+index_tissue = lambda tissue: termgenerator.index_text(tissue, 0, "XI")
+index_dataset = lambda dataset: termgenerator.index_text(dataset, 0, "XDS")
+index_symbol = lambda symbol: termgenerator.index_text_without_positions(symbol, 0, "XY")
+index_chr = lambda chr: termgenerator.index_text_without_positions(chr, 0, "XC")
+index_peakchr = lambda peakchr: termgenerator.index_text_without_positions(peakchr, 0, "XPC")
+
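+# Numeric values go into fixed Xapian value slots so that the search side
+# can sort and range-filter on them: slot 0 = mean, 1 = LRS peak, 2 = Mb,
+# 3 = peak Mb, 4 = additive effect, 5 = publication year.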
+add_mean = lambda doc, mean: doc.add_value(0, xapian.sortable_serialise(mean))
+add_peak = lambda doc, peak: doc.add_value(1, xapian.sortable_serialise(peak))
+add_mb = lambda doc, mb: doc.add_value(2, xapian.sortable_serialise(mb))
+add_peakmb = lambda doc, peakmb: doc.add_value(3, xapian.sortable_serialise(peakmb))
+add_additive = lambda doc, additive: doc.add_value(4, xapian.sortable_serialise(additive))
+add_year = lambda doc, year: doc.add_value(5, xapian.sortable_serialise(float(year)))
+
+# When a child process is forked, it inherits a copy of the memory of
+# its parent. We use this to pass data retrieved from SQL from parent
+# to child. Specifically, we use this global variable.
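+# Note that this relies on the "fork" start method (the default on Linux);
+# with the "spawn" start method the child would not inherit the parent's
+# memory and would only see data = None.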
+data = None
+# We use this lock to ensure that only one process writes its Xapian
+# index to disk at a time.
+xapian_lock = Lock()
+
+def index_genes(xapian_build_directory, chunk_index):
+ """Index genes data into a Xapian index."""
+ with locked_xapian_writable_database(xapian_build_directory / f"genes-{chunk_index:04d}") as db:
+ for trait in data:
+ # pylint: disable=cell-var-from-loop
+ doc = xapian.Document()
+ termgenerator.set_document(doc)
+
+ # Add values.
+ trait["mean"].bind(partial(add_mean, doc))
+ trait["lrs"].bind(partial(add_peak, doc))
+ trait["mb"].bind(partial(add_mb, doc))
+ trait["geno_mb"].bind(partial(add_peakmb, doc))
+ trait["additive"].bind(partial(add_additive, doc))
+
+ # Index free text.
+ for key in ["description", "tissue", "dataset_fullname"]:
+ trait[key].bind(index_text)
+ trait.pop("probe_target_description").bind(index_text)
+ for key in ["name", "symbol", "species", "group"]:
+ trait[key].bind(index_text_without_positions)
+ for key in ["alias", "genbankid", "unigeneid"]:
+ trait.pop(key).bind(index_text_without_positions)
+
+ # Index text with prefixes.
+ trait["species"].bind(index_species)
+ trait["group"].bind(index_group)
+ trait["tissue"].bind(index_tissue)
+ trait["dataset_fullname"].bind(index_dataset)
+ trait["symbol"].bind(index_symbol)
+ trait["chr"].bind(index_chr)
+ trait["geno_chr"].bind(index_peakchr)
+
+ doc.set_data(json.dumps(trait.data))
+ (Maybe.apply(curry(2, lambda name, dataset: f"{name}:{dataset}"))
+ .to_arguments(trait["name"], trait["dataset"])
+ .bind(lambda idterm: write_document(db, idterm, "gene", doc)))
+
+
+def index_phenotypes(xapian_build_directory, chunk_index):
+ """Index phenotypes data into a Xapian index."""
+ with locked_xapian_writable_database(
+ xapian_build_directory / f"phenotypes-{chunk_index:04d}") as db:
+ for trait in data:
+ # pylint: disable=cell-var-from-loop
+ doc = xapian.Document()
+ termgenerator.set_document(doc)
+
+ # Add values.
+ trait["mean"].bind(partial(add_mean, doc))
+ trait["lrs"].bind(partial(add_peak, doc))
+ trait["geno_mb"].bind(partial(add_peakmb, doc))
+ trait["additive"].bind(partial(add_additive, doc))
+ trait["year"].bind(partial(add_year, doc))
+
+ # Index free text.
+ for key in ["description", "authors", "dataset_fullname"]:
+ trait[key].bind(index_text)
+ for key in ["Abstract", "Title"]:
+ trait.pop(key).bind(index_text)
+ for key in ["species", "group", "inbredsetcode"]:
+ trait[key].bind(index_text_without_positions)
+ for key in ["abbreviation", "Lab_code"]:
+ trait.pop(key).bind(index_text_without_positions)
+
+ # Index text with prefixes.
+ trait["species"].bind(index_species)
+ trait["group"].bind(index_group)
+ trait["authors"].bind(index_authors)
+ trait["geno_chr"].bind(index_peakchr)
+ trait["dataset_fullname"].bind(index_dataset)
+
+ # Convert name from integer to string.
+ trait["name"] = trait["name"].map(str)
+ # Split comma-separated authors into a list.
+ trait["authors"] = trait["authors"].map(
+ lambda s: [author.strip() for author in s.split(",")])
+
+ doc.set_data(json.dumps(trait.data))
+ (Maybe.apply(curry(2, lambda name, dataset: f"{name}:{dataset}"))
+ .to_arguments(trait["name"], trait["dataset"])
+ .bind(lambda idterm: write_document(db, idterm, "phenotype", doc)))
+
+
+def group(generator, chunk_size):
+ """Group elements of generator into chunks."""
+ return iter(lambda: tuple(itertools.islice(generator, chunk_size)), ())
+
+
+@contextlib.contextmanager
+def worker_queue(number_of_workers=os.cpu_count()):
+ """Manage a pool of worker processes returning a function to spawn them."""
+ processes = deque()
+
+ def spawn(target, args):
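+        # Wait for the oldest worker when the pool is full so that at
+        # most number_of_workers processes run at a time.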
+ if len(processes) == number_of_workers:
+ processes.popleft().join()
+ process = Process(target=target, args=args)
+ process.start()
+ processes.append(process)
+
+ yield spawn
+ for process in processes:
+ process.join()
+
+
+def index_query(index_function, query, xapian_build_directory, start=0):
+ """Run SQL query, and index its results for Xapian."""
+ i = start
+ try:
+ with worker_queue() as spawn_worker:
+ with database_connection() as conn:
+ for chunk in group(query_sql(conn, serialize_sql(
+ # KLUDGE: MariaDB does not allow an offset
+ # without a limit. So, set limit to a "high"
+ # value.
+ query._replace(limit=Just(2**64 - 1),
+ offset=start*DOCUMENTS_PER_CHUNK)),
+ server_side=True),
+ DOCUMENTS_PER_CHUNK):
+ # pylint: disable=global-statement
+ global data
+ data = chunk
+ spawn_worker(index_function, (xapian_build_directory, i))
+ logging.debug("Spawned worker process on chunk %s", i)
+ i += 1
+ # In the event of an operational error, open a new connection and
+ # resume indexing.
+ # pylint: disable=protected-access
+ except MySQLdb._exceptions.OperationalError:
+ logging.warning("Reopening connection to recovering from SQL operational error",
+ exc_info=True)
+ index_query(index_function, query, xapian_build_directory, i)
+
+
+@contextlib.contextmanager
+def temporary_directory(prefix, parent_directory):
+ """Create temporary directory returning it as a PosixPath."""
+ with tempfile.TemporaryDirectory(prefix=prefix, dir=parent_directory) as tmpdirname:
+ yield pathlib.Path(tmpdirname)
+
+
+def xapian_compact(combined_index, indices):
+ """Compact and combine several Xapian indices."""
+ # xapian-compact opens all indices simultaneously. So, raise the limit on
+ # the number of open files.
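+    # (Each index consists of several on-disk files, so allow a margin of
+    # ten file descriptors per index.)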
+ soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
+ resource.setrlimit(resource.RLIMIT_NOFILE, (max(soft, min(10*len(indices), hard)), hard))
+ db = xapian.Database()
+ try:
+ for index in indices:
+ db.add_database(xapian.Database(str(index)))
+ db.compact(str(combined_index), xapian.DBCOMPACT_MULTIPASS | xapian.Compactor.FULLER)
+ finally:
+ db.close()
+
+
+@click.command(help="Index GeneNetwork data and build Xapian search index in XAPIAN_DIRECTORY.")
+@click.argument("xapian_directory")
+# pylint: disable=missing-function-docstring
+def main(xapian_directory):
+ logging.basicConfig(level=os.environ.get("LOGLEVEL", "DEBUG"),
+ format='%(relativeCreated)s: %(levelname)s: %(message)s')
+ pathlib.Path(xapian_directory).mkdir(exist_ok=True)
+ with temporary_directory("combined", xapian_directory) as combined_index:
+ with temporary_directory("build", xapian_directory) as xapian_build_directory:
+ logging.info("Indexing genes")
+ index_query(index_genes, genes_query, xapian_build_directory)
+ logging.info("Indexing phenotypes")
+ index_query(index_phenotypes, phenotypes_query, xapian_build_directory)
+ logging.info("Combining and compacting indices")
+ xapian_compact(combined_index, list(xapian_build_directory.iterdir()))
+ for child in combined_index.iterdir():
+ shutil.move(child, xapian_directory)
+ logging.info("Index built")
+
+
+if __name__ == "__main__":
+ # pylint: disable=no-value-for-parameter
+ main()