-rwxr-xr-x  scripts/index-genenetwork  21
1 file changed, 10 insertions, 11 deletions
diff --git a/scripts/index-genenetwork b/scripts/index-genenetwork
index 90e7a80..65f0185 100755
--- a/scripts/index-genenetwork
+++ b/scripts/index-genenetwork
@@ -38,7 +38,7 @@ from gn3.db_utils import database_connection
 from gn3.monads import query_sql
 
 DOCUMENTS_PER_CHUNK = 100_000
-# Running the script in prod consumers ~1GB per process.
+# Running the script in prod consumes ~1GB per process when handling 100_000 documents per chunk.
 # To prevent running out of RAM, we set this as the upper bound for total concurrent processes
 PROCESS_COUNT_LIMIT = 67
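As a rough illustration of how these two constants bound memory use (the ~1GB-per-process figure comes from the comment above; nothing below is part of the script):

    # Hypothetical back-of-the-envelope check, assuming ~1GB of RAM per worker
    # for 100_000-document chunks, as observed in prod.
    RAM_PER_PROCESS_GB = 1
    PROCESS_COUNT_LIMIT = 67
    print(f"worst-case concurrent RAM: ~{PROCESS_COUNT_LIMIT * RAM_PER_PROCESS_GB}GB")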
 
@@ -216,7 +216,7 @@ def build_rdf_cache(sparql_uri: str, query: str, remove_common_words: bool = Fal
     if not isinstance(results, dict):
         raise TypeError(f"Expected results to be a dict but found {type(results)}")
     bindings = results["results"]["bindings"]
-    count = Counter()
+    count: Counter[str] = Counter()
     words_regex = re.compile(r"\w+")
     for entry in bindings :
         x = (entry["speciesName"]["value"], entry["symbolName"]["value"],)
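The Counter[str] annotation only pins down the element type for the type checker, since an empty Counter() gives it nothing to infer from; a standalone sketch of the same pattern (illustrative values, not the script's data):

    from collections import Counter

    # Type checkers such as mypy cannot infer the element type of an empty
    # Counter(); Counter[str] documents that the keys are words.
    count: Counter[str] = Counter()
    count.update(["gene", "gene", "symbol"])
    assert count["gene"] == 2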
@@ -234,11 +234,7 @@ def build_rdf_cache(sparql_uri: str, query: str, remove_common_words: bool = Fal
             words_to_drop.add(word)
     smaller_cache = {}
     for entry, value in cache.items():
-        new_value = set()
-        for word in value.lower().split():
-            if word in words_to_drop:
-                continue
-            new_value.add(word)
+        new_value = set(word for word in value.lower().split() if word not in words_to_drop)
         smaller_cache[entry] = " ".join(new_value)
     return smaller_cache
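The set comprehension above is equivalent to the loop it replaces; a minimal standalone sketch of the same filtering step (function and variable names are illustrative, not the script's):

    def drop_common_words(cache: dict, words_to_drop: set) -> dict:
        # Keep only words outside the drop set; word order is not preserved
        # because the intermediate collection is a set, just like the original loop.
        smaller_cache = {}
        for entry, value in cache.items():
            kept = {word for word in value.lower().split() if word not in words_to_drop}
            smaller_cache[entry] = " ".join(kept)
        return smaller_cache

    # Example: the common word "the" is dropped; remaining word order is arbitrary.
    result = drop_common_words({"gene1": "The BXD mouse strain"}, {"the"})
    assert set(result["gene1"].split()) == {"bxd", "mouse", "strain"}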
 
@@ -454,7 +450,7 @@ def worker_queue(number_of_workers: int = os.cpu_count() or 1) -> Generator:
         process.join()
 
 
-def index_query(index_function: Callable[[pathlib.Path, int, managers.Namespace], None], query: SQLQuery,
+def index_query(index_function: Callable[[pathlib.Path, int], None], query: SQLQuery,
                 xapian_build_directory: pathlib.Path, sql_uri: str,
                 sparql_uri: str, start: int = 0) -> None:
     """Run SQL query, and index its results for Xapian."""
@@ -498,7 +494,7 @@ def parallel_xapian_compact(combined_index: pathlib.Path, indices: List[pathlib.
     # We found that compacting 50 files of ~600MB has decent performance
     no_of_workers = 20
     file_groupings = 50
-    with temporary_directory("parallel_combine", combined_index) as parallel_combine:
+    with temporary_directory("parallel_combine", str(combined_index)) as parallel_combine:
         parallel_combine.mkdir(parents=True, exist_ok=True)
         with worker_queue(no_of_workers) as spawn_worker:
             i = 0
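The added str() call suggests temporary_directory expects a plain string prefix rather than a pathlib.Path (an assumption; its definition is outside this diff). The conversion itself is just:

    import pathlib

    combined_index = pathlib.Path("xapian-build/combined")
    # pathlib.Path is not a str subclass, so helpers annotated with str
    # need an explicit conversion.
    prefix = str(combined_index)  # "xapian-build/combined"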
@@ -509,7 +505,7 @@ def parallel_xapian_compact(combined_index: pathlib.Path, indices: List[pathlib.
                 spawn_worker(xapian_compact, (parallel_combine / f"{i}_{last_item_idx}", files))
                 logging.debug("Spawned worker to compact files from %s to %s", i, last_item_idx)
                 i = end_index
-        logging.debug("Completed spawning parallel xapian compacts")
+        logging.debug("Completed parallel xapian compacts")
         xapian_compact(combined_index, list(parallel_combine.iterdir()))
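The loop above hands the shard list to workers in batches of file_groupings; the grouping pattern in isolation (none of the worker machinery, names are illustrative):

    def group_files(indices: list, group_size: int = 50) -> list:
        # Split the shard list into consecutive batches of at most group_size,
        # mirroring how parallel_xapian_compact assigns work to each worker.
        return [indices[i:i + group_size] for i in range(0, len(indices), group_size)]

    # e.g. 120 shards are compacted as batches of 50, 50 and 20
    assert [len(group) for group in group_files(list(range(120)))] == [50, 50, 20]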
 
 
@@ -528,7 +524,10 @@ def xapian_compact(combined_index: pathlib.Path, indices: List[pathlib.Path]) ->
         db.compact(str(combined_index), xapian.DBCOMPACT_MULTIPASS | xapian.Compactor.FULLER)
     finally:
         db.close()
-    logging.debug("Completed xapian-compact for %s files in %s minutes", len(indices), (time.monotonic() - start) / 60)
+    logging.debug("Removing databases that were compacted into %s", combined_index.name)
+    for folder in indices:
+        shutil.rmtree(folder)
+    logging.debug("Completed xapian-compact %s; handled %s files in %s minutes", combined_index.name, len(indices), (time.monotonic() - start) / 60)
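The new cleanup loop relies on shutil already being imported elsewhere in the script, since no import is added in this patch. The idea in isolation:

    import pathlib
    import shutil

    def remove_compacted_sources(indices: list) -> None:
        # Once the per-chunk databases have been merged into the combined index,
        # they only waste disk space, so delete each source directory tree.
        for folder in indices:
            shutil.rmtree(folder)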
 
 
 @click.command(help="Verify checksums and return True when the data has been changed.")