about summary refs log tree commit diff
path: root/scripts/index-genenetwork
diff options
context:
space:
mode:
authorJohn Nduli2024-07-02 21:29:30 +0300
committerBonfaceKilz2024-07-03 14:24:01 +0300
commit0c651326b123c601989dd18c9d690b7a9e400189 (patch)
tree6cdf710d7e0d6ac1d226de06b41866be819a455d /scripts/index-genenetwork
parent012a70920e19528543608faf575392119a4babf7 (diff)
downloadgenenetwork3-0c651326b123c601989dd18c9d690b7a9e400189.tar.gz
feat: add support for parallel xapian compact
Diffstat (limited to 'scripts/index-genenetwork')
-rwxr-xr-xscripts/index-genenetwork24
1 files changed, 23 insertions, 1 deletions
diff --git a/scripts/index-genenetwork b/scripts/index-genenetwork
index 57cd1b4..90e7a80 100755
--- a/scripts/index-genenetwork
+++ b/scripts/index-genenetwork
@@ -494,12 +494,33 @@ def temporary_directory(prefix: str, parent_directory: str) -> Generator:
         yield pathlib.Path(tmpdirname)
 
 
+def parallel_xapian_compact(combined_index: pathlib.Path, indices: List[pathlib.Path]) -> None:
+    # We found that compacting 50 files of ~600MB has decent performance
+    no_of_workers = 20
+    file_groupings = 50
+    with temporary_directory("parallel_combine", combined_index) as parallel_combine:
+        parallel_combine.mkdir(parents=True, exist_ok=True)
+        with worker_queue(no_of_workers) as spawn_worker:
+            i = 0
+            while i < len(indices):
+                end_index = (i + file_groupings)
+                files = indices[i:end_index]
+                last_item_idx = i + len(files)
+                spawn_worker(xapian_compact, (parallel_combine / f"{i}_{last_item_idx}", files))
+                logging.debug("Spawned worker to compact files from %s to %s", i, last_item_idx)
+                i = end_index
+        logging.debug("Completed spawning parallel xapian compacts")
+        xapian_compact(combined_index, list(parallel_combine.iterdir()))
+
+
 def xapian_compact(combined_index: pathlib.Path, indices: List[pathlib.Path]) -> None:
     """Compact and combine several Xapian indices."""
     # xapian-compact opens all indices simultaneously. So, raise the limit on
     # the number of open files.
     soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
     resource.setrlimit(resource.RLIMIT_NOFILE, (max(soft, min(10*len(indices), hard)), hard))
+    combined_index.mkdir(parents=True, exist_ok=True)
+    start = time.monotonic()
     db = xapian.Database()
     try:
         for index in indices:
@@ -507,6 +528,7 @@ def xapian_compact(combined_index: pathlib.Path, indices: List[pathlib.Path]) ->
         db.compact(str(combined_index), xapian.DBCOMPACT_MULTIPASS | xapian.Compactor.FULLER)
     finally:
         db.close()
+    logging.debug("Completed xapian-compact for %s files in %s minutes", len(indices), (time.monotonic() - start) / 60)
 
 
 @click.command(help="Verify checksums and return True when the data has been changed.")
@@ -568,7 +590,7 @@ def create_xapian_index(xapian_directory: str, sql_uri: str,
             logging.info("Indexing phenotypes")
             index_query(index_phenotypes, phenotypes_query, xapian_build_directory, sql_uri, sparql_uri)
             logging.info("Combining and compacting indices")
-            xapian_compact(combined_index, list(xapian_build_directory.iterdir()))
+            parallel_xapian_compact(combined_index, list(xapian_build_directory.iterdir()))
             logging.info("Writing table checksums into index")
             with locked_xapian_writable_database(combined_index) as db:
                 # Build a (deduplicated) set of all tables referenced in