author     John Nduli    2024-07-02 21:29:30 +0300
committer  BonfaceKilz   2024-07-03 14:24:01 +0300
commit     0c651326b123c601989dd18c9d690b7a9e400189 (patch)
tree       6cdf710d7e0d6ac1d226de06b41866be819a455d /scripts/index-genenetwork
parent     012a70920e19528543608faf575392119a4babf7 (diff)
download   genenetwork3-0c651326b123c601989dd18c9d690b7a9e400189.tar.gz
feat: add support for parallel xapian compact
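
The change splits the per-chunk Xapian indices into groups, compacts each group in a separate worker process, and then merges the intermediate results in a final pass. A minimal sketch of that batching pattern, assuming worker_queue(n) is the script's context manager that runs submitted (function, args) pairs on n worker processes and waits for all of them before exiting; compact_in_batches and its parameters are illustrative names, not part of the commit:

    import pathlib
    from typing import List

    def compact_in_batches(final_index: pathlib.Path, indices: List[pathlib.Path],
                           scratch: pathlib.Path) -> None:
        group_size = 50    # ~600MB per input index; groups of 50 compact with decent performance
        with worker_queue(20) as spawn_worker:   # assumed: joins all workers on exit
            for start in range(0, len(indices), group_size):
                group = indices[start:start + group_size]
                # each worker compacts its group into an intermediate index under scratch
                spawn_worker(xapian_compact, (scratch / f"{start}_{start + len(group)}", group))
        # every intermediate index exists once worker_queue has exited; merge them in one pass
        xapian_compact(final_index, list(scratch.iterdir()))

Each worker writes one intermediate index under the scratch directory, so the final xapian_compact call only merges roughly len(indices) / 50 databases instead of all of them at once.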
Diffstat (limited to 'scripts/index-genenetwork')
-rwxr-xr-x  scripts/index-genenetwork  24
1 file changed, 23 insertions(+), 1 deletion(-)
diff --git a/scripts/index-genenetwork b/scripts/index-genenetwork
index 57cd1b4..90e7a80 100755
--- a/scripts/index-genenetwork
+++ b/scripts/index-genenetwork
@@ -494,12 +494,33 @@ def temporary_directory(prefix: str, parent_directory: str) -> Generator:
         yield pathlib.Path(tmpdirname)
 
 
+def parallel_xapian_compact(combined_index: pathlib.Path, indices: List[pathlib.Path]) -> None:
+    # We found that compacting 50 files of ~600MB has decent performance
+    no_of_workers = 20
+    file_groupings = 50
+    with temporary_directory("parallel_combine", combined_index) as parallel_combine:
+        parallel_combine.mkdir(parents=True, exist_ok=True)
+        with worker_queue(no_of_workers) as spawn_worker:
+            i = 0
+            while i < len(indices):
+                end_index = (i + file_groupings)
+                files = indices[i:end_index]
+                last_item_idx = i + len(files)
+                spawn_worker(xapian_compact, (parallel_combine / f"{i}_{last_item_idx}", files))
+                logging.debug("Spawned worker to compact files from %s to %s", i, last_item_idx)
+                i = end_index
+            logging.debug("Completed spawning parallel xapian compacts")
+        xapian_compact(combined_index, list(parallel_combine.iterdir()))
+
+
 def xapian_compact(combined_index: pathlib.Path, indices: List[pathlib.Path]) -> None:
     """Compact and combine several Xapian indices."""
     # xapian-compact opens all indices simultaneously. So, raise the limit on
     # the number of open files.
     soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
     resource.setrlimit(resource.RLIMIT_NOFILE, (max(soft, min(10*len(indices), hard)), hard))
+    combined_index.mkdir(parents=True, exist_ok=True)
+    start = time.monotonic()
     db = xapian.Database()
     try:
         for index in indices:
@@ -507,6 +528,7 @@ def xapian_compact(combined_index: pathlib.Path, indices: List[pathlib.Path]) -> None:
         db.compact(str(combined_index), xapian.DBCOMPACT_MULTIPASS | xapian.Compactor.FULLER)
     finally:
         db.close()
+    logging.debug("Completed xapian-compact for %s files in %s minutes", len(indices), (time.monotonic() - start) / 60)
 
 
 @click.command(help="Verify checksums and return True when the data has been changed.")
@@ -568,7 +590,7 @@ def create_xapian_index(xapian_directory: str, sql_uri: str,
     logging.info("Indexing phenotypes")
     index_query(index_phenotypes, phenotypes_query, xapian_build_directory, sql_uri, sparql_uri)
     logging.info("Combining and compacting indices")
-    xapian_compact(combined_index, list(xapian_build_directory.iterdir()))
+    parallel_xapian_compact(combined_index, list(xapian_build_directory.iterdir()))
     logging.info("Writing table checksums into index")
     with locked_xapian_writable_database(combined_index) as db:
         # Build a (deduplicated) set of all tables referenced in
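
Rough arithmetic for the fan-out under the constants above, with a hypothetical build directory of 1000 per-chunk indices (the count is illustrative, not taken from the commit):

    import math

    indices = 1000                 # hypothetical number of entries in xapian_build_directory
    file_groupings = 50            # indices per intermediate compaction
    no_of_workers = 20             # concurrent worker processes
    groups = math.ceil(indices / file_groupings)   # 20 intermediate compactions
    waves = math.ceil(groups / no_of_workers)      # they fit in a single wave of workers
    print(groups, waves)                           # -> 20 1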