about summary refs log tree commit diff
path: root/uploader/publications/models.py
diff options
context:
space:
mode:
Diffstat (limited to 'uploader/publications/models.py')
-rw-r--r--uploader/publications/models.py27
1 files changed, 25 insertions, 2 deletions
diff --git a/uploader/publications/models.py b/uploader/publications/models.py
index b199991..dcfa02b 100644
--- a/uploader/publications/models.py
+++ b/uploader/publications/models.py
@@ -1,6 +1,6 @@
 """Module to handle persistence and retrieval of publication to/from MariaDB"""
 import logging
-from typing import Iterable, Optional
+from typing import Iterable
 
 from MySQLdb.cursors import DictCursor
 
@@ -30,6 +30,7 @@ def create_new_publications(
         conn: Connection,
         publications: tuple[dict, ...]
 ) -> tuple[dict, ...]:
+    """Create new publications in the database."""
     if len(publications) > 0:
         with conn.cursor(cursorclass=DictCursor) as cursor:
             cursor.executemany(
@@ -47,7 +48,8 @@ def create_new_publications(
             return tuple({
                 **row, "publication_id": row["Id"]
             } for row in cursor.fetchall())
-        return tuple()
+
+    return tuple()
 
 
 def update_publications(conn: Connection , publications: tuple[dict, ...]) -> tuple[dict, ...]:
@@ -69,6 +71,27 @@ def update_publications(conn: Connection , publications: tuple[dict, ...]) -> tu
     return tuple()
 
 
+def delete_publications(conn: Connection , publications: tuple[dict, ...]):
+    """Delete multiple publications"""
+    publications = tuple(pub for pub in publications if bool(pub))
+    if len(publications) > 0:
+        _pub_ids = tuple(pub["Id"] for pub in publications)
+        _paramstr = ", ".join(["%s"] * len(_pub_ids))
+        _phenos_query = (
+            "SELECT PublicationId, COUNT(PhenotypeId) FROM PublishXRef "
+            f"WHERE PublicationId IN ({_paramstr}) GROUP BY PublicationId;")
+
+        with conn.cursor(cursorclass=DictCursor) as cursor:
+            cursor.execute(_phenos_query, _pub_ids)
+            _linked_phenos = cursor.fetchall()
+            if len(_linked_phenos) > 0:
+                raise Exception(# pylint: disable=[broad-exception-raised]
+                    "Cannot delete publications with linked phenotypes.")
+
+            cursor.execute(
+                f"DELETE FROM Publication WHERE Id IN ({_paramstr})", _pub_ids)
+
+
 def fetch_publication_by_id(conn: Connection, publication_id: int) -> dict:
     """Fetch a specific publication from the database."""
     with conn.cursor(cursorclass=DictCursor) as cursor: