about summary refs log tree commit diff
diff options
context:
space:
mode:
-rwxr-xr-xscripts/index-genenetwork43
1 files changed, 23 insertions, 20 deletions
diff --git a/scripts/index-genenetwork b/scripts/index-genenetwork
index 61780e5..1f9c13b 100755
--- a/scripts/index-genenetwork
+++ b/scripts/index-genenetwork
@@ -8,6 +8,7 @@ xapian index. This xapian index is later used in providing search
 through the web interface.
 
 """
+from dataclasses import dataclass
 from collections import deque, namedtuple
 import contextlib
 import time
@@ -16,7 +17,7 @@ from functools import partial
 import itertools
 import json
 import logging
-from multiprocessing import Lock, Process
+from multiprocessing import Lock, Manager, Process, managers
 import os
 import pathlib
 import resource
@@ -251,8 +252,8 @@ def index_text(text: str) -> None:
     termgenerator.increase_termpos()
 
 
-@curry(2)
-def index_rif_comments(species, symbol):
+@curry(3)
+def index_rif_comments(species: str, symbol: str, rdfcache: dict):
     key = (species, symbol,)
     entry = rdfcache.get(key)
     if entry:
@@ -276,19 +277,21 @@ add_peakmb = lambda doc, peakmb: doc.add_value(3, xapian.sortable_serialise(peak
 add_additive = lambda doc, additive: doc.add_value(4, xapian.sortable_serialise(additive))
 add_year = lambda doc, year: doc.add_value(5, xapian.sortable_serialise(float(year)))
 
-# When a child process is forked, it inherits a copy of the memory of
-# its parent. We use this to pass data retrieved from SQL from parent
-# to child. Specifically, we use this global variable.
-data: Iterable
-rdfcache: Iterable
+# class that contains data that will be shared across multiple processes
+@dataclass
+class ProcessSharedData:
+    mysql_data: Iterable
+    rif_cache: Iterable
+
 # We use this lock to ensure that only one process writes its Xapian
 # index to disk at a time.
 xapian_lock = Lock()
 
-def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int) -> None:
+def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int, namespace: managers.Namespace) -> None:
     """Index genes data into a Xapian index."""
     with locked_xapian_writable_database(xapian_build_directory / f"genes-{chunk_index:04d}") as db:
-        for trait in data:
+        share: ProcessSharedData = namespace.shared
+        for trait in share.mysql_data:
             # pylint: disable=cell-var-from-loop
             doc = xapian.Document()
             termgenerator.set_document(doc)
@@ -321,7 +324,7 @@ def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int) -> None:
             Maybe.apply(
                 index_rif_comments
             ).to_arguments(
-                trait["species"], trait["symbol"]
+                trait["species"], trait["symbol"], Just(share.rif_cache)
             )
 
             doc.set_data(json.dumps(trait.data))
@@ -330,11 +333,13 @@ def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int) -> None:
              .bind(lambda idterm: write_document(db, idterm, "gene", doc)))
 
 
-def index_phenotypes(xapian_build_directory: pathlib.Path, chunk_index: int) -> None:
+def index_phenotypes(xapian_build_directory: pathlib.Path, chunk_index: int, namespace: managers.Namespace ) -> None:
     """Index phenotypes data into a Xapian index."""
     with locked_xapian_writable_database(
             xapian_build_directory / f"phenotypes-{chunk_index:04d}") as db:
-        for trait in data:
+
+        share: ProcessSharedData = namespace.shared
+        for trait in share.mysql_data:
             # pylint: disable=cell-var-from-loop
             doc = xapian.Document()
             termgenerator.set_document(doc)
@@ -397,14 +402,13 @@ def worker_queue(number_of_workers: int = os.cpu_count() or 1) -> Generator:
         process.join()
 
 
-def index_query(index_function: Callable, query: SQLQuery,
+def index_query(index_function: Callable[[pathlib.Path, int, managers.Namespace], None], query: SQLQuery,
                 xapian_build_directory: pathlib.Path, sql_uri: str,
                 sparql_uri: str, start: int = 0) -> None:
     """Run SQL query, and index its results for Xapian."""
     i = start
     try:
-        with worker_queue() as spawn_worker:
-            global rdfcache
+        with Manager() as manager, worker_queue() as spawn_worker:
             rdfcache = build_rif_cache(sparql_uri)
             with database_connection(sql_uri) as conn:
                 for chunk in group(query_sql(conn, serialize_sql(
@@ -415,10 +419,9 @@ def index_query(index_function: Callable, query: SQLQuery,
                                        offset=start*DOCUMENTS_PER_CHUNK)),
                                                    server_side=True),
                                    DOCUMENTS_PER_CHUNK):
-                    # pylint: disable=global-statement
-                    global data
-                    data = chunk
-                    spawn_worker(index_function, (xapian_build_directory, i))
+                    namespace = manager.Namespace()
+                    namespace.shared = ProcessSharedData(mysql_data=chunk, rif_cache=rdfcache)
+                    spawn_worker(index_function, (xapian_build_directory, i, namespace))
                     logging.debug("Spawned worker process on chunk %s", i)
                     i += 1
     # In the event of an operational error, open a new connection and