diff options
Diffstat (limited to 'scripts/phenotypes_bulk_edit.py')
| -rw-r--r-- | scripts/phenotypes_bulk_edit.py | 281 |
1 files changed, 0 insertions, 281 deletions
diff --git a/scripts/phenotypes_bulk_edit.py b/scripts/phenotypes_bulk_edit.py deleted file mode 100644 index 4888924..0000000 --- a/scripts/phenotypes_bulk_edit.py +++ /dev/null @@ -1,281 +0,0 @@ -import sys -import uuid -import logging -import argparse -from pathlib import Path -from typing import Iterator -from functools import reduce - -import requests -from lxml import etree -from MySQLdb.cursors import DictCursor - -from gn_libs import jobs, mysqldb, sqlite3 - -import uploader.publications.pubmed as pmed -from uploader.publications.models import fetch_phenotype_publications -logging.basicConfig( - format="%(asctime)s — %(filename)s:%(lineno)s — %(levelname)s: %(message)s") -logger = logging.getLogger(__name__) - - -def check_ids(conn, ids: tuple[tuple[int, int], ...]) -> bool: - """Verify that all the `UniqueIdentifier` values are valid.""" - logger.info("Checking the 'UniqueIdentifier' values.") - with conn.cursor(cursorclass=DictCursor) as cursor: - paramstr = ",".join(["(%s, %s)"] * len(ids)) - cursor.execute( - "SELECT PhenotypeId AS phenotype_id, Id AS xref_id " - "FROM PublishXRef " - f"WHERE (PhenotypeId, Id) IN ({paramstr})", - tuple(item for row in ids for item in row)) - mysqldb.debug_query(cursor, logger) - found = tuple((row["phenotype_id"], row["xref_id"]) - for row in cursor.fetchall()) - - not_found = tuple(item for item in ids if item not in found) - if len(not_found) == 0: - logger.info("All 'UniqueIdentifier' are valid.") - return True - - for item in not_found: - logger.error(f"Invalid 'UniqueIdentifier' value: phId:%s::xrId:%s", item[0], item[1]) - - return False - - -def check_for_mandatory_fields(): - """Verify that mandatory fields have values.""" - pass - - -def __fetch_phenotypes__(conn, ids: tuple[int, ...]) -> tuple[dict, ...]: - """Fetch basic (non-numeric) phenotypes data from the database.""" - with conn.cursor(cursorclass=DictCursor) as cursor: - paramstr = ",".join(["%s"] * len(ids)) - cursor.execute(f"SELECT * FROM Phenotype WHERE Id IN ({paramstr}) " - "ORDER BY Id ASC", - ids) - return tuple(dict(row) for row in cursor.fetchall()) - - -def descriptions_differences(file_data, db_data) -> dict[str, str]: - """Compute differences in the descriptions.""" - logger.info("Computing differences in phenotype descriptions.") - assert len(file_data) == len(db_data), "The counts of phenotypes differ!" - description_columns = ("Pre_publication_description", - "Post_publication_description", - "Original_description", - "Pre_publication_abbreviation", - "Post_publication_abbreviation") - diff = tuple() - for file_row, db_row in zip(file_data, db_data): - assert file_row["phenotype_id"] == db_row["Id"] - inner_diff = { - key: file_row[key] - for key in description_columns - if not file_row[key] == db_row[key] - } - if bool(inner_diff): - diff = diff + ({ - "phenotype_id": file_row["phenotype_id"], - **inner_diff - },) - - return diff - - -def __save_new_publications__(conn, publications, pubmed_ids) -> dict: - if len(publications) > 0: - with conn.cursor(cursorclass=DictCursor) as cursor: - cursor.executemany( - ("INSERT INTO " - "Publication( " - "PubMed_ID, Abstract, Authors, Title, Journal, Volume, Pages, " - "Month, Year" - ") " - "VALUES(" - "%(pubmed_id)s, %(abstract)s, %(authors)s, %(title)s, " - "%(journal)s, %(volume)s, %(pages)s, %(month)s, %(year)s" - ") " - "ON DUPLICATE KEY UPDATE " - "Abstract=VALUES(Abstract), Authors=VALUES(Authors), " - "Title=VALUES(Title), Journal=VALUES(Journal), " - "Volume=VALUES(Volume), Pages=VALUES(pages), " - "Month=VALUES(Month), Year=VALUES(Year)"), - publications) - - paramstr = ", ".join(["%s"] * len(pubmed_ids)) - cursor.execute( - ("SELECT Id, PubMed_ID FROM Publication " - f"WHERE PubMed_ID IN ({paramstr})"), - pubmed_ids) - return { - row["PubMed_ID"]: row["Id"] for row in cursor.fetchall() - } - return {} - - -def publications_differences(conn, file_data, db_data, pubmed_ids) -> dict: - """Compute differences in the publications.""" - logger.info("Computing differences in publications.") - assert len(file_data) == len(db_data), "Publication counts differ!" - db_pubmed_ids = reduce(lambda coll, curr: coll.union(set([curr["PubMed_ID"]])), - db_data, - set([None])) - - pubmedid_to_id_map = { - f"{row['PhenotypeId']}::{row['xref_id']}": row["PublicationId"] for row in db_data - } - new_pubmed_ids = tuple(pubmed_ids.difference(db_pubmed_ids)) - new_publications = __save_new_publications__( - conn, pmed.fetch_publications(new_pubmed_ids), new_pubmed_ids) - new_pubmedid_to_id_map = { - row["PubMed_ID"]: new_publications.get( - row["PubMed_ID"], pubmedid_to_id_map[f"{row['phenotype_id']}::{row['xref_id']}"]) - for row in file_data - } - - return tuple( - item for item in ({ - "PhenotypeId": row["phenotype_id"], - "xref_id": row["xref_id"], - "PublicationId": new_pubmedid_to_id_map.get( - row["PubMed_ID"], pubmedid_to_id_map[f"{row['phenotype_id']}::{row['xref_id']}"]) - } for row in file_data) - if item["PublicationId"] != pubmedid_to_id_map[f"{item['PhenotypeId']}::{item['xref_id']}"]) - - -def compute_differences( - conn, - file_contents, - pheno_ids, - pheno_xref_ids, - pubmed_ids -) -> tuple[tuple[dict, ...], tuple[dict, ...], tuple[dict, ...]]: - """Compute differences between data in DB and edited data.""" - logger.info("Computing differences.") - # 1. Basic Phenotype data differences - # a. Descriptions differences - desc_diff = descriptions_differences( - file_contents, __fetch_phenotypes__(conn, pheno_ids)) - logger.debug("DESCRIPTIONS DIFFERENCES: %s", desc_diff) - # b. Publications differences - pub_diff = publications_differences( - conn, - file_contents, - fetch_phenotype_publications(conn, pheno_xref_ids), - pubmed_ids) - logger.debug("Publications diff: %s", pub_diff) - # 2. Data differences - # data_diff = data_differences(...) - pass - - -def update_descriptions(): - """Update descriptions in the database""" - logger.info("Updating descriptions") - # Compute differences between db data and uploaded file - # Only run query for changed descriptions - pass - - -def link_publications(): - """Link phenotypes to relevant publications.""" - logger.info("Linking phenotypes to publications.") - # Create publication if PubMed_ID doesn't exist in db - pass - - -def update_values(): - """Update the phenotype values.""" - logger.info("Updating phenotypes values.") - # Compute differences between db data and uploaded file - # Only run query for changed data - pass - - -def parse_args(): - parser = argparse.ArgumentParser( - prog="Phenotypes Bulk-Edit Processor", - description="Process the bulk-edits to phenotype data and descriptions.") - parser.add_argument("db_uri", type=str, help="MariaDB/MySQL connection URL") - parser.add_argument( - "jobs_db_path", type=Path, help="Path to jobs' SQLite database.") - parser.add_argument("job_id", type=uuid.UUID, help="ID of the running job") - parser.add_argument( - "--log-level", - type=str, - help="Determines what is logged out.", - choices=("debug", "info", "warning", "error", "critical"), - default="info") - return parser.parse_args() - - -def read_file(filepath: Path) -> Iterator[str]: - """Read the file, one line at a time.""" - with filepath.open(mode="r", encoding="utf-8") as infile: - count = 0 - headers = None - for line in infile: - if line.startswith("#"): # ignore comments - continue; - - fields = line.strip().split("\t") - if count == 0: - headers = fields - count = count + 1 - continue - - _dict = dict(zip( - headers, - ((None if item.strip() == "" else item.strip()) - for item in fields))) - _pheno, _xref = _dict.pop("UniqueIdentifier").split("::") - _dict["phenotype_id"] = int(_pheno.split(":")[1]) - _dict["xref_id"] = int(_xref.split(":")[1]) - if _dict["PubMed_ID"] is not None: - _dict["PubMed_ID"] = int(_dict["PubMed_ID"]) - - yield _dict - count = count + 1 - - -def run(conn, job): - """Process the data and update it.""" - file_contents = tuple(sorted(read_file(Path(job["metadata"]["edit-file"])), - key=lambda item: item["phenotype_id"])) - pheno_ids, pheno_xref_ids, pubmed_ids = reduce( - lambda coll, curr: ( - coll[0] + (curr["phenotype_id"],), - coll[1] + ((curr["phenotype_id"], curr["xref_id"]),), - coll[2].union(set([curr["PubMed_ID"]]))), - file_contents, - (tuple(), tuple(), set([None]))) - check_ids(conn, pheno_xref_ids) - check_for_mandatory_fields() - # stop running here if any errors are found. - compute_differences(conn, - file_contents, - pheno_ids, - pheno_xref_ids, - pubmed_ids) - update_descriptions() - link_publications() - update_values() - return 0 - - -def main(): - """Entry-point for this script.""" - args = parse_args() - logger.setLevel(args.log_level.upper()) - logger.debug("Arguments: %s", args) - - with (mysqldb.database_connection(args.db_uri) as conn, - sqlite3.connection(args.jobs_db_path) as jobs_conn): - return run(conn, jobs.job(jobs_conn, args.job_id)) - - -if __name__ == "__main__": - sys.exit(main()) |
