aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xscripts/index-genenetwork29
1 files changed, 15 insertions, 14 deletions
diff --git a/scripts/index-genenetwork b/scripts/index-genenetwork
index 5d231ad..7600f90 100755
--- a/scripts/index-genenetwork
+++ b/scripts/index-genenetwork
@@ -21,6 +21,7 @@ import pathlib
import resource
import shutil
import tempfile
+from typing import Callable, Generator, Iterable, List
import MySQLdb
import click
@@ -107,7 +108,7 @@ phenotypes_query = SQLQuery(
"LEFT JOIN Geno ON PublishXRef.Locus = Geno.Name AND Geno.SpeciesId = Species.Id"])
-def serialize_sql(query):
+def serialize_sql(query: SQLQuery) -> str:
"""Serialize SQLQuery object to a string."""
sql = f"SELECT {', '.join(query.fields)} FROM {' '.join(query.tables)}"
def append_to_sql(appendee):
@@ -122,7 +123,7 @@ def serialize_sql(query):
@contextlib.contextmanager
-def locked_xapian_writable_database(path):
+def locked_xapian_writable_database(path: pathlib.Path) -> xapian.WritableDatabase:
"""Open xapian database for writing.
When a process is writing to a xapian database opened by this
@@ -148,7 +149,7 @@ def locked_xapian_writable_database(path):
# pylint: disable=invalid-name
-def write_document(db, identifier, doctype, doc):
+def write_document(db: xapian.WritableDatabase, identifier: str, doctype: str, doc: xapian.Document) -> None:
"""Write document into xapian database."""
# We use the XT and Q prefixes to indicate the type and idterm
# respectively.
@@ -160,7 +161,7 @@ def write_document(db, identifier, doctype, doc):
termgenerator = xapian.TermGenerator()
termgenerator.set_stemmer(xapian.Stem("en"))
-def index_text(text):
+def index_text(text: str) -> None:
"""Index text and increase term position."""
termgenerator.index_text(text)
termgenerator.increase_termpos()
@@ -187,12 +188,12 @@ add_year = lambda doc, year: doc.add_value(5, xapian.sortable_serialise(float(ye
# When a child process is forked, it inherits a copy of the memory of
# its parent. We use this to pass data retrieved from SQL from parent
# to child. Specifically, we use this global variable.
-data = None
+data: Iterable
# We use this lock to ensure that only one process writes its Xapian
# index to disk at a time.
xapian_lock = Lock()
-def index_genes(xapian_build_directory, chunk_index):
+def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int) -> None:
"""Index genes data into a Xapian index."""
with locked_xapian_writable_database(xapian_build_directory / f"genes-{chunk_index:04d}") as db:
for trait in data:
@@ -231,7 +232,7 @@ def index_genes(xapian_build_directory, chunk_index):
.bind(lambda idterm: write_document(db, idterm, "gene", doc)))
-def index_phenotypes(xapian_build_directory, chunk_index):
+def index_phenotypes(xapian_build_directory: pathlib.Path, chunk_index: int) -> None:
"""Index phenotypes data into a Xapian index."""
with locked_xapian_writable_database(
xapian_build_directory / f"phenotypes-{chunk_index:04d}") as db:
@@ -276,15 +277,15 @@ def index_phenotypes(xapian_build_directory, chunk_index):
.bind(lambda idterm: write_document(db, idterm, "phenotype", doc)))
-def group(generator, chunk_size):
+def group(generator: Iterable, chunk_size: int) -> Iterable:
"""Group elements of generator into chunks."""
return iter(lambda: tuple(itertools.islice(generator, chunk_size)), ())
@contextlib.contextmanager
-def worker_queue(number_of_workers=os.cpu_count()):
+def worker_queue(number_of_workers: int = os.cpu_count()) -> Generator:
"""Manage a pool of worker processes returning a function to spawn them."""
- processes = deque()
+ processes: deque = deque()
def spawn(target, args):
if len(processes) == number_of_workers:
@@ -298,7 +299,7 @@ def worker_queue(number_of_workers=os.cpu_count()):
process.join()
-def index_query(index_function, query, xapian_build_directory, start=0):
+def index_query(index_function: Callable, query: SQLQuery, xapian_build_directory: pathlib.Path, start: int = 0) -> None:
"""Run SQL query, and index its results for Xapian."""
i = start
try:
@@ -328,13 +329,13 @@ def index_query(index_function, query, xapian_build_directory, start=0):
@contextlib.contextmanager
-def temporary_directory(prefix, parent_directory):
+def temporary_directory(prefix: str, parent_directory: str) -> Generator:
"""Create temporary directory returning it as a PosixPath."""
with tempfile.TemporaryDirectory(prefix=prefix, dir=parent_directory) as tmpdirname:
yield pathlib.Path(tmpdirname)
-def xapian_compact(combined_index, indices):
+def xapian_compact(combined_index: pathlib.Path, indices: List[pathlib.Path]) -> None:
"""Compact and combine several Xapian indices."""
# xapian-compact opens all indices simultaneously. So, raise the limit on
# the number of open files.
@@ -352,7 +353,7 @@ def xapian_compact(combined_index, indices):
@click.command(help="Index GeneNetwork data and build Xapian search index in XAPIAN_DIRECTORY.")
@click.argument("xapian_directory")
# pylint: disable=missing-function-docstring
-def main(xapian_directory):
+def main(xapian_directory: str) -> None:
logging.basicConfig(level=os.environ.get("LOGLEVEL", "DEBUG"),
format='%(relativeCreated)s: %(levelname)s: %(message)s')
pathlib.Path(xapian_directory).mkdir(exist_ok=True)