about summary refs log tree commit diff
diff options
context:
space:
mode:
author    John Nduli  2024-06-25 14:16:11 +0300
committer BonfaceKilz  2024-07-03 14:24:01 +0300
commit    621be7a4162b186687a10b8227dcc50038703f0a (patch)
tree      427bbdc3b107732723a8f8cd3857ddd25bea8b48
parent    dee508d46c916caa5080da87a264347479576dd2 (diff)
download  genenetwork3-621be7a4162b186687a10b8227dcc50038703f0a.tar.gz
fix: remove namespaces since child processes copy the rdf caches
-rwxr-xr-x  scripts/index-genenetwork  45
1 file changed, 25 insertions, 20 deletions
diff --git a/scripts/index-genenetwork b/scripts/index-genenetwork
index 5c22b3b..5bdf44f 100755
--- a/scripts/index-genenetwork
+++ b/scripts/index-genenetwork
@@ -293,22 +293,25 @@ add_peakmb = lambda doc, peakmb: doc.add_value(3, xapian.sortable_serialise(peak
 add_additive = lambda doc, additive: doc.add_value(4, xapian.sortable_serialise(additive))
 add_year = lambda doc, year: doc.add_value(5, xapian.sortable_serialise(float(year)))
 
-# class that contains data that will be shared across multiple processes
-@dataclass
-class ProcessSharedData:
-    mysql_data: Iterable
-    rif_cache: Iterable
-    wiki_cache: Iterable
+
+
+
+# When a child process is forked, it inherits a copy of the memory of
+# its parent. We use this to pass data retrieved from SQL from parent
+# to child. Specifically, we use this global variable.
+# This is copy-on-write so make sure child processes don't modify this data
+mysql_data: Iterable
+rif_cache: Iterable
+wiki_cache: Iterable
 
 # We use this lock to ensure that only one process writes its Xapian
 # index to disk at a time.
 xapian_lock = Lock()
 
-def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int, namespace: managers.Namespace) -> None:
+def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int) -> None:
     """Index genes data into a Xapian index."""
     with locked_xapian_writable_database(xapian_build_directory / f"genes-{chunk_index:04d}") as db:
-        share: ProcessSharedData = namespace.shared
-        for trait in share.mysql_data:
+        for trait in mysql_data:
             # pylint: disable=cell-var-from-loop
             doc = xapian.Document()
             termgenerator.set_document(doc)
@@ -341,13 +344,13 @@ def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int, namespac
             Maybe.apply(index_from_dictionary).to_arguments(
                     Just((trait["species"].value, trait["symbol"].value)),
                     Just("XRF"),
-                    Just(share.rif_cache)
+                    Just(rif_cache)
                     )
 
             Maybe.apply(index_from_dictionary).to_arguments(
                     Just((trait["species"].value, trait["symbol"].value)),
                     Just("XWK"),
-                    Just(share.wiki_cache)
+                    Just(wiki_cache)
                     )
 
             doc.set_data(json.dumps(trait.data))
@@ -356,13 +359,12 @@ def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int, namespac
              .bind(lambda idterm: write_document(db, idterm, "gene", doc)))
 
 
-def index_phenotypes(xapian_build_directory: pathlib.Path, chunk_index: int, namespace: managers.Namespace ) -> None:
+def index_phenotypes(xapian_build_directory: pathlib.Path, chunk_index: int) -> None:
     """Index phenotypes data into a Xapian index."""
     with locked_xapian_writable_database(
             xapian_build_directory / f"phenotypes-{chunk_index:04d}") as db:
 
-        share: ProcessSharedData = namespace.shared
-        for trait in share.mysql_data:
+        for trait in mysql_data:
             # pylint: disable=cell-var-from-loop
             doc = xapian.Document()
             termgenerator.set_document(doc)
@@ -430,10 +432,9 @@ def index_query(index_function: Callable[[pathlib.Path, int, managers.Namespace]
                 sparql_uri: str, start: int = 0) -> None:
     """Run SQL query, and index its results for Xapian."""
     i = start
+
     try:
-        with Manager() as manager, worker_queue() as spawn_worker:
-            rdfcache = build_rdf_cache(sparql_uri, RIF_CACHE_QUERY)
-            wikicache = build_rdf_cache(sparql_uri, WIKI_CACHE_QUERY)
+        with worker_queue() as spawn_worker:
             with database_connection(sql_uri) as conn:
                 for chunk in group(query_sql(conn, serialize_sql(
                         # KLUDGE: MariaDB does not allow an offset
@@ -443,9 +444,9 @@ def index_query(index_function: Callable[[pathlib.Path, int, managers.Namespace]
                                        offset=start*DOCUMENTS_PER_CHUNK)),
                                                    server_side=True),
                                    DOCUMENTS_PER_CHUNK):
-                    namespace = manager.Namespace()
-                    namespace.shared = ProcessSharedData(mysql_data=chunk, rif_cache=rdfcache, wiki_cache=wikicache)
-                    spawn_worker(index_function, (xapian_build_directory, i, namespace))
+                    global mysql_data
+                    mysql_data = chunk
+                    spawn_worker(index_function, (xapian_build_directory, i))
                     logging.debug("Spawned worker process on chunk %s", i)
                     i += 1
     # In the event of an operational error, open a new connection and
@@ -530,6 +531,10 @@ def create_xapian_index(xapian_directory: str, sql_uri: str,
     start_time = time.perf_counter()
     with temporary_directory("combined", xapian_directory) as combined_index:
         with temporary_directory("build", xapian_directory) as xapian_build_directory:
+            global rif_cache
+            global wiki_cache
+            rif_cache = build_rdf_cache(sparql_uri, RIF_CACHE_QUERY)
+            wiki_cache = build_rdf_cache(sparql_uri, WIKI_CACHE_QUERY)
             logging.info("Indexing genes")
             index_query(index_genes, genes_query, xapian_build_directory, sql_uri, sparql_uri)
             logging.info("Indexing phenotypes")