Diffstat (limited to 'scripts')
-rwxr-xr-x  scripts/index-genenetwork    251
-rw-r--r--  scripts/rqtl_wrapper.R        16
-rwxr-xr-x  scripts/update_rif_table.py  167
3 files changed, 395 insertions, 39 deletions
diff --git a/scripts/index-genenetwork b/scripts/index-genenetwork
index 1f649cf..2779abc 100755
--- a/scripts/index-genenetwork
+++ b/scripts/index-genenetwork
@@ -8,21 +8,26 @@ xapian index. This xapian index is later used in providing search
 through the web interface.

 """
-
-from collections import deque, namedtuple
+from dataclasses import dataclass
+from collections import deque, namedtuple, Counter
 import contextlib
+import time
+import datetime
 from functools import partial
 import itertools
 import json
 import logging
-from multiprocessing import Lock, Process
+from multiprocessing import Lock, Manager, Process, managers
 import os
 import pathlib
 import resource
+import re
 import shutil
 import sys
+import hashlib
 import tempfile
-from typing import Callable, Generator, Iterable, List
+from typing import Callable, Dict, Generator, Hashable, Iterable, List
+from SPARQLWrapper import SPARQLWrapper, JSON

 import MySQLdb
 import click
@@ -33,7 +38,10 @@ import xapian
 from gn3.db_utils import database_connection
 from gn3.monads import query_sql

-DOCUMENTS_PER_CHUNK = 100000
+DOCUMENTS_PER_CHUNK = 100_000
+# Running the script in prod consumes ~1GB per process when handling 100_000 documents per chunk.
+# To prevent running out of RAM, we set this as the upper bound for total concurrent processes.
+PROCESS_COUNT_LIMIT = 67

 SQLQuery = namedtuple("SQLQuery",
                       ["fields", "tables", "where", "offset", "limit"],
@@ -122,6 +130,38 @@ phenotypes_query = SQLQuery(
         SQLTableClause("LEFT JOIN", "Geno",
                        "PublishXRef.Locus = Geno.Name AND Geno.SpeciesId = Species.Id")])

+WIKI_CACHE_QUERY = """
+PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
+PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
+PREFIX gnt: <http://genenetwork.org/term/>
+PREFIX gnc: <http://genenetwork.org/category/>
+
+SELECT ?symbolName ?speciesName GROUP_CONCAT(DISTINCT ?comment ; separator=\"\\n\") AS ?comment WHERE {
+    ?symbol rdfs:comment _:node ;
+            rdfs:label ?symbolName .
+_:node rdf:type gnc:GNWikiEntry ;
+       gnt:belongsToSpecies ?species ;
+       rdfs:comment ?comment .
+?species gnt:shortName ?speciesName .
+} GROUP BY ?speciesName ?symbolName
+"""
+
+RIF_CACHE_QUERY = """
+PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
+PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
+PREFIX gnt: <http://genenetwork.org/term/>
+PREFIX gnc: <http://genenetwork.org/category/>
+
+SELECT ?symbolName ?speciesName GROUP_CONCAT(DISTINCT ?comment ; separator=\"\\n\") AS ?comment WHERE {
+    ?symbol rdfs:comment _:node ;
+            rdfs:label ?symbolName .
+_:node rdf:type gnc:NCBIWikiEntry ;
+       gnt:belongsToSpecies ?species ;
+       rdfs:comment ?comment .
+?species gnt:shortName ?speciesName .
+} GROUP BY ?speciesName ?symbolName
+"""
+

 def serialize_sql(query: SQLQuery) -> str:
     """Serialize SQLQuery object to a string."""
@@ -168,6 +208,48 @@ def locked_xapian_writable_database(path: pathlib.Path) -> xapian.WritableDataba
         db.close()


+def build_rdf_cache(sparql_uri: str, query: str, remove_common_words: bool = False):
+    cache = {}
+    sparql = SPARQLWrapper(sparql_uri)
+    sparql.setReturnFormat(JSON)
+    sparql.setQuery(query)
+    results = sparql.queryAndConvert()
+    if not isinstance(results, dict):
+        raise TypeError(f"Expected results to be a dict but found {type(results)}")
+    bindings = results["results"]["bindings"]
+    count: Counter[str] = Counter()
+    words_regex = re.compile(r"\w+")
+    for entry in bindings :
+        x = (entry["speciesName"]["value"], entry["symbolName"]["value"],)
+        value = entry["comment"]["value"]
+        value = " ".join(words_regex.findall(value))  # remove punctuation
+        cache[x] = value
+        count.update(Counter(value.lower().strip().split()))
+
+    if not remove_common_words:
+        return cache
+
+    words_to_drop = set()
+    for word, cnt in count.most_common(1000):
+        if len(word) < 4 or cnt > 3000:
+            words_to_drop.add(word)
+    smaller_cache = {}
+    for entry, value in cache.items():
+        new_value = set(word for word in value.lower().split() if word not in words_to_drop)
+        smaller_cache[entry] = " ".join(new_value)
+    return smaller_cache
+
+
+def md5hash_ttl_dir(ttl_dir: pathlib.Path) -> str:
+    if not ttl_dir.exists():
+        return "-1"
+    ttl_hash = hashlib.new("md5")
+    for ttl_file in ttl_dir.glob("*.ttl"):
+        with open(ttl_file, encoding="utf-8") as f_:
+            ttl_hash.update(f_.read().encode())
+    return ttl_hash.hexdigest()
+
+
 # pylint: disable=invalid-name
 def write_document(db: xapian.WritableDatabase, identifier: str,
                    doctype: str, doc: xapian.Document) -> None:
@@ -181,15 +263,23 @@ def write_document(db: xapian.WritableDatabase, identifier: str,

 termgenerator = xapian.TermGenerator()
 termgenerator.set_stemmer(xapian.Stem("en"))
+termgenerator.set_stopper_strategy(xapian.TermGenerator.STOP_ALL)
+termgenerator.set_stopper(xapian.SimpleStopper())

 def index_text(text: str) -> None:
     """Index text and increase term position."""
     termgenerator.index_text(text)
     termgenerator.increase_termpos()

-# pylint: disable=unnecessary-lambda
-index_text_without_positions = lambda text: termgenerator.index_text_without_positions(text)
+@curry(3)
+def index_from_dictionary(keys: Hashable, prefix: str, dictionary: dict):
+    entry = dictionary.get(keys)
+    if not entry:
+        return
+    termgenerator.index_text_without_positions(entry, 0, prefix)
+
+index_text_without_positions = lambda text: termgenerator.index_text_without_positions(text)
 index_authors = lambda authors: termgenerator.index_text(authors, 0, "A")
 index_species = lambda species: termgenerator.index_text_without_positions(species, 0, "XS")
 index_group = lambda group: termgenerator.index_text_without_positions(group, 0, "XG")
@@ -206,10 +296,17 @@ add_peakmb = lambda doc, peakmb: doc.add_value(3, xapian.sortable_serialise(peak
 add_additive = lambda doc, additive: doc.add_value(4, xapian.sortable_serialise(additive))
 add_year = lambda doc, year: doc.add_value(5, xapian.sortable_serialise(float(year)))
+
+
+

 # When a child process is forked, it inherits a copy of the memory of
 # its parent. We use this to pass data retrieved from SQL from parent
 # to child. Specifically, we use this global variable.
-data: Iterable
+# This is copy-on-write so make sure child processes don't modify this data
+mysql_data: Iterable
+rif_cache: Iterable
+wiki_cache: Iterable
+

 # We use this lock to ensure that only one process writes its Xapian
 # index to disk at a time.
 xapian_lock = Lock()
@@ -217,7 +314,7 @@ def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int) -> None:
     """Index genes data into a Xapian index."""
     with locked_xapian_writable_database(xapian_build_directory / f"genes-{chunk_index:04d}") as db:
-        for trait in data:
+        for trait in mysql_data:
             # pylint: disable=cell-var-from-loop
             doc = xapian.Document()
             termgenerator.set_document(doc)
@@ -230,7 +327,7 @@ def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int) -> None:
             trait["additive"].bind(partial(add_additive, doc))

             # Index free text.
-            for key in ["description", "tissue", "dataset_fullname"]:
+            for key in ["description", "tissue", "dataset"]:
                 trait[key].bind(index_text)
             trait.pop("probe_target_description").bind(index_text)
             for key in ["name", "symbol", "species", "group"]:
@@ -242,11 +339,23 @@ def index_genes(xapian_build_directory: pathlib.Path, chunk_index: int) -> None:
             trait["species"].bind(index_species)
             trait["group"].bind(index_group)
             trait["tissue"].bind(index_tissue)
-            trait["dataset_fullname"].bind(index_dataset)
+            trait["dataset"].bind(index_dataset)
             trait["symbol"].bind(index_symbol)
             trait["chr"].bind(index_chr)
             trait["geno_chr"].bind(index_peakchr)

+            Maybe.apply(index_from_dictionary).to_arguments(
+                Just((trait["species"].value, trait["symbol"].value)),
+                Just("XRF"),
+                Just(rif_cache)
+            )
+
+            Maybe.apply(index_from_dictionary).to_arguments(
+                Just((trait["species"].value, trait["symbol"].value)),
+                Just("XWK"),
+                Just(wiki_cache)
+            )
+
             doc.set_data(json.dumps(trait.data))
             (Maybe.apply(curry(2, lambda name, dataset: f"{name}:{dataset}"))
              .to_arguments(trait["name"], trait["dataset"])
@@ -257,7 +366,8 @@ def index_phenotypes(xapian_build_directory: pathlib.Path, chunk_index: int) ->
     """Index phenotypes data into a Xapian index."""
     with locked_xapian_writable_database(
             xapian_build_directory / f"phenotypes-{chunk_index:04d}") as db:
-        for trait in data:
+
+        for trait in mysql_data:
             # pylint: disable=cell-var-from-loop
             doc = xapian.Document()
             termgenerator.set_document(doc)
@@ -270,7 +380,7 @@ def index_phenotypes(xapian_build_directory: pathlib.Path, chunk_index: int) ->
             trait["year"].bind(partial(add_year, doc))

             # Index free text.
-            for key in ["description", "authors", "dataset_fullname"]:
+            for key in ["description", "authors", "dataset"]:
                 trait[key].bind(index_text)
             for key in ["Abstract", "Title"]:
                 trait.pop(key).bind(index_text)
@@ -284,7 +394,7 @@ def index_phenotypes(xapian_build_directory: pathlib.Path, chunk_index: int) ->
             trait["group"].bind(index_group)
             trait["authors"].bind(index_authors)
             trait["geno_chr"].bind(index_peakchr)
-            trait["dataset_fullname"].bind(index_dataset)
+            trait["dataset"].bind(index_dataset)

             # Convert name from integer to string.
trait["name"] = trait["name"].map(str) @@ -320,12 +430,16 @@ def worker_queue(number_of_workers: int = os.cpu_count() or 1) -> Generator: process.join() -def index_query(index_function: Callable, query: SQLQuery, - xapian_build_directory: pathlib.Path, sql_uri: str, start: int = 0) -> None: +def index_query(index_function: Callable[[pathlib.Path, int], None], query: SQLQuery, + xapian_build_directory: pathlib.Path, sql_uri: str, + sparql_uri: str, start: int = 0) -> None: """Run SQL query, and index its results for Xapian.""" i = start + default_no_of_workers = os.cpu_count() or 1 + no_of_workers = min(default_no_of_workers, PROCESS_COUNT_LIMIT) + try: - with worker_queue() as spawn_worker: + with worker_queue(no_of_workers) as spawn_worker: with database_connection(sql_uri) as conn: for chunk in group(query_sql(conn, serialize_sql( # KLUDGE: MariaDB does not allow an offset @@ -335,9 +449,8 @@ def index_query(index_function: Callable, query: SQLQuery, offset=start*DOCUMENTS_PER_CHUNK)), server_side=True), DOCUMENTS_PER_CHUNK): - # pylint: disable=global-statement - global data - data = chunk + global mysql_data + mysql_data = chunk spawn_worker(index_function, (xapian_build_directory, i)) logging.debug("Spawned worker process on chunk %s", i) i += 1 @@ -347,7 +460,7 @@ def index_query(index_function: Callable, query: SQLQuery, except MySQLdb._exceptions.OperationalError: logging.warning("Reopening connection to recovering from SQL operational error", exc_info=True) - index_query(index_function, query, xapian_build_directory, sql_uri, i) + index_query(index_function, query, xapian_build_directory, sql_uri, sparql_uri, i) @contextlib.contextmanager @@ -357,12 +470,33 @@ def temporary_directory(prefix: str, parent_directory: str) -> Generator: yield pathlib.Path(tmpdirname) +def parallel_xapian_compact(combined_index: pathlib.Path, indices: List[pathlib.Path]) -> None: + # We found that compacting 50 files of ~600MB has decent performance + no_of_workers = 20 + file_groupings = 50 + with temporary_directory("parallel_combine", str(combined_index)) as parallel_combine: + parallel_combine.mkdir(parents=True, exist_ok=True) + with worker_queue(no_of_workers) as spawn_worker: + i = 0 + while i < len(indices): + end_index = (i + file_groupings) + files = indices[i:end_index] + last_item_idx = i + len(files) + spawn_worker(xapian_compact, (parallel_combine / f"{i}_{last_item_idx}", files)) + logging.debug("Spawned worker to compact files from %s to %s", i, last_item_idx) + i = end_index + logging.debug("Completed parallel xapian compacts") + xapian_compact(combined_index, list(parallel_combine.iterdir())) + + def xapian_compact(combined_index: pathlib.Path, indices: List[pathlib.Path]) -> None: """Compact and combine several Xapian indices.""" # xapian-compact opens all indices simultaneously. So, raise the limit on # the number of open files. 
     soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
     resource.setrlimit(resource.RLIMIT_NOFILE, (max(soft, min(10*len(indices), hard)), hard))
+    combined_index.mkdir(parents=True, exist_ok=True)
+    start = time.monotonic()
     db = xapian.Database()
     try:
         for index in indices:
@@ -370,32 +504,73 @@ def xapian_compact(combined_index: pathlib.Path, indices: List[pathlib.Path]) ->
         db.compact(str(combined_index), xapian.DBCOMPACT_MULTIPASS | xapian.Compactor.FULLER)
     finally:
         db.close()
+    logging.debug("Removing databases that were compacted into %s", combined_index.name)
+    for folder in indices:
+        shutil.rmtree(folder)
+    logging.debug("Completed xapian-compact %s; handled %s files in %s minutes",
+                  combined_index.name, len(indices), (time.monotonic() - start) / 60)
+
+
+@click.command(help="Verify checksums and return True when the data has been changed.")
+@click.argument("xapian_directory")
+@click.argument("sql_uri")
+@click.argument("sparql_uri")
+def is_data_modified(xapian_directory: str,
+                     sql_uri: str,
+                     sparql_uri: str) -> None:
+    dir_ = pathlib.Path(xapian_directory)
+    with locked_xapian_writable_database(dir_) as db, database_connection(sql_uri) as conn:
+        checksums = "-1"
+        if db.get_metadata('tables'):
+            checksums = " ".join([
+                str(result["Checksum"].value)
+                for result in query_sql(
+                    conn,
+                    f"CHECKSUM TABLE {', '.join(db.get_metadata('tables').decode().split())}")
+            ])
+        # Return a zero exit status code when the data has changed;
+        # otherwise exit with a 1 exit status code.
+        generif = pathlib.Path("/var/lib/data/")
+        if (db.get_metadata("generif-checksum").decode() == md5hash_ttl_dir(generif) and
+                db.get_metadata("checksums").decode() == checksums):
+            sys.exit(1)
+        sys.exit(0)


 @click.command(help="Index GeneNetwork data and build Xapian search index in XAPIAN_DIRECTORY.")
 @click.argument("xapian_directory")
 @click.argument("sql_uri")
+@click.argument("sparql_uri")
 # pylint: disable=missing-function-docstring
-def main(xapian_directory: str, sql_uri: str) -> None:
+def create_xapian_index(xapian_directory: str, sql_uri: str,
+                        sparql_uri: str) -> None:
     logging.basicConfig(level=os.environ.get("LOGLEVEL", "DEBUG"),
-                        format='%(relativeCreated)s: %(levelname)s: %(message)s')
+                        format='%(asctime)s %(levelname)s: %(message)s',
+                        datefmt='%Y-%m-%d %H:%M:%S %Z')
+    if not pathlib.Path(xapian_directory).exists():
+        pathlib.Path(xapian_directory).mkdir()

     # Ensure no other build process is running.
-    if pathlib.Path(xapian_directory).exists():
-        logging.error("Build directory %s already exists; "
+    if any(pathlib.Path(xapian_directory).iterdir()):
+        logging.error("Build directory %s has build files; "
                       "perhaps another build process is running.",
                       xapian_directory)
         sys.exit(1)
-    pathlib.Path(xapian_directory).mkdir()

+    start_time = time.perf_counter()
     with temporary_directory("combined", xapian_directory) as combined_index:
         with temporary_directory("build", xapian_directory) as xapian_build_directory:
+            global rif_cache
+            global wiki_cache
+            logging.info("Building wiki cache")
+            wiki_cache = build_rdf_cache(sparql_uri, WIKI_CACHE_QUERY, remove_common_words=True)
+            logging.info("Building rif cache")
+            rif_cache = build_rdf_cache(sparql_uri, RIF_CACHE_QUERY, remove_common_words=True)
             logging.info("Indexing genes")
-            index_query(index_genes, genes_query, xapian_build_directory, sql_uri)
+            index_query(index_genes, genes_query, xapian_build_directory, sql_uri, sparql_uri)
             logging.info("Indexing phenotypes")
-            index_query(index_phenotypes, phenotypes_query, xapian_build_directory, sql_uri)
+            index_query(index_phenotypes, phenotypes_query, xapian_build_directory, sql_uri, sparql_uri)
             logging.info("Combining and compacting indices")
-            xapian_compact(combined_index, list(xapian_build_directory.iterdir()))
+            parallel_xapian_compact(combined_index, list(xapian_build_directory.iterdir()))
             logging.info("Writing table checksums into index")
             with locked_xapian_writable_database(combined_index) as db:
                 # Build a (deduplicated) set of all tables referenced in
@@ -409,11 +584,27 @@ def main(xapian_directory: str, sql_uri: str) -> None:
                 ]
                 db.set_metadata("tables", " ".join(tables))
                 db.set_metadata("checksums", " ".join(checksums))
+                logging.info("Writing generif checksums into index")
+                db.set_metadata(
+                    "generif-checksum",
+                    md5hash_ttl_dir(pathlib.Path("/var/lib/data/")).encode())
         for child in combined_index.iterdir():
             shutil.move(child, xapian_directory)
     logging.info("Index built")
+    end_time = time.perf_counter()
+    index_time = datetime.timedelta(seconds=end_time - start_time)
+    logging.info(f"Time to Index: {index_time}")
+
+
+@click.group()
+def cli():
+    pass
+
+
+cli.add_command(is_data_modified)
+cli.add_command(create_xapian_index)


 if __name__ == "__main__":
     # pylint: disable=no-value-for-parameter
-    main()
+    cli()
diff --git a/scripts/rqtl_wrapper.R b/scripts/rqtl_wrapper.R
index ea2c345..2ac8faa 100644
--- a/scripts/rqtl_wrapper.R
+++ b/scripts/rqtl_wrapper.R
@@ -3,8 +3,6 @@ library(qtl)
 library(stringi)
 library(stringr)

-tmp_dir = Sys.getenv("TMPDIR")
-
 option_list = list(
   make_option(c("-g", "--geno"), type="character", help=".geno file containing a dataset's genotypes"),
   make_option(c("-p", "--pheno"), type="character", help="File containing two columns - sample names and values"),
@@ -18,7 +16,7 @@ option_list = list(
   make_option(c("--pstrata"), action="store_true", default=NULL, help="Use permutation strata (stored as final column/vector in phenotype input file)"),
   make_option(c("-s", "--scale"), type="character", default="mb", help="Mapping scale - Megabases (Mb) or Centimorgans (cM)"),
   make_option(c("--control"), type="character", default=NULL, help="Name of marker (contained in genotype file) to be used as a control"),
-  make_option(c("-o", "--outdir"), type="character", default=file.path(tmp_dir, "output"), help="Directory in which to write result file"),
+  make_option(c("-o", "--outdir"), type="character", default=NULL, help="Directory in which to write result file"),
make_option(c("-f", "--filename"), type="character", default=NULL, help="Name to use for result file"), make_option(c("-v", "--verbose"), action="store_true", default=NULL, help="Show extra information") ); @@ -58,7 +56,7 @@ geno_file = opt$geno pheno_file = opt$pheno # Generate randomized filename for cross object -cross_file = file.path(tmp_dir, "cross", paste(stri_rand_strings(1, 8), ".cross", sep = "")) +cross_file = file.path(opt$outdir, "cross", paste(stri_rand_strings(1, 8), ".cross", sep = "")) trim <- function( x ) { gsub("(^[[:space:]]+|[[:space:]]+$)", "", x) } @@ -258,9 +256,9 @@ if (!is.null(opt$pairscan)) { # Calculate permutations if (opt$nperm > 0) { if (!is.null(opt$filename)){ - perm_out_file = file.path(opt$outdir, paste("PERM_", opt$filename, sep = "" )) + perm_out_file = file.path(opt$outdir, "output", paste("PERM_", opt$filename, sep = "" )) } else { - perm_out_file = file.path(opt$outdir, paste(pheno_name, "_PERM_", stri_rand_strings(1, 8), sep = "")) + perm_out_file = file.path(opt$outdir, "output", paste(pheno_name, "_PERM_", stri_rand_strings(1, 8), sep = "")) } if (!is.null(opt$addcovar) || !is.null(opt$control)){ @@ -284,9 +282,9 @@ if (opt$nperm > 0) { } if (!is.null(opt$filename)){ - out_file = file.path(opt$outdir, opt$filename) + out_file = file.path(opt$outdir, "output", opt$filename) } else { - out_file = file.path(opt$outdir, paste(pheno_name, "_", stri_rand_strings(1, 8), sep = "")) + out_file = file.path(opt$outdir, "output", paste(pheno_name, "_", stri_rand_strings(1, 8), sep = "")) } if (!is.null(opt$addcovar) || !is.null(opt$control)){ @@ -299,7 +297,7 @@ if (!is.null(opt$addcovar) || !is.null(opt$control)){ verbose_print('Writing results to CSV file\n') if (!is.null(opt$pairscan)) { - map_out_file = file.path(opt$outdir, paste("MAP_", opt$filename, sep = "" )) + map_out_file = file.path(opt$outdir, "output", paste("MAP_", opt$filename, sep = "" )) write.csv(qtl_results[1], out_file) write.csv(qtl_results[2], map_out_file) } else { diff --git a/scripts/update_rif_table.py b/scripts/update_rif_table.py new file mode 100755 index 0000000..24edf3d --- /dev/null +++ b/scripts/update_rif_table.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python3 + +""" +Script responsible for updating the GeneRIF_BASIC table +""" + +import argparse +import csv +import datetime +import gzip +import logging +import pathlib +import os +from tempfile import TemporaryDirectory +from typing import Dict, Generator + +import requests +from MySQLdb.cursors import DictCursor + +from gn3.db_utils import database_connection + +TAX_IDS = {"10090": 1, "9606": 4, "10116": 2, "3702": 3} + +GENE_INFO_URL = "https://ftp.ncbi.nlm.nih.gov/gene/DATA/gene_info.gz" +GENERIFS_BASIC_URL = "https://ftp.ncbi.nih.gov/gene/GeneRIF/generifs_basic.gz" + +VERSION_ID = 5 + + +INSERT_QUERY = """ INSERT INTO GeneRIF_BASIC +(SpeciesId, GeneId, symbol, PubMed_Id, createtime, comment, TaxID, VersionId) +VALUES (%s, %s, %s, %s, %s, %s, %s, %s) +""" + + +def download_file(url: str, dest: pathlib.Path): + """Saves the contents of url in dest""" + with requests.get(url, stream=True) as resp: + resp.raise_for_status() + with open(dest, "wb") as downloaded_file: + for chunk in resp.iter_content(chunk_size=8192): + downloaded_file.write(chunk) + + +def read_tsv_file(fname: pathlib.Path) -> Generator: + """Load tsv file from NCBI""" + with gzip.open(fname, mode="rt") as gz_file: + reader = csv.DictReader(gz_file, delimiter="\t", quoting=csv.QUOTE_NONE) + yield from reader + + +def parse_gene_info_from_ncbi(fname: 
+def parse_gene_info_from_ncbi(fname: pathlib.Path) -> Dict[str, str]:
+    """Parse gene_info into geneid: symbol pairs"""
+    genedict: Dict[str, str] = {}
+    for row in read_tsv_file(fname):
+        if row["#tax_id"] not in TAX_IDS:
+            continue
+        gene_id, symbol = row["GeneID"], row["Symbol"]
+        genedict[gene_id] = symbol
+    return genedict
+
+
+def build_already_exists_cache(conn) -> dict:
+    """
+    Build cache for all GeneId, SpeciesID, createtime, PubMed_ID combinations.
+    Helps prevent duplicate inserts.
+    """
+    cache = {}
+    query = """SELECT
+        COUNT(*) as cnt, GeneId, SpeciesId, createtime, PubMed_ID
+        from GeneRIF_BASIC
+        GROUP BY GeneId, SpeciesId, createtime, PubMed_Id """
+
+    with conn.cursor(DictCursor) as cursor:
+        cursor.execute(query)
+        while row := cursor.fetchone():
+            key = (
+                str(row["GeneId"]),
+                str(row["SpeciesId"]),
+                row["createtime"],
+                str(row["PubMed_ID"]),
+            )
+            cache[key] = row["cnt"]
+    return cache
+
+
+def should_add_rif_row(row: dict, exists_cache: dict) -> bool:
+    """Checks if we can add a rif_row, prevent duplicate errors from Mysql"""
+    species_id = str(TAX_IDS[row["#Tax ID"]])
+    insert_date = datetime.datetime.fromisoformat(row["last update timestamp"])
+    search_key = (
+        row["Gene ID"],
+        species_id,
+        insert_date,
+        row["PubMed ID (PMID) list"],
+    )
+    if search_key not in exists_cache:
+        exists_cache[search_key] = 1
+        return True
+    return False
+
+
+def update_rif(sqluri: str):
+    """Update GeneRIF_BASIC table"""
+    with TemporaryDirectory() as _tmpdir:
+        tmpdir = pathlib.Path(_tmpdir)
+        gene_info_path = tmpdir / "gene_info.gz"
+        logging.debug("Fetching gene_info data from: %s", GENE_INFO_URL)
+        download_file(GENE_INFO_URL, gene_info_path)
+
+        logging.debug("Fetching gene_rif_basics data from: %s", GENERIFS_BASIC_URL)
+        generif_basics_path = tmpdir / "generif_basics.gz"
+        download_file(
+            GENERIFS_BASIC_URL,
+            generif_basics_path,
+        )
+
+        logging.debug("Parsing gene_info data")
+        genedict = parse_gene_info_from_ncbi(gene_info_path)
+        with database_connection(sql_uri=sqluri) as con:
+            exists_cache = build_already_exists_cache(con)
+            cursor = con.cursor()
+            skipped_if_exists, added = 0, 0
+            for row in read_tsv_file(generif_basics_path):
+                if row["#Tax ID"] not in TAX_IDS:
+                    continue
+                if not should_add_rif_row(row, exists_cache):
+                    skipped_if_exists += 1
+                    continue
+                species_id = TAX_IDS[row["#Tax ID"]]
+                symbol = genedict.get(row["Gene ID"], "")
+                insert_values = (
+                    species_id,                    # SpeciesId
+                    row["Gene ID"],                # GeneId
+                    symbol,                        # symbol
+                    row["PubMed ID (PMID) list"],  # PubMed_ID
+                    row["last update timestamp"],  # createtime
+                    row["GeneRIF text"],           # comment
+                    row["#Tax ID"],                # TaxID
+                    VERSION_ID,                    # VersionId
+                )
+                cursor.execute(INSERT_QUERY, insert_values)
+                added += 1
+                if added % 40_000 == 0:
+                    logging.debug("Added 40,000 rows to database")
+            logging.info(
+                "Generif_BASIC table updated. Added %s. Skipped %s because they "
+                "already exist. In case of error, you can use VersionID=%s to find "
+                "rows inserted with this script", added, skipped_if_exists,
+                VERSION_ID
+            )
+
+
+if __name__ == "__main__":
+    logging.basicConfig(
+        level=os.environ.get("LOGLEVEL", "DEBUG"),
+        format="%(asctime)s %(levelname)s: %(message)s",
+        datefmt="%Y-%m-%d %H:%M:%S %Z",
+    )
+    parser = argparse.ArgumentParser("Update Generif_BASIC table")
+    parser.add_argument(
+        "--sql-uri",
+        required=True,
+        help="MYSQL uri path in the form mysql://user:password@localhost/gn2",
+    )
+    args = parser.parse_args()
+    update_rif(args.sql_uri)
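The indexer above stores RIF and GNWiki comments under the "XRF" and "XWK" term prefixes and serialises each trait as JSON document data. A minimal sketch of how a search client might query those prefixes with the Xapian Python bindings follows; the index path and the user-facing "rif:"/"wiki:" prefix names are illustrative assumptions, not part of this commit.

# Sketch: querying terms indexed under the XRF/XWK prefixes added by this commit.
# The index path and the "rif:"/"wiki:" query prefixes are assumptions for illustration.
import json
import xapian

db = xapian.Database("/var/lib/xapian/genenetwork")  # assumed location of the built index
parser = xapian.QueryParser()
parser.set_stemmer(xapian.Stem("en"))
parser.set_database(db)
# Map user-facing field names to the term prefixes used at index time.
parser.add_prefix("rif", "XRF")
parser.add_prefix("wiki", "XWK")
parser.add_prefix("species", "XS")

query = parser.parse_query("species:mouse rif:insulin")
enquire = xapian.Enquire(db)
enquire.set_query(query)
for match in enquire.get_mset(0, 10):
    trait = json.loads(match.document.get_data())  # document data is the JSON-serialised trait
    print(match.rank, trait.get("symbol"), trait.get("dataset"))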