diff options
author | John Nduli | 2024-07-02 21:29:30 +0300 |
---|---|---|
committer | BonfaceKilz | 2024-07-03 14:24:01 +0300 |
commit | 0c651326b123c601989dd18c9d690b7a9e400189 (patch) | |
tree | 6cdf710d7e0d6ac1d226de06b41866be819a455d /scripts | |
parent | 012a70920e19528543608faf575392119a4babf7 (diff) | |
download | genenetwork3-0c651326b123c601989dd18c9d690b7a9e400189.tar.gz |
feat: add support for parallel xapian compact
Diffstat (limited to 'scripts')
-rwxr-xr-x | scripts/index-genenetwork | 24 |
1 files changed, 23 insertions, 1 deletions
diff --git a/scripts/index-genenetwork b/scripts/index-genenetwork index 57cd1b4..90e7a80 100755 --- a/scripts/index-genenetwork +++ b/scripts/index-genenetwork @@ -494,12 +494,33 @@ def temporary_directory(prefix: str, parent_directory: str) -> Generator: yield pathlib.Path(tmpdirname) +def parallel_xapian_compact(combined_index: pathlib.Path, indices: List[pathlib.Path]) -> None: + # We found that compacting 50 files of ~600MB has decent performance + no_of_workers = 20 + file_groupings = 50 + with temporary_directory("parallel_combine", combined_index) as parallel_combine: + parallel_combine.mkdir(parents=True, exist_ok=True) + with worker_queue(no_of_workers) as spawn_worker: + i = 0 + while i < len(indices): + end_index = (i + file_groupings) + files = indices[i:end_index] + last_item_idx = i + len(files) + spawn_worker(xapian_compact, (parallel_combine / f"{i}_{last_item_idx}", files)) + logging.debug("Spawned worker to compact files from %s to %s", i, last_item_idx) + i = end_index + logging.debug("Completed spawning parallel xapian compacts") + xapian_compact(combined_index, list(parallel_combine.iterdir())) + + def xapian_compact(combined_index: pathlib.Path, indices: List[pathlib.Path]) -> None: """Compact and combine several Xapian indices.""" # xapian-compact opens all indices simultaneously. So, raise the limit on # the number of open files. soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) resource.setrlimit(resource.RLIMIT_NOFILE, (max(soft, min(10*len(indices), hard)), hard)) + combined_index.mkdir(parents=True, exist_ok=True) + start = time.monotonic() db = xapian.Database() try: for index in indices: @@ -507,6 +528,7 @@ def xapian_compact(combined_index: pathlib.Path, indices: List[pathlib.Path]) -> db.compact(str(combined_index), xapian.DBCOMPACT_MULTIPASS | xapian.Compactor.FULLER) finally: db.close() + logging.debug("Completed xapian-compact for %s files in %s minutes", len(indices), (time.monotonic() - start) / 60) @click.command(help="Verify checksums and return True when the data has been changed.") @@ -568,7 +590,7 @@ def create_xapian_index(xapian_directory: str, sql_uri: str, logging.info("Indexing phenotypes") index_query(index_phenotypes, phenotypes_query, xapian_build_directory, sql_uri, sparql_uri) logging.info("Combining and compacting indices") - xapian_compact(combined_index, list(xapian_build_directory.iterdir())) + parallel_xapian_compact(combined_index, list(xapian_build_directory.iterdir())) logging.info("Writing table checksums into index") with locked_xapian_writable_database(combined_index) as db: # Build a (deduplicated) set of all tables referenced in |