aboutsummaryrefslogtreecommitdiff
path: root/scripts/index-genenetwork
diff options
context:
space:
mode:
authorJohn Nduli2024-06-15 14:46:14 +0300
committerBonfaceKilz2024-06-18 08:58:16 +0300
commit6237006aee0adf256c293266167745634ce2f2c0 (patch)
treea5659d68a05b2aff3517b0b5bda03bbc324de57f /scripts/index-genenetwork
parent5bcc289583629634b14a4a64ff80ac2ff415589b (diff)
downloadgenenetwork3-6237006aee0adf256c293266167745634ce2f2c0.tar.gz
refactor: drop global variables
Diffstat (limited to 'scripts/index-genenetwork')
-rwxr-xr-xscripts/index-genenetwork43
1 files changed, 23 insertions, 20 deletions
diff --git a/scripts/index-genenetwork b/scripts/index-genenetwork
index 61780e5..1f9c13b 100755
--- a/scripts/index-genenetwork
+++ b/scripts/index-genenetwork
@@ -8,6 +8,7 @@ xapian index. This xapian index is later used in providing search
through the web interface.
"""
+from dataclasses import dataclass
from collections import deque, namedtuple
import contextlib
import time
@@ -16,7 +17,7 @@ from functools import partial
import itertools
import json
import logging
-from multiprocessing import Lock, Process
+from multiprocessing import Lock, Manager, Process, managers
import os
import pathlib
import resource
@@ -251,8 +252,8 @@ def index_text(text: str) -> None:
termgenerator.increase_termpos()
-@curry(2)
-def index_rif_comments(species, symbol):
+@curry(3)
+def index_rif_comments(species: str, symbol: str, rdfcache: dict):
key = (species, symbol,)
entry = rdfcache.get(key)
if entry:
@@ -276,19 +277,21 @@ add_peakmb = lambda doc, peakmb: doc.add_value(3, xapian.sortable_serialise(peak
add_additive = lambda doc, additive: doc.add_value(4, xapian.sortable_serialise(additive))
add_year = lambda doc, year: doc.add_value(5, xapian.sortable_serialise(float(year)))
-# When a child process is forked, it inherits a copy of the memory of
-# its parent. We use this to pass data retrieved from SQL from parent
-# to child. Specifically, we use this global variable.
-data: Iterable
-rdfcache: Iterable
+# class that contains data that will be shared across multiple processes
+@dataclass
+class ProcessSharedData:
+ mysql_data: Iterable
+ rif_cache: Iterable
+
# We use this lock to ensure that only one process writes its Xapian
# index to disk at a time.
xapian_lock = Lock()
-def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int) -> None:
+def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int, namespace: managers.Namespace) -> None:
"""Index genes data into a Xapian index."""
with locked_xapian_writable_database(xapian_build_directory / f"genes-{chunk_index:04d}") as db:
- for trait in data:
+ share: ProcessSharedData = namespace.shared
+ for trait in share.mysql_data:
# pylint: disable=cell-var-from-loop
doc = xapian.Document()
termgenerator.set_document(doc)
@@ -321,7 +324,7 @@ def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int) -> None:
Maybe.apply(
index_rif_comments
).to_arguments(
- trait["species"], trait["symbol"]
+ trait["species"], trait["symbol"], Just(share.rif_cache)
)
doc.set_data(json.dumps(trait.data))
@@ -330,11 +333,13 @@ def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int) -> None:
.bind(lambda idterm: write_document(db, idterm, "gene", doc)))
-def index_phenotypes(xapian_build_directory: pathlib.Path, chunk_index: int) -> None:
+def index_phenotypes(xapian_build_directory: pathlib.Path, chunk_index: int, namespace: managers.Namespace ) -> None:
"""Index phenotypes data into a Xapian index."""
with locked_xapian_writable_database(
xapian_build_directory / f"phenotypes-{chunk_index:04d}") as db:
- for trait in data:
+
+ share: ProcessSharedData = namespace.shared
+ for trait in share.mysql_data:
# pylint: disable=cell-var-from-loop
doc = xapian.Document()
termgenerator.set_document(doc)
@@ -397,14 +402,13 @@ def worker_queue(number_of_workers: int = os.cpu_count() or 1) -> Generator:
process.join()
-def index_query(index_function: Callable, query: SQLQuery,
+def index_query(index_function: Callable[[pathlib.Path, int, managers.Namespace], None], query: SQLQuery,
xapian_build_directory: pathlib.Path, sql_uri: str,
sparql_uri: str, start: int = 0) -> None:
"""Run SQL query, and index its results for Xapian."""
i = start
try:
- with worker_queue() as spawn_worker:
- global rdfcache
+ with Manager() as manager, worker_queue() as spawn_worker:
rdfcache = build_rif_cache(sparql_uri)
with database_connection(sql_uri) as conn:
for chunk in group(query_sql(conn, serialize_sql(
@@ -415,10 +419,9 @@ def index_query(index_function: Callable, query: SQLQuery,
offset=start*DOCUMENTS_PER_CHUNK)),
server_side=True),
DOCUMENTS_PER_CHUNK):
- # pylint: disable=global-statement
- global data
- data = chunk
- spawn_worker(index_function, (xapian_build_directory, i))
+ namespace = manager.Namespace()
+ namespace.shared = ProcessSharedData(mysql_data=chunk, rif_cache=rdfcache)
+ spawn_worker(index_function, (xapian_build_directory, i, namespace))
logging.debug("Spawned worker process on chunk %s", i)
i += 1
# In the event of an operational error, open a new connection and