From 8885288dd3004b49d4c0d85ffa8ad8f85775d222 Mon Sep 17 00:00:00 2001
From: John Nduli
Date: Wed, 3 Jul 2024 10:55:27 +0300
Subject: feat: drop intermediate folders when running parallel xapian compact

---
 scripts/index-genenetwork | 21 ++++++++++-----------
 1 file changed, 10 insertions(+), 11 deletions(-)

(limited to 'scripts')

diff --git a/scripts/index-genenetwork b/scripts/index-genenetwork
index 90e7a80..65f0185 100755
--- a/scripts/index-genenetwork
+++ b/scripts/index-genenetwork
@@ -38,7 +38,7 @@ from gn3.db_utils import database_connection
 from gn3.monads import query_sql
 
 DOCUMENTS_PER_CHUNK = 100_000
-# Running the script in prod consumers ~1GB per process.
+# Running the script in prod consumers ~1GB per process when handling 100_000 Documents per chunk.
 # To prevent running out of RAM, we set this as the upper bound for total concurrent processes
 PROCESS_COUNT_LIMIT = 67
 
@@ -216,7 +216,7 @@ def build_rdf_cache(sparql_uri: str, query: str, remove_common_words: bool = Fal
     if not isinstance(results, dict):
         raise TypeError(f"Expected results to be a dict but found {type(results)}")
     bindings = results["results"]["bindings"]
-    count = Counter()
+    count: Counter[str] = Counter()
     words_regex = re.compile(r"\w+")
     for entry in bindings :
         x = (entry["speciesName"]["value"], entry["symbolName"]["value"],)
@@ -234,11 +234,7 @@ def build_rdf_cache(sparql_uri: str, query: str, remove_common_words: bool = Fal
             words_to_drop.add(word)
     smaller_cache = {}
     for entry, value in cache.items():
-        new_value = set()
-        for word in value.lower().split():
-            if word in words_to_drop:
-                continue
-            new_value.add(word)
+        new_value = set(word for word in value.lower().split() if word not in words_to_drop)
         smaller_cache[entry] = " ".join(new_value)
     return smaller_cache
 
@@ -454,7 +450,7 @@ def worker_queue(number_of_workers: int = os.cpu_count() or 1) -> Generator:
         process.join()
 
 
-def index_query(index_function: Callable[[pathlib.Path, int, managers.Namespace], None], query: SQLQuery,
+def index_query(index_function: Callable[[pathlib.Path, int], None], query: SQLQuery,
                 xapian_build_directory: pathlib.Path, sql_uri: str,
                 sparql_uri: str, start: int = 0) -> None:
     """Run SQL query, and index its results for Xapian."""
@@ -498,7 +494,7 @@ def parallel_xapian_compact(combined_index: pathlib.Path, indices: List[pathlib.
     # We found that compacting 50 files of ~600MB has decent performance
     no_of_workers = 20
     file_groupings = 50
-    with temporary_directory("parallel_combine", combined_index) as parallel_combine:
+    with temporary_directory("parallel_combine", str(combined_index)) as parallel_combine:
         parallel_combine.mkdir(parents=True, exist_ok=True)
         with worker_queue(no_of_workers) as spawn_worker:
             i = 0
@@ -509,7 +505,7 @@ def parallel_xapian_compact(combined_index: pathlib.Path, indices: List[pathlib.
                 spawn_worker(xapian_compact, (parallel_combine / f"{i}_{last_item_idx}", files))
                 logging.debug("Spawned worker to compact files from %s to %s", i, last_item_idx)
                 i = end_index
-        logging.debug("Completed spawning parallel xapian compacts")
+        logging.debug("Completed parallel xapian compacts")
         xapian_compact(combined_index, list(parallel_combine.iterdir()))
 
 
@@ -528,7 +524,10 @@ def xapian_compact(combined_index: pathlib.Path, indices: List[pathlib.Path]) ->
         db.compact(str(combined_index), xapian.DBCOMPACT_MULTIPASS | xapian.Compactor.FULLER)
     finally:
         db.close()
-    logging.debug("Completed xapian-compact for %s files in %s minutes", len(indices), (time.monotonic() - start) / 60)
+    logging.debug("Removing databases that were compacted into %s", combined_index.name)
+    for folder in indices:
+        shutil.rmtree(folder)
+    logging.debug("Completed xapian-compact %s; handled %s files in %s minutes", combined_index.name, len(indices), (time.monotonic() - start) / 60)
 
 
 @click.command(help="Verify checksums and return True when the data has been changed.")
--
cgit v1.2.3
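
The core pattern the last hunk introduces is "compact several Xapian databases into one destination, then delete the source folders so intermediate directories do not accumulate". The sketch below illustrates that pattern in isolation, assuming the python-xapian bindings are installed; compact_and_drop is a hypothetical stand-in for the script's own xapian_compact and the paths are illustrative only.

    import pathlib
    import shutil
    from typing import List

    import xapian

    def compact_and_drop(combined_index: pathlib.Path, indices: List[pathlib.Path]) -> None:
        """Compact several Xapian databases into combined_index, then remove the sources."""
        db = xapian.Database()
        try:
            # Stack the per-chunk databases and compact them into one destination.
            for index in indices:
                db.add_database(xapian.Database(str(index)))
            db.compact(str(combined_index), xapian.DBCOMPACT_MULTIPASS | xapian.Compactor.FULLER)
        finally:
            db.close()
        # Once the data lives in combined_index, the per-chunk folders are redundant;
        # removing them here is what keeps intermediate directories from piling up.
        for folder in indices:
            shutil.rmtree(folder)

Because parallel_xapian_compact runs this step in worker processes over groups of 50 databases, each worker now cleans up its own inputs, and only the intermediate results under parallel_combine remain for the final compaction.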