diff options
| author | John Nduli | 2024-06-25 14:16:11 +0300 | 
|---|---|---|
| committer | BonfaceKilz | 2024-07-03 14:24:01 +0300 | 
| commit | 621be7a4162b186687a10b8227dcc50038703f0a (patch) | |
| tree | 427bbdc3b107732723a8f8cd3857ddd25bea8b48 | |
| parent | dee508d46c916caa5080da87a264347479576dd2 (diff) | |
| download | genenetwork3-621be7a4162b186687a10b8227dcc50038703f0a.tar.gz | |
fix: remove namespaces since child processes copy the rdf caches
| -rwxr-xr-x | scripts/index-genenetwork | 45 | 
1 files changed, 25 insertions, 20 deletions
| diff --git a/scripts/index-genenetwork b/scripts/index-genenetwork index 5c22b3b..5bdf44f 100755 --- a/scripts/index-genenetwork +++ b/scripts/index-genenetwork @@ -293,22 +293,25 @@ add_peakmb = lambda doc, peakmb: doc.add_value(3, xapian.sortable_serialise(peak add_additive = lambda doc, additive: doc.add_value(4, xapian.sortable_serialise(additive)) add_year = lambda doc, year: doc.add_value(5, xapian.sortable_serialise(float(year))) -# class that contains data that will be shared across multiple processes -@dataclass -class ProcessSharedData: - mysql_data: Iterable - rif_cache: Iterable - wiki_cache: Iterable + + + +# When a child process is forked, it inherits a copy of the memory of +# its parent. We use this to pass data retrieved from SQL from parent +# to child. Specifically, we use this global variable. +# This is copy-on-write so make sure child processes don't modify this data +mysql_data: Iterable +rif_cache: Iterable +wiki_cache: Iterable # We use this lock to ensure that only one process writes its Xapian # index to disk at a time. xapian_lock = Lock() -def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int, namespace: managers.Namespace) -> None: +def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int) -> None: """Index genes data into a Xapian index.""" with locked_xapian_writable_database(xapian_build_directory / f"genes-{chunk_index:04d}") as db: - share: ProcessSharedData = namespace.shared - for trait in share.mysql_data: + for trait in mysql_data: # pylint: disable=cell-var-from-loop doc = xapian.Document() termgenerator.set_document(doc) @@ -341,13 +344,13 @@ def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int, namespac Maybe.apply(index_from_dictionary).to_arguments( Just((trait["species"].value, trait["symbol"].value)), Just("XRF"), - Just(share.rif_cache) + Just(rif_cache) ) Maybe.apply(index_from_dictionary).to_arguments( Just((trait["species"].value, trait["symbol"].value)), Just("XWK"), - Just(share.wiki_cache) + Just(wiki_cache) ) doc.set_data(json.dumps(trait.data)) @@ -356,13 +359,12 @@ def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int, namespac .bind(lambda idterm: write_document(db, idterm, "gene", doc))) -def index_phenotypes(xapian_build_directory: pathlib.Path, chunk_index: int, namespace: managers.Namespace ) -> None: +def index_phenotypes(xapian_build_directory: pathlib.Path, chunk_index: int) -> None: """Index phenotypes data into a Xapian index.""" with locked_xapian_writable_database( xapian_build_directory / f"phenotypes-{chunk_index:04d}") as db: - share: ProcessSharedData = namespace.shared - for trait in share.mysql_data: + for trait in mysql_data: # pylint: disable=cell-var-from-loop doc = xapian.Document() termgenerator.set_document(doc) @@ -430,10 +432,9 @@ def index_query(index_function: Callable[[pathlib.Path, int, managers.Namespace] sparql_uri: str, start: int = 0) -> None: """Run SQL query, and index its results for Xapian.""" i = start + try: - with Manager() as manager, worker_queue() as spawn_worker: - rdfcache = build_rdf_cache(sparql_uri, RIF_CACHE_QUERY) - wikicache = build_rdf_cache(sparql_uri, WIKI_CACHE_QUERY) + with worker_queue() as spawn_worker: with database_connection(sql_uri) as conn: for chunk in group(query_sql(conn, serialize_sql( # KLUDGE: MariaDB does not allow an offset @@ -443,9 +444,9 @@ def index_query(index_function: Callable[[pathlib.Path, int, managers.Namespace] offset=start*DOCUMENTS_PER_CHUNK)), server_side=True), DOCUMENTS_PER_CHUNK): - namespace = manager.Namespace() - namespace.shared = ProcessSharedData(mysql_data=chunk, rif_cache=rdfcache, wiki_cache=wikicache) - spawn_worker(index_function, (xapian_build_directory, i, namespace)) + global mysql_data + mysql_data = chunk + spawn_worker(index_function, (xapian_build_directory, i)) logging.debug("Spawned worker process on chunk %s", i) i += 1 # In the event of an operational error, open a new connection and @@ -530,6 +531,10 @@ def create_xapian_index(xapian_directory: str, sql_uri: str, start_time = time.perf_counter() with temporary_directory("combined", xapian_directory) as combined_index: with temporary_directory("build", xapian_directory) as xapian_build_directory: + global rif_cache + global wiki_cache + rif_cache = build_rdf_cache(sparql_uri, RIF_CACHE_QUERY) + wiki_cache = build_rdf_cache(sparql_uri, WIKI_CACHE_QUERY) logging.info("Indexing genes") index_query(index_genes, genes_query, xapian_build_directory, sql_uri, sparql_uri) logging.info("Indexing phenotypes") | 
