# Reconstructed from a git patch (as rendered by cgit v1.2.3):
#
#   From 0c651326b123c601989dd18c9d690b7a9e400189 Mon Sep 17 00:00:00 2001
#   From: John Nduli
#   Date: Tue, 2 Jul 2024 21:29:30 +0300
#   Subject: feat: add support for parallel xapian compact
#
#   scripts/index-genenetwork | 24 +++++++++++++++++++++++-
#   1 file changed, 23 insertions(+), 1 deletion(-)
#   diff --git a/scripts/index-genenetwork b/scripts/index-genenetwork
#   index 57cd1b4..90e7a80 100755
#
# parallel_xapian_compact is the only definition the patch shows in full; it
# is given below as reviewed code. The other hunks are kept as a commented
# excerpt at the end because they show their definitions only partially.


def parallel_xapian_compact(combined_index: pathlib.Path, indices: List[pathlib.Path],
                            *, no_of_workers: int = 20, file_groupings: int = 50) -> None:
    """Compact and combine Xapian indices in two stages.

    The input indices are split into consecutive batches of at most
    ``file_groupings`` entries. Each batch is handed to ``xapian_compact``
    through a ``worker_queue`` of ``no_of_workers`` workers and written to its
    own partial index inside a temporary directory created under
    ``combined_index``. The partial indices are then combined into
    ``combined_index`` by one final ``xapian_compact`` call.

    Args:
        combined_index: Directory that receives the final combined index. It
            is also used as the parent of the temporary directory, so it must
            be usable as a parent directory when this function is called.
        indices: Paths of the indices to combine.
        no_of_workers: Value passed to ``worker_queue``.
        file_groupings: Maximum number of indices per partial compact.

    Raises:
        ValueError: If ``file_groupings`` is smaller than 1.
    """
    # We found that compacting 50 files of ~600MB has decent performance;
    # hence the defaults of 20 workers and batches of 50.
    if file_groupings < 1:
        raise ValueError("file_groupings must be at least 1")

    with temporary_directory("parallel_combine", combined_index) as parallel_combine:
        # Presumably temporary_directory has already created this directory;
        # exist_ok makes the call a harmless safeguard either way.
        parallel_combine.mkdir(parents=True, exist_ok=True)

        # Track the partial indices as they are scheduled so that the final
        # merge sees them in input order. Listing the directory instead would
        # yield them in whatever order the filesystem happens to return.
        partial_indices: List[pathlib.Path] = []
        with worker_queue(no_of_workers) as spawn_worker:
            for first in range(0, len(indices), file_groupings):
                files = indices[first:first + file_groupings]
                # The last batch may be shorter than file_groupings.
                last = first + len(files)
                partial_index = parallel_combine / f"{first}_{last}"
                spawn_worker(xapian_compact, (partial_index, files))
                partial_indices.append(partial_index)
                logging.debug("Spawned worker to compact files from %s to %s", first, last)
        # NOTE(review): the final merge must only start once every partial
        # compact has finished. This relies on worker_queue waiting for its
        # workers when the ``with`` block exits -- confirm against its
        # definition, which is not part of this patch.
        logging.debug("Completed spawning parallel xapian compacts")
        xapian_compact(combined_index, partial_indices)


# ---------------------------------------------------------------------------
# Remaining hunks of the patch, verbatim apart from restored line breaks and
# approximate indentation. They are not reconstructed as code: one unchanged
# line of xapian_compact (between the first and second hunk) is not shown in
# the patch, and create_xapian_index is visible only around the changed call.
# In the first hunk, the added parallel_xapian_compact lines are replaced by
# a "(...)" marker because the function appears above as code.
#
# @@ -494,12 +494,33 @@ def temporary_directory(prefix: str, parent_directory: str) -> Generator:
#          yield pathlib.Path(tmpdirname)
#
#
# +def parallel_xapian_compact(...)   [see above]
# +
# +
#  def xapian_compact(combined_index: pathlib.Path, indices: List[pathlib.Path]) -> None:
#      """Compact and combine several Xapian indices."""
#      # xapian-compact opens all indices simultaneously. So, raise the limit on
#      # the number of open files.
#      soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
#      resource.setrlimit(resource.RLIMIT_NOFILE, (max(soft, min(10*len(indices), hard)), hard))
# +    combined_index.mkdir(parents=True, exist_ok=True)
# +    start = time.monotonic()
#      db = xapian.Database()
#      try:
#          for index in indices:
# @@ -507,6 +528,7 @@ def xapian_compact(combined_index: pathlib.Path, indices: List[pathlib.Path]) ->
#          db.compact(str(combined_index), xapian.DBCOMPACT_MULTIPASS | xapian.Compactor.FULLER)
#      finally:
#          db.close()
# +    logging.debug("Completed xapian-compact for %s files in %s minutes", len(indices), (time.monotonic() - start) / 60)
#
#
#  @click.command(help="Verify checksums and return True when the data has been changed.")
# @@ -568,7 +590,7 @@ def create_xapian_index(xapian_directory: str, sql_uri: str,
#      logging.info("Indexing phenotypes")
#      index_query(index_phenotypes, phenotypes_query, xapian_build_directory, sql_uri, sparql_uri)
#      logging.info("Combining and compacting indices")
# -    xapian_compact(combined_index, list(xapian_build_directory.iterdir()))
# +    parallel_xapian_compact(combined_index, list(xapian_build_directory.iterdir()))
#      logging.info("Writing table checksums into index")
#      with locked_xapian_writable_database(combined_index) as db:
#          # Build a (deduplicated) set of all tables referenced in
# ---------------------------------------------------------------------------