"""Process bulk edits to phenotype data and descriptions."""
import sys
import uuid
import logging
import argparse
from pathlib import Path
from typing import Iterator, Optional
from functools import reduce
import requests
from lxml import etree
from MySQLdb.cursors import DictCursor
from gn_libs import jobs, mysqldb, sqlite3

logging.basicConfig(
    format="%(asctime)s — %(filename)s:%(lineno)s — %(levelname)s: %(message)s")
logger = logging.getLogger(__name__)


def check_ids(conn, ids: tuple[tuple[int, int], ...]) -> bool:
"""Verify that all the `UniqueIdentifier` values are valid."""
logger.info("Checking the 'UniqueIdentifier' values.")
with conn.cursor(cursorclass=DictCursor) as cursor:
paramstr = ",".join(["(%s, %s)"] * len(ids))
cursor.execute(
"SELECT PhenotypeId AS phenotype_id, Id AS xref_id "
"FROM PublishXRef "
f"WHERE (PhenotypeId, Id) IN ({paramstr})",
tuple(item for row in ids for item in row))
mysqldb.debug_query(cursor, logger)
found = tuple((row["phenotype_id"], row["xref_id"])
for row in cursor.fetchall())
not_found = tuple(item for item in ids if item not in found)
if len(not_found) == 0:
logger.info("All 'UniqueIdentifier' are valid.")
return True
for item in not_found:
logger.error(f"Invalid 'UniqueIdentifier' value: phId:%s::xrId:%s", item[0], item[1])
return False


def check_for_mandatory_fields():
    """Verify that mandatory fields have values."""
    # TODO: Implement this check; it is currently a stub that always passes.
    pass


def __fetch_phenotypes__(conn, ids: tuple[int, ...]) -> tuple[dict, ...]:
"""Fetch basic (non-numeric) phenotypes data from the database."""
with conn.cursor(cursorclass=DictCursor) as cursor:
paramstr = ",".join(["%s"] * len(ids))
cursor.execute(f"SELECT * FROM Phenotype WHERE Id IN ({paramstr}) "
"ORDER BY Id ASC",
ids)
return tuple(dict(row) for row in cursor.fetchall())


def descriptions_differences(file_data, db_data) -> tuple[dict, ...]:
"""Compute differences in the descriptions."""
logger.info("Computing differences in phenotype descriptions.")
assert len(file_data) == len(db_data), "The counts of phenotypes differ!"
description_columns = ("Pre_publication_description",
"Post_publication_description",
"Original_description",
"Pre_publication_abbreviation",
"Post_publication_abbreviation")
diff = tuple()
for file_row, db_row in zip(file_data, db_data):
assert file_row["phenotype_id"] == db_row["Id"]
inner_diff = {
key: file_row[key]
for key in description_columns
            if file_row[key] != db_row[key]
}
        if inner_diff:
diff = diff + ({
"phenotype_id": file_row["phenotype_id"],
**inner_diff
},)
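    # `diff` ends up as a tuple of dicts, e.g. (illustrative values):
    #   ({"phenotype_id": 10, "Original_description": "the edited text"},)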
return diff


def __fetch_publications__(conn, ids):
"""Fetch publication from database by ID."""
paramstr = ",".join(["(%s, %s)"] * len(ids))
query = (
"SELECT "
"pxr.PhenotypeId, pxr.Id AS xref_id, pxr.PublicationId, pub.PubMed_ID "
"FROM PublishXRef AS pxr INNER JOIN Publication AS pub "
"ON pxr.PublicationId=pub.Id "
f"WHERE (pxr.PhenotypeId, pxr.Id) IN ({paramstr})")
with conn.cursor(cursorclass=DictCursor) as cursor:
cursor.execute(query, tuple(item for row in ids for item in row))
return tuple(dict(row) for row in cursor.fetchall())


def __pub_date__(pubdate: etree.Element) -> dict:
pubyear = pubdate.find("Year")
pubmonth = pubdate.find("Month")
pubday = pubdate.find("Day")
return {
"year": pubyear.text if pubyear is not None else None,
"month": pubmonth.text if pubmonth is not None else None,
"day": pubday.text if pubday is not None else None
}


def __journal__(journal: etree.Element) -> dict:
    title = journal.find("Title")
    volume = journal.find("JournalIssue/Volume")
    issue = journal.find("JournalIssue/Issue")
    return {
        "volume": volume.text if volume is not None else None,
        "issue": issue.text if issue is not None else None,
        **__pub_date__(journal.find("JournalIssue/PubDate")),
        "journal": title.text if title is not None else None
    }


def __author__(author: etree.Element) -> str:
return "%s %s" % (
author.find("LastName").text,
author.find("Initials").text)


def __pages__(pagination: etree.Element) -> str:
start = pagination.find("StartPage")
end = pagination.find("EndPage")
return (start.text + (
f"-{end.text}" if end is not None else ""
)) if start is not None else ""


def __abstract__(article: etree.Element) -> Optional[str]:
    abstract = article.find("Abstract/AbstractText")
    return abstract.text if abstract is not None else None


def __article__(pubmed_article: etree.Element) -> dict:
article = pubmed_article.find("MedlineCitation/Article")
return {
"pubmed_id": pubmed_article.find("MedlineCitation/PMID").text,
"title": article.find("ArticleTitle").text,
**__journal__(article.find("Journal")),
"abstract": __abstract__(article),
"pages": __pages__(article.find("Pagination")),
"authors": ", ".join(__author__(author)
for author in article.findall("AuthorList/Author"))
}
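

# The XML helpers above assume efetch output shaped roughly like this
# (abridged sketch, inferred from the element paths used in the code):
#   <PubmedArticleSet>
#     <PubmedArticle>
#       <MedlineCitation>
#         <PMID>…</PMID>
#         <Article>
#           <Journal><JournalIssue>…</JournalIssue><Title>…</Title></Journal>
#           <ArticleTitle>…</ArticleTitle>
#           <Pagination>…</Pagination>
#           <Abstract><AbstractText>…</AbstractText></Abstract>
#           <AuthorList><Author>…</Author></AuthorList>
#         </Article>
#       </MedlineCitation>
#     </PubmedArticle>
#   </PubmedArticleSet>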


def __process_pubmed_publication_data__(content):
    """Process the data from PubMed into usable data."""
    doc = etree.XML(content)
articles = doc.xpath("//PubmedArticle")
logger.debug("Retrieved %s publications from NCBI", len(articles))
return tuple(__article__(article) for article in articles)


def __fetch_new_pubmed_ids__(pubmed_ids):
"""Retrieve data on new publications from NCBI."""
# See whether we can retrieve multiple publications in one go
# Parse data and save to DB
# Return PublicationId(s) for new publication(s).
if len(pubmed_ids) == 0:
logger.debug("There are no new PubMed IDs to fetch")
return tuple()
logger.info("Fetching publications data for the following PubMed IDs: %s",
", ".join((str(pid) for pid in pubmed_ids)))
# Should we, perhaps, pass this in from a config variable?
uri = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi"
try:
        response = requests.get(
            uri,
            params={
                "db": "pubmed",
                "retmode": "xml",
                "id": ",".join(str(item) for item in pubmed_ids)
            },
            timeout=300)  # seconds: fail rather than hang on a stalled request
        if response.status_code == 200:
            # Parse the raw bytes: lxml rejects `str` input that carries an
            # XML encoding declaration, as NCBI's response can.
            return __process_pubmed_publication_data__(response.content)
        logger.error(
            "Could not fetch the new publications from %s (status code: %s)",
            uri,
            response.status_code)
    except requests.exceptions.ConnectionError:
        logger.error("Could not connect to %s", uri)
return tuple()


def __save_new_publications__(conn, publications, pubmed_ids) -> dict:
if len(publications) > 0:
with conn.cursor(cursorclass=DictCursor) as cursor:
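            # The upsert below assumes the Publication table has a unique
            # key on the PubMed_ID column.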
cursor.executemany(
("INSERT INTO "
"Publication( "
"PubMed_ID, Abstract, Authors, Title, Journal, Volume, Pages, "
"Month, Year"
") "
"VALUES("
"%(pubmed_id)s, %(abstract)s, %(authors)s, %(title)s, "
"%(journal)s, %(volume)s, %(pages)s, %(month)s, %(year)s"
") "
"ON DUPLICATE KEY UPDATE "
"Abstract=VALUES(Abstract), Authors=VALUES(Authors), "
"Title=VALUES(Title), Journal=VALUES(Journal), "
"Volume=VALUES(Volume), Pages=VALUES(pages), "
"Month=VALUES(Month), Year=VALUES(Year)"),
publications)
paramstr = ", ".join(["%s"] * len(pubmed_ids))
cursor.execute(
("SELECT Id, PubMed_ID FROM Publication "
f"WHERE PubMed_ID IN ({paramstr})"),
pubmed_ids)
return {
row["PubMed_ID"]: row["Id"] for row in cursor.fetchall()
}
return {}


def publications_differences(
        conn, file_data, db_data, pubmed_ids) -> tuple[dict, ...]:
"""Compute differences in the publications."""
logger.info("Computing differences in publications.")
assert len(file_data) == len(db_data), "Publication counts differ!"
db_pubmed_ids = reduce(lambda coll, curr: coll.union(set([curr["PubMed_ID"]])),
db_data,
set([None]))
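    # Seeding with `None` means file rows without a PubMed ID are never
    # treated as new publications to fetch.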
    pheno_xref_to_pub_id = {
        f"{row['PhenotypeId']}::{row['xref_id']}": row["PublicationId"]
        for row in db_data
    }
    new_pubmed_ids = tuple(pubmed_ids.difference(db_pubmed_ids))
    new_publications = __save_new_publications__(
        conn, __fetch_new_pubmed_ids__(new_pubmed_ids), new_pubmed_ids)
    pubmed_to_pub_id = {
        row["PubMed_ID"]: new_publications.get(
            row["PubMed_ID"],
            pheno_xref_to_pub_id[f"{row['phenotype_id']}::{row['xref_id']}"])
        for row in file_data
    }
    return tuple(
        item for item in ({
            "PhenotypeId": row["phenotype_id"],
            "xref_id": row["xref_id"],
            "PublicationId": pubmed_to_pub_id.get(
                row["PubMed_ID"],
                pheno_xref_to_pub_id[f"{row['phenotype_id']}::{row['xref_id']}"])
        } for row in file_data)
        if item["PublicationId"] != pheno_xref_to_pub_id[
            f"{item['PhenotypeId']}::{item['xref_id']}"])


def compute_differences(
conn,
file_contents,
pheno_ids,
pheno_xref_ids,
pubmed_ids
) -> tuple[tuple[dict, ...], tuple[dict, ...], tuple[dict, ...]]:
"""Compute differences between data in DB and edited data."""
logger.info("Computing differences.")
# 1. Basic Phenotype data differences
# a. Descriptions differences
desc_diff = descriptions_differences(
file_contents, __fetch_phenotypes__(conn, pheno_ids))
logger.debug("DESCRIPTIONS DIFFERENCES: %s", desc_diff)
# b. Publications differences
pub_diff = publications_differences(
conn,
file_contents,
__fetch_publications__(conn, pheno_xref_ids),
pubmed_ids)
logger.debug("Publications diff: %s", pub_diff)
    # 2. Data differences
    # TODO: implement this computation.
    # data_diff = data_differences(...)
    return (desc_diff, pub_diff, tuple())  # data differences not yet computed


def update_descriptions():
    """Update descriptions in the database."""
    logger.info("Updating descriptions")
    # TODO: Compute differences between the DB data and the uploaded file,
    #       then run the update query for the changed descriptions only.
    pass


def link_publications():
    """Link phenotypes to relevant publications."""
    logger.info("Linking phenotypes to publications.")
    # TODO: Create the publication if the PubMed_ID does not exist in the DB.
    pass


def update_values():
    """Update the phenotype values."""
    logger.info("Updating phenotype values.")
    # TODO: Compute differences between the DB data and the uploaded file,
    #       then run the update query for the changed data only.
    pass


def parse_args():
    """Parse the command-line arguments this script accepts."""
    parser = argparse.ArgumentParser(
prog="Phenotypes Bulk-Edit Processor",
description="Process the bulk-edits to phenotype data and descriptions.")
parser.add_argument("db_uri", type=str, help="MariaDB/MySQL connection URL")
parser.add_argument(
"jobs_db_path", type=Path, help="Path to jobs' SQLite database.")
parser.add_argument("job_id", type=uuid.UUID, help="ID of the running job")
parser.add_argument(
"--log-level",
type=str,
help="Determines what is logged out.",
choices=("debug", "info", "warning", "error", "critical"),
default="info")
return parser.parse_args()
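

# Example invocation (the script name, URI, path and UUID are illustrative):
#   python phenotypes_bulk_edit.py "mysql://user:pass@host/db_name" \
#       /path/to/jobs.sqlite3 6cd0f623-1040-4d27-a0c8-3b51ac4bfd37 \
#       --log-level debug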


def read_file(filepath: Path) -> Iterator[dict]:
    """Read the file, yielding a dict of fields for each data row."""
    with filepath.open(mode="r", encoding="utf-8") as infile:
        count = 0
        headers = None
        for line in infile:
            if line.startswith("#"):  # ignore comments
                continue
            if not line.strip():  # skip blank lines
                continue
            # Strip only the line terminator: a plain `.strip()` would also
            # eat tabs, silently dropping empty leading/trailing fields.
            fields = line.rstrip("\r\n").split("\t")
            if count == 0:
                headers = fields
                count = count + 1
                continue
            _dict = dict(zip(
                headers,
                ((None if item.strip() == "" else item.strip())
                 for item in fields)))
            _pheno, _xref = _dict.pop("UniqueIdentifier").split("::")
            _dict["phenotype_id"] = int(_pheno.split(":")[1])
            _dict["xref_id"] = int(_xref.split(":")[1])
            if _dict["PubMed_ID"] is not None:
                _dict["PubMed_ID"] = int(_dict["PubMed_ID"])
            yield _dict
            count = count + 1
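

# `read_file` expects a tab-separated file with "#" comment lines, a header
# row, and a "UniqueIdentifier" column of the form
# "phId:<PhenotypeId>::xrId:<xref id>", e.g. (illustrative row):
#   UniqueIdentifier    Post_publication_description    PubMed_ID
#   phId:10::xrId:3     Body weight at 8 weeks [g]      27074731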


def run(conn, job):
"""Process the data and update it."""
file_contents = tuple(sorted(read_file(Path(job["metadata"]["edit-file"])),
key=lambda item: item["phenotype_id"]))
pheno_ids, pheno_xref_ids, pubmed_ids = reduce(
lambda coll, curr: (
coll[0] + (curr["phenotype_id"],),
coll[1] + ((curr["phenotype_id"], curr["xref_id"]),),
coll[2].union(set([curr["PubMed_ID"]]))),
file_contents,
(tuple(), tuple(), set([None])))
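    # e.g. pheno_ids == (10, 11), pheno_xref_ids == ((10, 3), (11, 5)) and
    # pubmed_ids == {None, 27074731} (illustrative values).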
    if not check_ids(conn, pheno_xref_ids):
        return 1  # stop here: some 'UniqueIdentifier' values are invalid
    check_for_mandatory_fields()
    # TODO: also stop running here if the mandatory-fields check finds errors.
compute_differences(conn,
file_contents,
pheno_ids,
pheno_xref_ids,
pubmed_ids)
update_descriptions()
link_publications()
update_values()
return 0


def main():
"""Entry-point for this script."""
args = parse_args()
logger.setLevel(args.log_level.upper())
logger.debug("Arguments: %s", args)
with (mysqldb.database_connection(args.db_uri) as conn,
sqlite3.connection(args.jobs_db_path) as jobs_conn):
return run(conn, jobs.job(jobs_conn, args.job_id))
if __name__ == "__main__":
sys.exit(main())