path: root/scripts
author     John Nduli 2024-06-18 17:10:22 +0300
committer  BonfaceKilz 2024-07-03 14:24:01 +0300
commit     bf580fae39fd6742f4874d7eea68feaa939146e7 (patch)
tree       e328d79e9e5d1d27b997dd0808f5d17b9ac13776 /scripts
parent     6cbe69014beeb5f7ce38e63032c61225e269bfd4 (diff)
download   genenetwork3-bf580fae39fd6742f4874d7eea68feaa939146e7.tar.gz
feat: add wikidata indexing
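
Combine the duplicated build_rif_cache and build_wiki_cache helpers
into a single build_rdf_cache(sparql_uri, query) that takes its SPARQL
query as an argument, hoisting the two queries into the module-level
constants RIF_CACHE_QUERY and WIKI_CACHE_QUERY. Replace the
special-cased index_rif_comments and index_wiki_comments with a
generic index_from_dictionary, and thread the wiki cache through
ProcessSharedData so NCBI wiki entries are indexed under the XRWD
prefix alongside RIF comments under XRF.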
Diffstat (limited to 'scripts')
-rwxr-xr-x    scripts/index-genenetwork    128
1 file changed, 54 insertions(+), 74 deletions(-)
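
The consolidation is easiest to see outside the diff. Below is a minimal
sketch of the merged cache builder as it reads after this patch; the
function body follows the diff, while the import, the return annotation,
and the two usage lines at the end are illustrative assumptions:

# Sketch of the consolidated helper (body as in the patch; the import
# and the trailing usage lines are assumptions for illustration).
from SPARQLWrapper import SPARQLWrapper, JSON

def build_rdf_cache(sparql_uri: str, query: str) -> dict:
    """Run QUERY against SPARQL_URI and cache comments keyed by
    (species, symbol)."""
    cache = {}
    sparql = SPARQLWrapper(sparql_uri)
    sparql.setReturnFormat(JSON)
    sparql.setQuery(query)
    results = sparql.queryAndConvert()
    if not isinstance(results, dict):
        raise TypeError(f"Expected results to be a dict but found {type(results)}")
    for entry in results["results"]["bindings"]:
        # Each binding carries a species/symbol pair and the
        # GROUP_CONCAT-ed (or single) comment text.
        key = (entry["speciesName"]["value"], entry["symbolName"]["value"])
        cache[key] = entry["comment"]["value"]
    return cache

# Both caches now come from the same helper, differing only in query:
# rif_cache = build_rdf_cache(sparql_uri, RIF_CACHE_QUERY)
# wiki_cache = build_rdf_cache(sparql_uri, WIKI_CACHE_QUERY)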
diff --git a/scripts/index-genenetwork b/scripts/index-genenetwork
index a554b47..8efe955 100755
--- a/scripts/index-genenetwork
+++ b/scripts/index-genenetwork
@@ -24,7 +24,7 @@ import resource
import shutil
import sys
import tempfile
-from typing import Callable, Generator, Iterable, List
+from typing import Callable, Generator, Hashable, Iterable, List
from SPARQLWrapper import SPARQLWrapper, JSON
import MySQLdb
@@ -125,6 +125,38 @@ phenotypes_query = SQLQuery(
      SQLTableClause("LEFT JOIN", "Geno",
                     "PublishXRef.Locus = Geno.Name AND Geno.SpeciesId = Species.Id")])
+RIF_CACHE_QUERY = """
+PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
+PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
+PREFIX gnt: <http://genenetwork.org/term/>
+PREFIX gnc: <http://genenetwork.org/category/>
+
+SELECT ?symbolName ?speciesName GROUP_CONCAT(DISTINCT ?comment ; separator=\"\\n\") AS ?comment WHERE {
+ ?symbol rdfs:comment _:node ;
+ rdfs:label ?symbolName .
+_:node rdf:type gnc:GNWikiEntry ;
+ gnt:belongsToSpecies ?species ;
+ rdfs:comment ?comment .
+?species gnt:shortName ?speciesName .
+} GROUP BY ?speciesName ?symbolName
+"""
+
+WIKI_CACHE_QUERY = """
+PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
+PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
+PREFIX gnt: <http://genenetwork.org/term/>
+PREFIX gnc: <http://genenetwork.org/category/>
+
+SELECT * WHERE {
+ ?symbol rdfs:comment _:node ;
+ rdfs:label ?symbolName .
+_:node rdf:type gnc:NCBIWikiEntry ;
+ gnt:belongsToSpecies ?species ;
+ rdfs:comment ?comment .
+?species gnt:shortName ?speciesName .
+}
+"""
+
 def serialize_sql(query: SQLQuery) -> str:
     """Serialize SQLQuery object to a string."""
@@ -171,55 +203,10 @@ def locked_xapian_writable_database(path: pathlib.Path) -> xapian.WritableDataba
         db.close()
-def build_rif_cache(sparql_uri: str):
-    cache = {}
-    sparql = SPARQLWrapper(sparql_uri)
-    sparql.setReturnFormat(JSON)
-    query = """
-PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
-PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
-PREFIX gnt: <http://genenetwork.org/term/>
-PREFIX gnc: <http://genenetwork.org/category/>
-
-SELECT ?symbolName ?speciesName GROUP_CONCAT(DISTINCT ?comment ; separator=\"\\n\") AS ?comment WHERE {
- ?symbol rdfs:comment _:node ;
- rdfs:label ?symbolName .
-_:node rdf:type gnc:GNWikiEntry ;
- gnt:belongsToSpecies ?species ;
- rdfs:comment ?comment .
-?species gnt:shortName ?speciesName .
-} GROUP BY ?speciesName ?symbolName
-"""
-    sparql.setQuery(query)
-    results = sparql.queryAndConvert()
-    if not isinstance(results, dict):
-        raise TypeError(f"Expected results to be a dict but found {type(results)}")
-    bindings = results["results"]["bindings"]
-    for entry in bindings :
-        x = (entry["speciesName"]["value"], entry["symbolName"]["value"],)
-        cache[x] = entry["comment"]["value"]
-    return cache
-
-
-def build_wiki_cache(sparql_uri: str):
+def build_rdf_cache(sparql_uri: str, query: str):
     cache = {}
     sparql = SPARQLWrapper(sparql_uri)
     sparql.setReturnFormat(JSON)
-    query = """
-PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
-PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
-PREFIX gnt: <http://genenetwork.org/term/>
-PREFIX gnc: <http://genenetwork.org/category/>
-
-SELECT * WHERE {
- ?symbol rdfs:comment _:node ;
- rdfs:label ?symbolName .
-_:node rdf:type gnc:NCBIWikiEntry ;
- gnt:belongsToSpecies ?species ;
- rdfs:comment ?comment .
-?species gnt:shortName ?speciesName .
-}
-"""
     sparql.setQuery(query)
     results = sparql.queryAndConvert()
     if not isinstance(results, dict):
@@ -281,21 +268,13 @@ def index_text(text: str) -> None:
     termgenerator.index_text(text)
     termgenerator.increase_termpos()
-
@curry(3)
-def index_rif_comments(species: str, symbol: str, rdfcache: dict):
-    key = (species, symbol,)
-    entry = rdfcache.get(key)
-    if entry:
-        termgenerator.index_text(entry, 0, "XRF")
-
+def index_from_dictionary(keys: Hashable, prefix: str, dictionary: dict):
+    entry = dictionary.get(keys)
+    if not entry:
+        return
+    termgenerator.index_text(entry, 0, prefix)
-@curry(2)
-def index_wiki_comments(species, symbol):
-    key = (species, symbol,)
-    entry = wikicache.get(key)
-    if entry:
-        termgenerator.index_text(entry, 0, "XRF")
index_text_without_positions = lambda text: termgenerator.index_text_without_positions(text)
index_authors = lambda authors: termgenerator.index_text(authors, 0, "A")
@@ -319,6 +298,7 @@ add_year = lambda doc, year: doc.add_value(5, xapian.sortable_serialise(float(ye
class ProcessSharedData:
     mysql_data: Iterable
     rif_cache: Iterable
+    wiki_cache: Iterable
# We use this lock to ensure that only one process writes its Xapian
# index to disk at a time.
@@ -358,17 +338,17 @@ def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int, namespac
trait["chr"].bind(index_chr)
trait["geno_chr"].bind(index_peakchr)
- Maybe.apply(
- index_rif_comments
- ).to_arguments(
- trait["species"], trait["symbol"], Just(share.rif_cache)
- )
+ Maybe.apply(index_from_dictionary).to_arguments(
+ Just((trait["species"].value, trait["symbol"].value)),
+ Just("XRF"),
+ Just(share.rif_cache)
+ )
- Maybe.apply(
- index_wiki_comments
- ).to_arguments(
- trait["species"], trait["symbol"]
- )
+ Maybe.apply(index_from_dictionary).to_arguments(
+ Just((trait["species"].value, trait["symbol"].value)),
+ Just("XRWD"),
+ Just(share.wiki_cache)
+ )
doc.set_data(json.dumps(trait.data))
(Maybe.apply(curry(2, lambda name, dataset: f"{name}:{dataset}"))
@@ -452,8 +432,8 @@ def index_query(index_function: Callable[[pathlib.Path, int, managers.Namespace]
     i = start
     try:
         with Manager() as manager, worker_queue() as spawn_worker:
-            rdfcache = build_rif_cache(sparql_uri)
-            wikicache = build_wiki_cache(sparql_uri)
+            rdfcache = build_rdf_cache(sparql_uri, RIF_CACHE_QUERY)
+            wikicache = build_rdf_cache(sparql_uri, WIKI_CACHE_QUERY)
             with database_connection(sql_uri) as conn:
                 for chunk in group(query_sql(conn, serialize_sql(
                     # KLUDGE: MariaDB does not allow an offset
@@ -464,7 +444,7 @@ def index_query(index_function: Callable[[pathlib.Path, int, managers.Namespace]
                                    server_side=True),
                        DOCUMENTS_PER_CHUNK):
                     namespace = manager.Namespace()
-                    namespace.shared = ProcessSharedData(mysql_data=chunk, rif_cache=rdfcache)
+                    namespace.shared = ProcessSharedData(mysql_data=chunk, rif_cache=rdfcache, wiki_cache=wikicache)
                     spawn_worker(index_function, (xapian_build_directory, i, namespace))
                     logging.debug("Spawned worker process on chunk %s", i)
                     i += 1
@@ -508,7 +488,7 @@ def is_data_modified(xapian_directory: str,
                      sparql_uri: str) -> None:
     dir_ = pathlib.Path(xapian_directory)
     with locked_xapian_writable_database(dir_) as db, database_connection(sql_uri) as conn:
-        checksums = -1
+        checksums = ""
         if db.get_metadata('tables'):
             checksums = " ".join([
                 str(result["Checksum"].value)