1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
|
import sys
import uuid
import logging
import argparse
from pathlib import Path
from typing import Iterator
from functools import reduce
from MySQLdb.cursors import DictCursor
from gn_libs import jobs, mysqldb, sqlite3
# Layout of every log record: timestamp, source file and line number,
# severity level, then the message itself.
_LOG_FORMAT = "%(asctime)s — %(filename)s:%(lineno)s — %(levelname)s: %(message)s"
logging.basicConfig(format=_LOG_FORMAT)

# Module-level logger used by the functions in this script.
logger = logging.getLogger(__name__)
def check_ids(conn, ids: tuple[tuple[int, int], ...]) -> bool:
    """Verify that all the `UniqueIdentifier` values are valid.

    Every `(phenotype_id, xref_id)` pair in `ids` is looked up in the
    `PublishXRef` table, matched against its `(PhenotypeId, Id)` columns.
    Each pair without a matching row is reported through the logger at
    error level.

    Arguments:
        conn: database connection; must provide `cursor(cursorclass=...)`
            usable as a context manager.
        ids: the `(phenotype_id, xref_id)` pairs to check.

    Returns:
        `True` if every pair was found (or `ids` is empty), else `False`.
    """
    if not ids:
        # An empty collection would render the clause as `IN ()`, which is
        # not valid SQL. With nothing to look up, nothing can be invalid.
        return True

    logger.info("Checking the 'UniqueIdentifier' values.")
    with conn.cursor(cursorclass=DictCursor) as cursor:
        # One "(%s, %s)" placeholder per pair; the values themselves are
        # passed separately so the driver handles the escaping.
        paramstr = ",".join(["(%s, %s)"] * len(ids))
        cursor.execute(
            "SELECT PhenotypeId AS phenotype_id, Id AS xref_id "
            "FROM PublishXRef "
            f"WHERE (PhenotypeId, Id) IN ({paramstr})",
            tuple(item for row in ids for item in row))
        mysqldb.debug_query(cursor, logger)
        # A set keeps the membership tests below constant-time.
        found = {(row["phenotype_id"], row["xref_id"])
                 for row in cursor.fetchall()}

    not_found = tuple(item for item in ids if item not in found)
    if not not_found:
        logger.info("All 'UniqueIdentifier' are valid.")
        return True

    for phenotype_id, xref_id in not_found:
        logger.error("Invalid 'UniqueIdentifier' value: phId:%s::xrId:%s",
                     phenotype_id, xref_id)

    return False
def check_for_mandatory_fields():
    """Check that every mandatory field has been given a value.

    Placeholder: no checks are implemented yet, so this does nothing and
    returns `None`.
    """
    return None
def compute_differences():
    """Work out what differs between the database and the edited data.

    Placeholder: for now the step only announces itself in the log.
    """
    logger.info("Computing differences.")
def update_descriptions():
    """Write edited phenotype descriptions to the database.

    Placeholder: for now the step only announces itself in the log.
    """
    # TODO: diff the database contents against the uploaded file, then run
    # queries only for the descriptions that actually changed.
    logger.info("Updating descriptions")
def link_publications():
    """Attach each phenotype to the publication(s) relevant to it.

    Placeholder: for now the step only announces itself in the log.
    """
    # TODO: when a PubMed_ID is not yet in the database, create the
    # publication record before linking to it.
    logger.info("Linking phenotypes to publications.")
def update_values():
    """Write edited phenotype values to the database.

    Placeholder: for now the step only announces itself in the log.
    """
    # TODO: diff the database contents against the uploaded file, then run
    # queries only for the data that actually changed.
    logger.info("Updating phenotypes values.")
def parse_args(argv=None) -> argparse.Namespace:
    """Parse the command-line arguments for this script.

    Arguments:
        argv: optional list of argument strings to parse instead of the
            process's own command line. When `None` (the default),
            `argparse` reads `sys.argv[1:]`, exactly as before.

    Returns:
        A namespace with the attributes `db_uri` (str), `jobs_db_path`
        (`Path`), `job_id` (`uuid.UUID`) and `log_level` (one of "debug",
        "info", "warning", "error", "critical"; defaults to "info").
    """
    parser = argparse.ArgumentParser(
        prog="Phenotypes Bulk-Edit Processor",
        description="Process the bulk-edits to phenotype data and descriptions.")
    parser.add_argument("db_uri", type=str, help="MariaDB/MySQL connection URL")
    parser.add_argument(
        "jobs_db_path", type=Path, help="Path to jobs' SQLite database.")
    parser.add_argument("job_id", type=uuid.UUID, help="ID of the running job")
    parser.add_argument(
        "--log-level",
        type=str,
        help="Determines what is logged out.",
        choices=("debug", "info", "warning", "error", "critical"),
        default="info")
    return parser.parse_args(argv)
def read_file(filepath: Path) -> Iterator[dict]:
    """Read the tab-separated file, yielding one dict per data row.

    Lines beginning with `#` are treated as comments and skipped, as are
    blank lines. The first remaining line supplies the column headers;
    each later line is a data row keyed by those headers. Fields that are
    empty after stripping whitespace become `None`.

    The `UniqueIdentifier` column is removed from each row and split on
    `::` into two parts; the integer after the `:` in each part is stored
    under the `phenotype_id` and `xref_id` keys respectively.

    Arguments:
        filepath: path of the UTF-8 encoded file to read.
    """
    with filepath.open(mode="r", encoding="utf-8") as infile:
        headers = None
        for line in infile:
            if line.startswith("#"):  # ignore comments
                continue
            if not line.strip():  # ignore blank lines
                continue

            # Remove only the line terminator. Stripping all surrounding
            # whitespace would also eat leading/trailing tabs, dropping
            # empty first/last fields and shifting the columns.
            fields = tuple(
                field.strip() for field in line.rstrip("\r\n").split("\t"))
            if headers is None:
                headers = fields
                continue

            row = dict(zip(headers, (field or None for field in fields)))
            pheno, xref = row.pop("UniqueIdentifier").split("::")
            row["phenotype_id"] = int(pheno.split(":")[1])
            row["xref_id"] = int(xref.split(":")[1])
            yield row
def run(conn, job):
    """Process the data and update it.

    Reads the edit file named by `job["metadata"]["edit-file"]`, orders
    its rows by `phenotype_id`, validates them, and only then runs the
    update steps.

    Arguments:
        conn: database connection handed on to `check_ids`.
        job: mapping describing the job; `job["metadata"]["edit-file"]`
            must hold the path of the file to process.

    Returns:
        0 when all steps ran, 1 when validation failed and no update step
        was attempted.
    """
    file_contents = tuple(sorted(read_file(Path(job["metadata"]["edit-file"])),
                                 key=lambda item: item["phenotype_id"]))
    pheno_xref_ids = tuple(
        (row["phenotype_id"], row["xref_id"]) for row in file_contents)

    # Run every check before deciding, so that all problems get logged in
    # one pass rather than one per run.
    ids_are_valid = check_ids(conn, pheno_xref_ids)
    check_for_mandatory_fields()
    if not ids_are_valid:
        # Stop here: applying edits keyed on invalid identifiers is unsafe.
        logger.error("Validation failed; no changes were applied.")
        return 1

    compute_differences()
    update_descriptions()
    link_publications()
    update_values()
    return 0
def main():
    """Script entry point.

    Parses the command line, applies the requested log level, opens the
    MariaDB/MySQL and jobs SQLite connections, fetches the job and hands
    it to `run`, whose return value is passed back to the caller.
    """
    args = parse_args()
    logger.setLevel(args.log_level.upper())
    logger.debug("Arguments: %s", args)
    with mysqldb.database_connection(args.db_uri) as conn:
        with sqlite3.connection(args.jobs_db_path) as jobs_conn:
            the_job = jobs.job(jobs_conn, args.job_id)
            return run(conn, the_job)


# Use the value returned by `main` as the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|