aboutsummaryrefslogtreecommitdiff
path: root/scripts
diff options
context:
space:
mode:
Diffstat (limited to 'scripts')
-rwxr-xr-xscripts/index-genenetwork45
1 files changed, 25 insertions, 20 deletions
diff --git a/scripts/index-genenetwork b/scripts/index-genenetwork
index 5c22b3b..5bdf44f 100755
--- a/scripts/index-genenetwork
+++ b/scripts/index-genenetwork
@@ -293,22 +293,25 @@ add_peakmb = lambda doc, peakmb: doc.add_value(3, xapian.sortable_serialise(peak
add_additive = lambda doc, additive: doc.add_value(4, xapian.sortable_serialise(additive))
add_year = lambda doc, year: doc.add_value(5, xapian.sortable_serialise(float(year)))
-# class that contains data that will be shared across multiple processes
-@dataclass
-class ProcessSharedData:
- mysql_data: Iterable
- rif_cache: Iterable
- wiki_cache: Iterable
+
+
+
+# When a child process is forked, it inherits a copy of the memory of
+# its parent. We use this to pass data retrieved from SQL from parent
+# to child. Specifically, we use this global variable.
+# This is copy-on-write so make sure child processes don't modify this data
+mysql_data: Iterable
+rif_cache: Iterable
+wiki_cache: Iterable
# We use this lock to ensure that only one process writes its Xapian
# index to disk at a time.
xapian_lock = Lock()
-def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int, namespace: managers.Namespace) -> None:
+def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int) -> None:
"""Index genes data into a Xapian index."""
with locked_xapian_writable_database(xapian_build_directory / f"genes-{chunk_index:04d}") as db:
- share: ProcessSharedData = namespace.shared
- for trait in share.mysql_data:
+ for trait in mysql_data:
# pylint: disable=cell-var-from-loop
doc = xapian.Document()
termgenerator.set_document(doc)
@@ -341,13 +344,13 @@ def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int, namespac
Maybe.apply(index_from_dictionary).to_arguments(
Just((trait["species"].value, trait["symbol"].value)),
Just("XRF"),
- Just(share.rif_cache)
+ Just(rif_cache)
)
Maybe.apply(index_from_dictionary).to_arguments(
Just((trait["species"].value, trait["symbol"].value)),
Just("XWK"),
- Just(share.wiki_cache)
+ Just(wiki_cache)
)
doc.set_data(json.dumps(trait.data))
@@ -356,13 +359,12 @@ def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int, namespac
.bind(lambda idterm: write_document(db, idterm, "gene", doc)))
-def index_phenotypes(xapian_build_directory: pathlib.Path, chunk_index: int, namespace: managers.Namespace ) -> None:
+def index_phenotypes(xapian_build_directory: pathlib.Path, chunk_index: int) -> None:
"""Index phenotypes data into a Xapian index."""
with locked_xapian_writable_database(
xapian_build_directory / f"phenotypes-{chunk_index:04d}") as db:
- share: ProcessSharedData = namespace.shared
- for trait in share.mysql_data:
+ for trait in mysql_data:
# pylint: disable=cell-var-from-loop
doc = xapian.Document()
termgenerator.set_document(doc)
@@ -430,10 +432,9 @@ def index_query(index_function: Callable[[pathlib.Path, int, managers.Namespace]
sparql_uri: str, start: int = 0) -> None:
"""Run SQL query, and index its results for Xapian."""
i = start
+
try:
- with Manager() as manager, worker_queue() as spawn_worker:
- rdfcache = build_rdf_cache(sparql_uri, RIF_CACHE_QUERY)
- wikicache = build_rdf_cache(sparql_uri, WIKI_CACHE_QUERY)
+ with worker_queue() as spawn_worker:
with database_connection(sql_uri) as conn:
for chunk in group(query_sql(conn, serialize_sql(
# KLUDGE: MariaDB does not allow an offset
@@ -443,9 +444,9 @@ def index_query(index_function: Callable[[pathlib.Path, int, managers.Namespace]
offset=start*DOCUMENTS_PER_CHUNK)),
server_side=True),
DOCUMENTS_PER_CHUNK):
- namespace = manager.Namespace()
- namespace.shared = ProcessSharedData(mysql_data=chunk, rif_cache=rdfcache, wiki_cache=wikicache)
- spawn_worker(index_function, (xapian_build_directory, i, namespace))
+ global mysql_data
+ mysql_data = chunk
+ spawn_worker(index_function, (xapian_build_directory, i))
logging.debug("Spawned worker process on chunk %s", i)
i += 1
# In the event of an operational error, open a new connection and
@@ -530,6 +531,10 @@ def create_xapian_index(xapian_directory: str, sql_uri: str,
start_time = time.perf_counter()
with temporary_directory("combined", xapian_directory) as combined_index:
with temporary_directory("build", xapian_directory) as xapian_build_directory:
+ global rif_cache
+ global wiki_cache
+ rif_cache = build_rdf_cache(sparql_uri, RIF_CACHE_QUERY)
+ wiki_cache = build_rdf_cache(sparql_uri, WIKI_CACHE_QUERY)
logging.info("Indexing genes")
index_query(index_genes, genes_query, xapian_build_directory, sql_uri, sparql_uri)
logging.info("Indexing phenotypes")