aboutsummaryrefslogtreecommitdiff
path: root/scripts
diff options
context:
space:
mode:
Diffstat (limited to 'scripts')
-rwxr-xr-xscripts/index-genenetwork21
1 files changed, 10 insertions, 11 deletions
diff --git a/scripts/index-genenetwork b/scripts/index-genenetwork
index 90e7a80..65f0185 100755
--- a/scripts/index-genenetwork
+++ b/scripts/index-genenetwork
@@ -38,7 +38,7 @@ from gn3.db_utils import database_connection
from gn3.monads import query_sql
DOCUMENTS_PER_CHUNK = 100_000
-# Running the script in prod consumers ~1GB per process.
+# Running the script in prod consumers ~1GB per process when handling 100_000 Documents per chunk.
# To prevent running out of RAM, we set this as the upper bound for total concurrent processes
PROCESS_COUNT_LIMIT = 67
@@ -216,7 +216,7 @@ def build_rdf_cache(sparql_uri: str, query: str, remove_common_words: bool = Fal
if not isinstance(results, dict):
raise TypeError(f"Expected results to be a dict but found {type(results)}")
bindings = results["results"]["bindings"]
- count = Counter()
+ count: Counter[str] = Counter()
words_regex = re.compile(r"\w+")
for entry in bindings :
x = (entry["speciesName"]["value"], entry["symbolName"]["value"],)
@@ -234,11 +234,7 @@ def build_rdf_cache(sparql_uri: str, query: str, remove_common_words: bool = Fal
words_to_drop.add(word)
smaller_cache = {}
for entry, value in cache.items():
- new_value = set()
- for word in value.lower().split():
- if word in words_to_drop:
- continue
- new_value.add(word)
+ new_value = set(word for word in value.lower().split() if word not in words_to_drop)
smaller_cache[entry] = " ".join(new_value)
return smaller_cache
@@ -454,7 +450,7 @@ def worker_queue(number_of_workers: int = os.cpu_count() or 1) -> Generator:
process.join()
-def index_query(index_function: Callable[[pathlib.Path, int, managers.Namespace], None], query: SQLQuery,
+def index_query(index_function: Callable[[pathlib.Path, int], None], query: SQLQuery,
xapian_build_directory: pathlib.Path, sql_uri: str,
sparql_uri: str, start: int = 0) -> None:
"""Run SQL query, and index its results for Xapian."""
@@ -498,7 +494,7 @@ def parallel_xapian_compact(combined_index: pathlib.Path, indices: List[pathlib.
# We found that compacting 50 files of ~600MB has decent performance
no_of_workers = 20
file_groupings = 50
- with temporary_directory("parallel_combine", combined_index) as parallel_combine:
+ with temporary_directory("parallel_combine", str(combined_index)) as parallel_combine:
parallel_combine.mkdir(parents=True, exist_ok=True)
with worker_queue(no_of_workers) as spawn_worker:
i = 0
@@ -509,7 +505,7 @@ def parallel_xapian_compact(combined_index: pathlib.Path, indices: List[pathlib.
spawn_worker(xapian_compact, (parallel_combine / f"{i}_{last_item_idx}", files))
logging.debug("Spawned worker to compact files from %s to %s", i, last_item_idx)
i = end_index
- logging.debug("Completed spawning parallel xapian compacts")
+ logging.debug("Completed parallel xapian compacts")
xapian_compact(combined_index, list(parallel_combine.iterdir()))
@@ -528,7 +524,10 @@ def xapian_compact(combined_index: pathlib.Path, indices: List[pathlib.Path]) ->
db.compact(str(combined_index), xapian.DBCOMPACT_MULTIPASS | xapian.Compactor.FULLER)
finally:
db.close()
- logging.debug("Completed xapian-compact for %s files in %s minutes", len(indices), (time.monotonic() - start) / 60)
+ logging.debug("Removing databases that were compacted into %s", combined_index.name)
+ for folder in indices:
+ shutil.rmtree(folder)
+ logging.debug("Completed xapian-compact %s; handled %s files in %s minutes", combined_index.name, len(indices), (time.monotonic() - start) / 60)
@click.command(help="Verify checksums and return True when the data has been changed.")