From 6237006aee0adf256c293266167745634ce2f2c0 Mon Sep 17 00:00:00 2001
From: John Nduli
Date: Sat, 15 Jun 2024 14:46:14 +0300
Subject: refactor: drop global variables

---
 scripts/index-genenetwork | 43 +++++++++++++++++++++++--------------------
 1 file changed, 23 insertions(+), 20 deletions(-)

diff --git a/scripts/index-genenetwork b/scripts/index-genenetwork
index 61780e5..1f9c13b 100755
--- a/scripts/index-genenetwork
+++ b/scripts/index-genenetwork
@@ -8,6 +8,7 @@ xapian index. This xapian index is later used in providing search
 through the web interface.
 
 """
+from dataclasses import dataclass
 from collections import deque, namedtuple
 import contextlib
 import time
@@ -16,7 +17,7 @@ from functools import partial
 import itertools
 import json
 import logging
-from multiprocessing import Lock, Process
+from multiprocessing import Lock, Manager, Process, managers
 import os
 import pathlib
 import resource
@@ -251,8 +252,8 @@ def index_text(text: str) -> None:
         termgenerator.increase_termpos()
 
 
-@curry(2)
-def index_rif_comments(species, symbol):
+@curry(3)
+def index_rif_comments(species: str, symbol: str, rdfcache: dict):
     key = (species, symbol,)
     entry = rdfcache.get(key)
     if entry:
@@ -276,19 +277,21 @@ add_peakmb = lambda doc, peakmb: doc.add_value(3, xapian.sortable_serialise(peak
 add_additive = lambda doc, additive: doc.add_value(4, xapian.sortable_serialise(additive))
 add_year = lambda doc, year: doc.add_value(5, xapian.sortable_serialise(float(year)))
 
-# When a child process is forked, it inherits a copy of the memory of
-# its parent. We use this to pass data retrieved from SQL from parent
-# to child. Specifically, we use this global variable.
-data: Iterable
-rdfcache: Iterable
 
+# class that contains data that will be shared across multiple processes
+@dataclass
+class ProcessSharedData:
+    mysql_data: Iterable
+    rif_cache: Iterable
+
 # We use this lock to ensure that only one process writes its Xapian
 # index to disk at a time.
 xapian_lock = Lock()
-def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int) -> None:
+def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int, namespace: managers.Namespace) -> None:
     """Index genes data into a Xapian index."""
     with locked_xapian_writable_database(xapian_build_directory / f"genes-{chunk_index:04d}") as db:
-        for trait in data:
+        share: ProcessSharedData = namespace.shared
+        for trait in share.mysql_data:
             # pylint: disable=cell-var-from-loop
             doc = xapian.Document()
             termgenerator.set_document(doc)
@@ -321,7 +324,7 @@ def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int) -> None:
             Maybe.apply(
                 index_rif_comments
             ).to_arguments(
-                trait["species"], trait["symbol"]
+                trait["species"], trait["symbol"], Just(share.rif_cache)
             )
 
             doc.set_data(json.dumps(trait.data))
@@ -330,11 +333,13 @@ def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int) -> None:
              .bind(lambda idterm: write_document(db, idterm, "gene", doc)))
 
 
-def index_phenotypes(xapian_build_directory: pathlib.Path, chunk_index: int) -> None:
+def index_phenotypes(xapian_build_directory: pathlib.Path, chunk_index: int, namespace: managers.Namespace ) -> None:
     """Index phenotypes data into a Xapian index."""
     with locked_xapian_writable_database(
             xapian_build_directory / f"phenotypes-{chunk_index:04d}") as db:
-        for trait in data:
+
+        share: ProcessSharedData = namespace.shared
+        for trait in share.mysql_data:
             # pylint: disable=cell-var-from-loop
             doc = xapian.Document()
             termgenerator.set_document(doc)
@@ -397,14 +402,13 @@ def worker_queue(number_of_workers: int = os.cpu_count() or 1) -> Generator:
             process.join()
 
 
-def index_query(index_function: Callable, query: SQLQuery,
+def index_query(index_function: Callable[[pathlib.Path, int, managers.Namespace], None], query: SQLQuery,
                 xapian_build_directory: pathlib.Path, sql_uri: str,
                 sparql_uri: str, start: int = 0) -> None:
     """Run SQL query, and index its results for Xapian."""
     i = start
     try:
-        with worker_queue() as spawn_worker:
-            global rdfcache
+        with Manager() as manager, worker_queue() as spawn_worker:
             rdfcache = build_rif_cache(sparql_uri)
             with database_connection(sql_uri) as conn:
                 for chunk in group(query_sql(conn, serialize_sql(
@@ -415,10 +419,9 @@ def index_query(index_function: Callable, query: SQLQuery,
                                 offset=start*DOCUMENTS_PER_CHUNK)),
                                            server_side=True),
                                  DOCUMENTS_PER_CHUNK):
-                    # pylint: disable=global-statement
-                    global data
-                    data = chunk
-                    spawn_worker(index_function, (xapian_build_directory, i))
+                    namespace = manager.Namespace()
+                    namespace.shared = ProcessSharedData(mysql_data=chunk, rif_cache=rdfcache)
+                    spawn_worker(index_function, (xapian_build_directory, i, namespace))
                     logging.debug("Spawned worker process on chunk %s", i)
                     i += 1
     # In the event of an operational error, open a new connection and
--
cgit v1.2.3
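
The pattern this commit adopts can be seen in isolation in the sketch below: per-chunk data is handed to worker processes through a multiprocessing Manager namespace instead of module-level globals inherited across fork(). This is a minimal, hypothetical example, not code from the repository; the worker, chunk contents, and cache contents are illustrative stand-ins.

# Minimal sketch of the Manager/Namespace sharing pattern used by the patch.
# All names (worker, chunks, rif_cache) are illustrative assumptions.
from dataclasses import dataclass
from multiprocessing import Manager, Process, managers
from typing import Iterable


@dataclass
class ProcessSharedData:
    """Container for everything a worker needs, mirroring the patch."""
    mysql_data: Iterable
    rif_cache: dict


def worker(chunk_index: int, namespace: managers.Namespace) -> None:
    # Read the shared bundle once from the proxied namespace rather than
    # relying on a global set by the parent before fork().
    share: ProcessSharedData = namespace.shared
    for row in share.mysql_data:
        print(chunk_index, row, share.rif_cache.get(row))


if __name__ == "__main__":
    rif_cache = {"shh": "sonic hedgehog"}   # stand-in for build_rif_cache()
    chunks = [["shh", "brca2"], ["trp53"]]  # stand-in for SQL result chunks
    with Manager() as manager:
        processes = []
        for i, chunk in enumerate(chunks):
            ns = manager.Namespace()
            ns.shared = ProcessSharedData(mysql_data=chunk, rif_cache=rif_cache)
            p = Process(target=worker, args=(i, ns))
            p.start()
            processes.append(p)
        for p in processes:
            p.join()

Unlike the fork-inherited globals the commit removes, a Namespace proxy also works under the "spawn" and "forkserver" start methods, where children do not inherit the parent's memory. Each attribute access on the proxy transfers a fresh pickled copy from the manager process, so workers should read namespace.shared once into a local variable, as index_genes and index_phenotypes now do.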
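
The curry(2) to curry(3) change interacts with the Maybe machinery around index_rif_comments: species and symbol arrive as Maybe values from the trait record, while the RIF cache is a plain dict and must be lifted with Just() before .to_arguments() can apply the curried function. A rough sketch follows, assuming the pymonad-style API (curry, Maybe.apply, to_arguments) that the script appears to use; the function body and data are placeholders.

# Hypothetical illustration of the applicative call, assuming pymonad.
from pymonad.maybe import Just, Maybe
from pymonad.tools import curry


@curry(3)
def index_rif_comments(species, symbol, rdfcache):
    # Placeholder body; the real function indexes the comment text.
    return rdfcache.get((species, symbol))


# All three arguments must be Maybe values; the plain dict cache is lifted
# with Just(), mirroring Just(share.rif_cache) in the patch.
comment = Maybe.apply(index_rif_comments).to_arguments(
    Just("mouse"), Just("Shh"), Just({("mouse", "Shh"): "hedgehog signalling"}))

Had species or symbol been Nothing, the whole application would evaluate to Nothing and index_rif_comments would never be called, which is why the cache argument has to join the applicative pipeline rather than being read from a global.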