about summary refs log tree commit diff
path: root/uploader/publications/models.py
diff options
context:
space:
mode:
Diffstat (limited to 'uploader/publications/models.py')
-rw-r--r--uploader/publications/models.py119
1 files changed, 119 insertions, 0 deletions
diff --git a/uploader/publications/models.py b/uploader/publications/models.py
new file mode 100644
index 0000000..dcfa02b
--- /dev/null
+++ b/uploader/publications/models.py
@@ -0,0 +1,119 @@
+"""Module to handle persistence and retrieval of publication to/from MariaDB"""
+import logging
+from typing import Iterable
+
+from MySQLdb.cursors import DictCursor
+
+from gn_libs.mysqldb import Connection, debug_query
+
+logger = logging.getLogger(__name__)
+
+
+def fetch_phenotype_publications(
+        conn: Connection,
+        ids: tuple[tuple[int, int], ...]
+) -> tuple[dict, ...]:
+    """Fetch publication from database by ID."""
+    paramstr = ",".join(["(%s, %s)"] * len(ids))
+    query = (
+        "SELECT "
+        "pxr.PhenotypeId, pxr.Id AS xref_id, pxr.PublicationId, pub.PubMed_ID "
+        "FROM PublishXRef AS pxr INNER JOIN Publication AS pub "
+        "ON pxr.PublicationId=pub.Id "
+        f"WHERE (pxr.PhenotypeId, pxr.Id) IN ({paramstr})")
+    with conn.cursor(cursorclass=DictCursor) as cursor:
+        cursor.execute(query, tuple(item for row in ids for item in row))
+        return tuple(dict(row) for row in cursor.fetchall())
+
+
+def create_new_publications(
+        conn: Connection,
+        publications: tuple[dict, ...]
+) -> tuple[dict, ...]:
+    """Create new publications in the database."""
+    if len(publications) > 0:
+        with conn.cursor(cursorclass=DictCursor) as cursor:
+            cursor.executemany(
+                ("INSERT INTO "
+                 "Publication( "
+                 "PubMed_ID, Abstract, Authors, Title, Journal, Volume, Pages, "
+                 "Month, Year"
+                 ") "
+                 "VALUES("
+                 "%(pubmed_id)s, %(abstract)s, %(authors)s, %(title)s, "
+                 "%(journal)s, %(volume)s, %(pages)s, %(month)s, %(year)s"
+                 ") "
+                 "RETURNING *"),
+                publications)
+            return tuple({
+                **row, "publication_id": row["Id"]
+            } for row in cursor.fetchall())
+
+    return tuple()
+
+
+def update_publications(conn: Connection , publications: tuple[dict, ...]) -> tuple[dict, ...]:
+    """Update details for multiple publications"""
+    if len(publications) > 0:
+        with conn.cursor(cursorclass=DictCursor) as cursor:
+            logger.debug("UPDATING PUBLICATIONS: %s", publications)
+            cursor.executemany(
+                ("UPDATE Publication SET "
+                 "PubMed_ID=%(pubmed_id)s, Abstract=%(abstract)s, "
+                 "Authors=%(authors)s, Title=%(title)s, Journal=%(journal)s, "
+                 "Volume=%(volume)s, Pages=%(pages)s, Month=%(month)s, "
+                 "Year=%(year)s "
+                 "WHERE Id=%(publication_id)s"),
+                publications)
+            debug_query(cursor, logger)
+            return publications
+        return tuple()
+    return tuple()
+
+
+def delete_publications(conn: Connection , publications: tuple[dict, ...]):
+    """Delete multiple publications"""
+    publications = tuple(pub for pub in publications if bool(pub))
+    if len(publications) > 0:
+        _pub_ids = tuple(pub["Id"] for pub in publications)
+        _paramstr = ", ".join(["%s"] * len(_pub_ids))
+        _phenos_query = (
+            "SELECT PublicationId, COUNT(PhenotypeId) FROM PublishXRef "
+            f"WHERE PublicationId IN ({_paramstr}) GROUP BY PublicationId;")
+
+        with conn.cursor(cursorclass=DictCursor) as cursor:
+            cursor.execute(_phenos_query, _pub_ids)
+            _linked_phenos = cursor.fetchall()
+            if len(_linked_phenos) > 0:
+                raise Exception(# pylint: disable=[broad-exception-raised]
+                    "Cannot delete publications with linked phenotypes.")
+
+            cursor.execute(
+                f"DELETE FROM Publication WHERE Id IN ({_paramstr})", _pub_ids)
+
+
+def fetch_publication_by_id(conn: Connection, publication_id: int) -> dict:
+    """Fetch a specific publication from the database."""
+    with conn.cursor(cursorclass=DictCursor) as cursor:
+        cursor.execute("SELECT * FROM Publication WHERE Id=%s",
+                       (publication_id,))
+        _res = cursor.fetchone()
+        return dict(_res) if _res else {}
+
+
+def fetch_publication_phenotypes(
+        conn: Connection, publication_id: int) -> Iterable[dict]:
+    """Fetch all phenotypes linked to this publication."""
+    with conn.cursor(cursorclass=DictCursor) as cursor:
+        cursor.execute(
+            "SELECT pxr.Id AS xref_id, pxr.PublicationId, phe.* "
+            "FROM PublishXRef AS pxr INNER JOIN Phenotype AS phe "
+            "ON pxr.PhenotypeId=phe.Id "
+            "WHERE pxr.PublicationId=%s",
+            (publication_id,))
+        while True:
+            row = cursor.fetchone()
+            if row:
+                yield row
+            else:
+                break