Diffstat (limited to 'scripts/phenotypes_bulk_edit.py')
 scripts/phenotypes_bulk_edit.py | 266 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 266 insertions(+), 0 deletions(-)
diff --git a/scripts/phenotypes_bulk_edit.py b/scripts/phenotypes_bulk_edit.py
new file mode 100644
index 0000000..cee5f4e
--- /dev/null
+++ b/scripts/phenotypes_bulk_edit.py
@@ -0,0 +1,266 @@
+import sys
+import uuid
+import logging
+import argparse
+from pathlib import Path
+from typing import Iterator
+from functools import reduce
+
+from MySQLdb.cursors import DictCursor
+
+from gn_libs import jobs, mysqldb, sqlite3
+
+from uploader.phenotypes.models import phenotypes_data_by_ids
+from uploader.phenotypes.misc import phenotypes_data_differences
+from uploader.phenotypes.views import BULK_EDIT_COMMON_FIELDNAMES
+
+import uploader.publications.pubmed as pmed
+from uploader.publications.misc import publications_differences
+from uploader.publications.models import (
+ update_publications, fetch_phenotype_publications)
+
+logging.basicConfig(
+ format="%(asctime)s — %(filename)s:%(lineno)s — %(levelname)s: %(message)s")
+logger = logging.getLogger(__name__)
+
+
+def check_ids(conn, ids: tuple[tuple[int, int], ...]) -> bool:
+ """Verify that all the `UniqueIdentifier` values are valid."""
+ logger.info("Checking the 'UniqueIdentifier' values.")
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ paramstr = ",".join(["(%s, %s)"] * len(ids))
+ cursor.execute(
+ "SELECT PhenotypeId AS phenotype_id, Id AS xref_id "
+ "FROM PublishXRef "
+ f"WHERE (PhenotypeId, Id) IN ({paramstr})",
+ tuple(item for row in ids for item in row))
+ mysqldb.debug_query(cursor, logger)
+ found = tuple((row["phenotype_id"], row["xref_id"])
+ for row in cursor.fetchall())
+
+ not_found = tuple(item for item in ids if item not in found)
+ if len(not_found) == 0:
+ logger.info("All 'UniqueIdentifier' are valid.")
+ return True
+
+ for item in not_found:
+ logger.error(f"Invalid 'UniqueIdentifier' value: phId:%s::xrId:%s", item[0], item[1])
+
+ return False
+
+
+def check_for_mandatory_fields():
+ """Verify that mandatory fields have values."""
+ pass
+
+
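+# A minimal sketch of what the stub above could grow into. It assumes the
+# rows come from `read_file` below and that "Original_description" is the
+# mandatory column; the actual required set is an assumption.
+def __sketch_check_mandatory_fields__(file_data: tuple[dict, ...]) -> bool:
+    """Hypothetical: fail when any mandatory field is empty."""
+    mandatory = ("Original_description",)  # assumed required set
+    missing = tuple((row["phenotype_id"], field)
+                    for row in file_data
+                    for field in mandatory
+                    if row.get(field) is None)
+    for phenotype_id, field in missing:
+        logger.error("Phenotype %s has no value for mandatory field '%s'.",
+                     phenotype_id, field)
+    return len(missing) == 0
+
+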
+def __fetch_phenotypes__(conn, ids: tuple[int, ...]) -> tuple[dict, ...]:
+ """Fetch basic (non-numeric) phenotypes data from the database."""
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ paramstr = ",".join(["%s"] * len(ids))
+ cursor.execute(f"SELECT * FROM Phenotype WHERE Id IN ({paramstr}) "
+ "ORDER BY Id ASC",
+ ids)
+ return tuple(dict(row) for row in cursor.fetchall())
+
+
+def descriptions_differences(file_data, db_data) -> dict[str, str]:
+ """Compute differences in the descriptions."""
+ logger.info("Computing differences in phenotype descriptions.")
+ assert len(file_data) == len(db_data), "The counts of phenotypes differ!"
+ description_columns = ("Pre_publication_description",
+ "Post_publication_description",
+ "Original_description",
+ "Pre_publication_abbreviation",
+ "Post_publication_abbreviation")
+ diff = tuple()
+ for file_row, db_row in zip(file_data, db_data):
+ assert file_row["phenotype_id"] == db_row["Id"]
+ inner_diff = {
+ key: file_row[key]
+ for key in description_columns
+            if file_row[key] != db_row[key]
+        }
+        if inner_diff:
+ diff = diff + ({
+ "phenotype_id": file_row["phenotype_id"],
+ **inner_diff
+ },)
+
+ return diff
+
+
+def update_descriptions():
+ """Update descriptions in the database"""
+ logger.info("Updating descriptions")
+ # Compute differences between db data and uploaded file
+ # Only run query for changed descriptions
+ pass
+
+
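+# A minimal sketch of the update the stub above is meant to perform, assuming
+# it receives the diffs computed by `descriptions_differences` (each dict
+# holds "phenotype_id" plus only the changed description columns). It is not
+# wired into `run` yet.
+def __sketch_update_descriptions__(conn, diffs: tuple[dict, ...]) -> None:
+    """Hypothetical: run one UPDATE per phenotype with changed descriptions."""
+    with conn.cursor(cursorclass=DictCursor) as cursor:
+        for diff in diffs:
+            columns = tuple(key for key in diff if key != "phenotype_id")
+            setstr = ", ".join(f"{col}=%s" for col in columns)
+            cursor.execute(
+                f"UPDATE Phenotype SET {setstr} WHERE Id=%s",
+                tuple(diff[col] for col in columns) + (diff["phenotype_id"],))
+            mysqldb.debug_query(cursor, logger)
+
+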
+def link_publications():
+ """Link phenotypes to relevant publications."""
+ logger.info("Linking phenotypes to publications.")
+ # Create publication if PubMed_ID doesn't exist in db
+ pass
+
+
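+# A minimal sketch of the linking step, assuming publication diffs carrying
+# "phenotype_id", "xref_id" and the new "PublicationId"; the `PublicationId`
+# column on `PublishXRef` is assumed from the lookups in `run` below. Not
+# wired into `run` yet.
+def __sketch_link_publications__(conn, pub_diffs: tuple[dict, ...]) -> None:
+    """Hypothetical: point each cross-reference at its new publication."""
+    with conn.cursor(cursorclass=DictCursor) as cursor:
+        cursor.executemany(
+            "UPDATE PublishXRef SET PublicationId=%(PublicationId)s "
+            "WHERE PhenotypeId=%(phenotype_id)s AND Id=%(xref_id)s",
+            pub_diffs)
+        mysqldb.debug_query(cursor, logger)
+
+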
+def update_values():
+ """Update the phenotype values."""
+ logger.info("Updating phenotypes values.")
+ # Compute differences between db data and uploaded file
+ # Only run query for changed data
+ pass
+
+
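+# A minimal sketch of the value update, assuming GeneNetwork's `PublishData`
+# table (Id/StrainId/value) and diffs carrying "data_id", "strain_id" and
+# "new_value" keys; both the table layout and the diff keys are assumptions
+# here, since the real shape comes from `phenotypes_data_differences`.
+def __sketch_update_values__(conn, data_diffs: tuple[dict, ...]) -> None:
+    """Hypothetical: update each changed value by (data, strain) pair."""
+    with conn.cursor(cursorclass=DictCursor) as cursor:
+        cursor.executemany(
+            "UPDATE PublishData SET value=%(new_value)s "
+            "WHERE Id=%(data_id)s AND StrainId=%(strain_id)s",
+            data_diffs)
+        mysqldb.debug_query(cursor, logger)
+
+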
+def parse_args():
+    """Parse and return the command-line arguments."""
+    parser = argparse.ArgumentParser(
+ prog="Phenotypes Bulk-Edit Processor",
+ description="Process the bulk-edits to phenotype data and descriptions.")
+ parser.add_argument("db_uri", type=str, help="MariaDB/MySQL connection URL")
+ parser.add_argument(
+ "jobs_db_path", type=Path, help="Path to jobs' SQLite database.")
+ parser.add_argument("job_id", type=uuid.UUID, help="ID of the running job")
+ parser.add_argument(
+ "--log-level",
+ type=str,
+ help="Determines what is logged out.",
+ choices=("debug", "info", "warning", "error", "critical"),
+ default="info")
+ return parser.parse_args()
+
+
+def read_file(filepath: Path) -> Iterator[dict]:
+    """Read the bulk-edit file, yielding one dict per data line."""
+ with filepath.open(mode="r", encoding="utf-8") as infile:
+ count = 0
+ headers = None
+ for line in infile:
+ if line.startswith("#"): # ignore comments
+            continue
+
+ fields = line.strip().split("\t")
+ if count == 0:
+ headers = fields
+ count = count + 1
+ continue
+
+ _dict = dict(zip(
+ headers,
+ ((None if item.strip() == "" else item.strip())
+ for item in fields)))
+ _pheno, _xref = _dict.pop("UniqueIdentifier").split("::")
+ _dict = {
+ key: ((float(val) if bool(val) else val)
+ if key not in BULK_EDIT_COMMON_FIELDNAMES
+ else val)
+ for key, val in _dict.items()
+ }
+ _dict["phenotype_id"] = int(_pheno.split(":")[1])
+ _dict["xref_id"] = int(_xref.split(":")[1])
+ if _dict["PubMed_ID"] is not None:
+ _dict["PubMed_ID"] = int(_dict["PubMed_ID"])
+
+ yield _dict
+ count = count + 1
+
+
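+# An example of the input `read_file` expects (tab-separated; the strain
+# column "BXD1" is illustrative, since only "UniqueIdentifier" and
+# "PubMed_ID" are referenced by name in this script):
+#
+#   # comment lines are skipped
+#   UniqueIdentifier        PubMed_ID    Original_description    BXD1
+#   phId:10001::xrId:1      12345678     some description        5.3
+#
+# Each data line yields a dict such as:
+#   {"phenotype_id": 10001, "xref_id": 1, "PubMed_ID": 12345678,
+#    "Original_description": "some description", "BXD1": 5.3}
+
+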
+def run(conn, job):
+ """Process the data and update it."""
+ file_contents = tuple(sorted(read_file(Path(job["metadata"]["edit-file"])),
+ key=lambda item: item["phenotype_id"]))
+ pheno_ids, pheno_xref_ids, pubmed_ids = reduce(
+ lambda coll, curr: (
+ coll[0] + (curr["phenotype_id"],),
+ coll[1] + ((curr["phenotype_id"], curr["xref_id"]),),
+ coll[2].union(set([curr["PubMed_ID"]]))),
+ file_contents,
+ (tuple(), tuple(), set([None])))
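+    # e.g. a row {"phenotype_id": 10001, "xref_id": 1, "PubMed_ID": 12345678}
+    # extends the accumulator to ((10001,), ((10001, 1),), {None, 12345678}).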
+    if not check_ids(conn, pheno_xref_ids):
+        # Stop here: the file references identifiers missing from the db.
+        return 1
+    check_for_mandatory_fields()
+    # TODO: also abort here once the mandatory-fields check is implemented.
+
+ ### Compute differences
+ logger.info("Computing differences.")
+ # 1. Basic Phenotype data differences
+ # a. Descriptions differences
+ _desc_diff = descriptions_differences(
+ file_contents, __fetch_phenotypes__(conn, pheno_ids))
+ logger.debug("DESCRIPTIONS DIFFERENCES: %s", _desc_diff)
+
+ # b. Publications differences
+ _db_publications = fetch_phenotype_publications(conn, pheno_xref_ids)
+ logger.debug("DB PUBLICATIONS: %s", _db_publications)
+
+    _pubmed_map = {
+        (int(row["PubMed_ID"]) if bool(row["PubMed_ID"]) else None): (
+            f"{row['phenotype_id']}::{row['xref_id']}")
+        for row in file_contents
+    }
+ _pub_id_map = {
+ f"{pub['PhenotypeId']}::{pub['xref_id']}": pub["PublicationId"]
+ for pub in _db_publications
+ }
+
+ _new_publications = update_publications(
+ conn, tuple({
+ **pub, "publication_id": _pub_id_map[_pubmed_map[pub["pubmed_id"]]]
+ } for pub in pmed.fetch_publications(tuple(
+ pubmed_id for pubmed_id in pubmed_ids
+ if pubmed_id not in
+ tuple(row["PubMed_ID"] for row in _db_publications)))))
+ _pub_diff = publications_differences(
+ file_contents, _db_publications, {
+ row["PubMed_ID" if "PubMed_ID" in row else "pubmed_id"]: row[
+ "PublicationId" if "PublicationId" in row else "publication_id"]
+ for row in _db_publications + _new_publications})
+ logger.debug("Publications diff: %s", _pub_diff)
+ # 2. Data differences
+ _db_pheno_data = phenotypes_data_by_ids(conn, tuple({
+ "population_id": job["metadata"]["population-id"],
+ "phenoid": row[0],
+ "xref_id": row[1]
+ } for row in pheno_xref_ids))
+
+ data_diff = phenotypes_data_differences(
+ ({
+ "phenotype_id": row["phenotype_id"],
+ "xref_id": row["xref_id"],
+ "data": {
+                key: val for key, val in row.items()
+ if key not in BULK_EDIT_COMMON_FIELDNAMES + [
+ "phenotype_id", "xref_id"]
+ }
+ } for row in file_contents),
+ ({
+ **row,
+ "PhenotypeId": row["Id"],
+ "data": {
+ dataitem["StrainName"]: dataitem
+ for dataitem in row["data"].values()
+ }
+ } for row in _db_pheno_data))
+ logger.debug("Data differences: %s", data_diff)
+ ### END: Compute differences
+ update_descriptions()
+ link_publications()
+ update_values()
+ return 0
+
+
+def main():
+ """Entry-point for this script."""
+ args = parse_args()
+ logger.setLevel(args.log_level.upper())
+ logger.debug("Arguments: %s", args)
+
+ logging.getLogger("uploader.phenotypes.misc").setLevel(args.log_level.upper())
+ logging.getLogger("uploader.phenotypes.models").setLevel(args.log_level.upper())
+ logging.getLogger("uploader.publications.models").setLevel(args.log_level.upper())
+
+ with (mysqldb.database_connection(args.db_uri) as conn,
+ sqlite3.connection(args.jobs_db_path) as jobs_conn):
+ return run(conn, jobs.job(jobs_conn, args.job_id))
+
+
+if __name__ == "__main__":
+ sys.exit(main())