about summary refs log tree commit diff
path: root/uploader/publications
diff options
context:
space:
mode:
Diffstat (limited to 'uploader/publications')
-rw-r--r--uploader/publications/datatables.py52
-rw-r--r--uploader/publications/models.py42
-rw-r--r--uploader/publications/pubmed.py7
-rw-r--r--uploader/publications/views.py121
4 files changed, 179 insertions, 43 deletions
diff --git a/uploader/publications/datatables.py b/uploader/publications/datatables.py
new file mode 100644
index 0000000..e07fafd
--- /dev/null
+++ b/uploader/publications/datatables.py
@@ -0,0 +1,52 @@
+"""Fetch data for datatables."""
+import logging
+from typing import Optional
+
+from MySQLdb.cursors import DictCursor
+
+from gn_libs.mysqldb import Connection, debug_query
+
+logger = logging.getLogger(__name__)
+
+def fetch_publications(
+        conn: Connection,
+        search: Optional[str] = None,
+        offset: int = 0,
+        limit: int = -1
+) -> tuple[dict, int, int, int]:
+    """Fetch publications from the database."""
+    _query = "SELECT * FROM Publication"
+    _count_query = "SELECT COUNT(*) FROM Publication"
+    _params = None
+    _where_clause = ""
+    _limit_clause = ""
+    if search is not None and bool(search):
+        _where_clause = ("WHERE PubMed_ID LIKE %s "
+                      "OR Authors LIKE %s "
+                      "OR Title LIKE %s")
+        _params = (f"%{search}%",) * 3
+
+    if limit > 0:
+        _limit_clause = f"LIMIT {limit} OFFSET {offset}"
+
+    with conn.cursor(cursorclass=DictCursor) as cursor:
+        cursor.execute("SELECT COUNT(*) FROM Publication")
+        _total_rows = int(cursor.fetchone()["COUNT(*)"])
+
+        cursor.execute(f"{_count_query} {_where_clause}", _params)
+        debug_query(cursor, logger)
+        _result = cursor.fetchone()
+        _total_filtered = int(_result["COUNT(*)"] if bool(_result) else 0)
+
+        cursor.execute(f"{_query} {_where_clause} {_limit_clause}", _params)
+        debug_query(cursor, logger)
+        _current_filtered = tuple(
+            {**dict(row), "index": idx}
+            for idx, row
+            in enumerate(cursor.fetchall(), start=offset+1))
+
+        return (
+            _current_filtered,
+            len(_current_filtered),
+            _total_filtered,
+            _total_rows)
diff --git a/uploader/publications/models.py b/uploader/publications/models.py
index 992ebfa..dcfa02b 100644
--- a/uploader/publications/models.py
+++ b/uploader/publications/models.py
@@ -1,6 +1,6 @@
 """Module to handle persistence and retrieval of publication to/from MariaDB"""
 import logging
-from typing import Iterable, Optional
+from typing import Iterable
 
 from MySQLdb.cursors import DictCursor
 
@@ -30,6 +30,7 @@ def create_new_publications(
         conn: Connection,
         publications: tuple[dict, ...]
 ) -> tuple[dict, ...]:
+    """Create new publications in the database."""
     if len(publications) > 0:
         with conn.cursor(cursorclass=DictCursor) as cursor:
             cursor.executemany(
@@ -47,7 +48,8 @@ def create_new_publications(
             return tuple({
                 **row, "publication_id": row["Id"]
             } for row in cursor.fetchall())
-        return tuple()
+
+    return tuple()
 
 
 def update_publications(conn: Connection , publications: tuple[dict, ...]) -> tuple[dict, ...]:
@@ -69,23 +71,25 @@ def update_publications(conn: Connection , publications: tuple[dict, ...]) -> tu
     return tuple()
 
 
-def fetch_publications(
-        conn: Connection,
-        search: Optional[str] = None
-) -> Iterable[dict]:
-    """Fetch publications from the database."""
-    _query = "SELECT * FROM Publication"
-    _params = None
-    if search is not None and bool(search):
-        _query = (f"{_query} "
-                  "WHERE CONCAT(PubMed_ID, ' ', Authors, ' ', Title) "
-                  "LIKE %s")
-        _params = (f"%{search}%",)
-    with conn.cursor(cursorclass=DictCursor) as cursor:
-        cursor.execute(_query, _params)
-        debug_query(_query, logger)
-        for row in cursor.fetchall():
-            yield dict(row)
+def delete_publications(conn: Connection , publications: tuple[dict, ...]):
+    """Delete multiple publications"""
+    publications = tuple(pub for pub in publications if bool(pub))
+    if len(publications) > 0:
+        _pub_ids = tuple(pub["Id"] for pub in publications)
+        _paramstr = ", ".join(["%s"] * len(_pub_ids))
+        _phenos_query = (
+            "SELECT PublicationId, COUNT(PhenotypeId) FROM PublishXRef "
+            f"WHERE PublicationId IN ({_paramstr}) GROUP BY PublicationId;")
+
+        with conn.cursor(cursorclass=DictCursor) as cursor:
+            cursor.execute(_phenos_query, _pub_ids)
+            _linked_phenos = cursor.fetchall()
+            if len(_linked_phenos) > 0:
+                raise Exception(# pylint: disable=[broad-exception-raised]
+                    "Cannot delete publications with linked phenotypes.")
+
+            cursor.execute(
+                f"DELETE FROM Publication WHERE Id IN ({_paramstr})", _pub_ids)
 
 
 def fetch_publication_by_id(conn: Connection, publication_id: int) -> dict:
diff --git a/uploader/publications/pubmed.py b/uploader/publications/pubmed.py
index ed9b652..2531c4a 100644
--- a/uploader/publications/pubmed.py
+++ b/uploader/publications/pubmed.py
@@ -29,9 +29,7 @@ def __journal__(journal: etree.Element) -> dict:
     }
 
 def __author__(author: etree.Element) -> str:
-    return "%s %s" % (
-        author.find("LastName").text,
-        author.find("Initials").text)
+    return f'{author.find("LastName").text} {author.find("Initials").text}'
 
 
 def __pages__(pagination: etree.Element) -> str:
@@ -88,7 +86,8 @@ def fetch_publications(pubmed_ids: tuple[int, ...]) -> tuple[dict, ...]:
                 "db": "pubmed",
                 "retmode": "xml",
                 "id": ",".join(str(item) for item in pubmed_ids)
-            })
+            },
+            timeout=(9.13, 20))
 
         if response.status_code == 200:
             return __process_pubmed_publication_data__(response.text)
diff --git a/uploader/publications/views.py b/uploader/publications/views.py
index 63acf1b..4ec832f 100644
--- a/uploader/publications/views.py
+++ b/uploader/publications/views.py
@@ -1,26 +1,27 @@
 """Endpoints for publications"""
 import json
 
-from MySQLdb.cursors import DictCursor
 from gn_libs.mysqldb import database_connection
 from flask import (
     flash,
     request,
-    url_for,
     redirect,
     Blueprint,
     render_template,
     current_app as app)
 
+from uploader.flask_extensions import url_for
 from uploader.authorisation import require_login
+from uploader.route_utils import redirect_to_next
 
 from .models import (
-    fetch_publications,
+    delete_publications,
+    update_publications,
     fetch_publication_by_id,
     create_new_publications,
     fetch_publication_phenotypes)
 
-from gn_libs.debug import __pk__
+from .datatables import fetch_publications
 
 pubbp = Blueprint("publications", __name__)
 
@@ -29,29 +30,31 @@ pubbp = Blueprint("publications", __name__)
 @require_login
 def index():
     """Index page for publications."""
-    with database_connection(app.config["SQL_URI"]) as conn:
-        return render_template("publications/index.html")
+    return render_template("publications/index.html")
 
 
 @pubbp.route("/list", methods=["GET"])
 @require_login
 def list_publications():
+    """Fetch publications that fulfill a specific search, or all of them, if
+    there is no search term."""
     # request breakdown:
     # https://datatables.net/manual/server-side
-    with (database_connection(app.config["SQL_URI"]) as conn,
-          conn.cursor(cursorclass=DictCursor) as cursor):
-        cursor.execute("SELECT COUNT(*) FROM Publication")
-        _totalrows = int(cursor.fetchone()["COUNT(*)"])
-        _publications = tuple({
-            **row, "index": idx
-        } for idx,row in enumerate(
-            fetch_publications(
-                conn, request.args["search[value]"]), start=1))
+    _page = int(request.args.get("draw"))
+    _length = int(request.args.get("length") or '-1')
+    _start = int(request.args.get("start") or '0')
+    _search = request.args["search[value]"]
+    with database_connection(app.config["SQL_URI"]) as conn:
+        _publications, _current_rows, _totalfiltered, _totalrows = fetch_publications(
+            conn,
+            _search,
+            offset=_start,
+            limit=_length)
 
         return json.dumps({
-            "draw": request.args.get("draw"),
+            "draw": _page,
             "recordsTotal": _totalrows,
-            "recordsFiltered": len(_publications),
+            "recordsFiltered": _totalfiltered,
             "publications": _publications,
             "status": "success"
         })
@@ -62,9 +65,15 @@ def list_publications():
 def view_publication(publication_id: int):
     """View more details on a particular publication."""
     with database_connection(app.config["SQL_URI"]) as conn:
+        publication = fetch_publication_by_id(conn, publication_id)
+
+        if not bool(publication):
+            flash("Requested publication was not found!", "alert-warning")
+            return redirect(url_for('publications.index'))
+
         return render_template(
             "publications/view-publication.html",
-            publication=fetch_publication_by_id(conn, publication_id),
+            publication=publication,
             linked_phenotypes=tuple(fetch_publication_phenotypes(
                 conn, publication_id)))
 
@@ -73,7 +82,7 @@ def view_publication(publication_id: int):
 @require_login
 def create_publication():
     """Create a new publication."""
-    if(request.method == "GET"):
+    if request.method == "GET":
         return render_template("publications/create-publication.html")
     form = request.form
     authors = form.get("publication-authors").encode("utf8")
@@ -83,7 +92,7 @@ def create_publication():
 
     with database_connection(app.config["SQL_URI"]) as conn:
         publications = create_new_publications(conn, ({
-            "pubmed_id": form.get("pubmed-id"),
+            "pubmed_id": form.get("pubmed-id") or None,
             "abstract": form.get("publication-abstract").encode("utf8") or None,
             "authors": authors,
             "title":  form.get("publication-title").encode("utf8") or None,
@@ -102,3 +111,75 @@ def create_publication():
     flash("Publication creation failed!", "alert alert-danger")
     app.logger.debug("Failed to create the new publication.", exc_info=True)
     return redirect(url_for("publications.create_publication"))
+
+
+@pubbp.route("/edit/<int:publication_id>", methods=["GET", "POST"])
+@require_login
+def edit_publication(publication_id: int):
+    """Edit a publication's details."""
+    with database_connection(app.config["SQL_URI"]) as conn:
+        if request.method == "GET":
+            return render_template(
+                "publications/edit-publication.html",
+                publication=fetch_publication_by_id(conn, publication_id),
+                linked_phenotypes=tuple(fetch_publication_phenotypes(
+                    conn, publication_id)),
+                publication_id=publication_id)
+
+        form = request.form
+        _pub = update_publications(conn, ({
+            "publication_id": publication_id,
+            "pubmed_id": form.get("pubmed-id") or None,
+            "abstract": form.get("publication-abstract").encode("utf8") or None,
+            "authors": form.get("publication-authors").encode("utf8"),
+            "title":  form.get("publication-title").encode("utf8") or None,
+            "journal": form.get("publication-journal").encode("utf8") or None,
+            "volume": form.get("publication-volume").encode("utf8") or None,
+            "pages": form.get("publication-pages").encode("utf8") or None,
+            "month": (form.get("publication-month") or "").encode("utf8").capitalize() or None,
+            "year": form.get("publication-year").encode("utf8") or None
+        },))
+
+        if not _pub:
+            flash("There was an error updating the publication details.",
+                  "alert-danger")
+            return redirect(url_for(
+                "publications.edit_publication", publication_id=publication_id))
+
+        flash("Successfully updated the publication details.",
+              "alert-success")
+        return redirect_to_next({
+            "uri": "publications.view_publication",
+            "publication_id": publication_id
+        })
+
+
+@pubbp.route("/delete/<int:publication_id>", methods=["GET", "POST"])
+@require_login
+def delete_publication(publication_id: int):
+    """Delete a particular publication."""
+    with database_connection(app.config["SQL_URI"]) as conn:
+        publication = fetch_publication_by_id(conn, publication_id)
+        linked_phenotypes=tuple(fetch_publication_phenotypes(
+            conn, publication_id))
+
+        if not bool(publication):
+            flash("Requested publication was not found!", "alert-warning")
+            return redirect(url_for('publications.index'))
+
+        if len(linked_phenotypes) > 0:
+            flash("Cannot delete publication with linked phenotypes!",
+                  "alert-warning")
+            return redirect(url_for(
+                "publications.view_publication", publication_id=publication_id))
+
+        if request.method == "GET":
+            return render_template(
+                "publications/delete-publication.html",
+                publication=publication,
+                linked_phenotypes=linked_phenotypes,
+                publication_id=publication_id)
+
+        delete_publications(conn, (publication,))
+        flash("Deleted the publication successfully.", "alert-success")
+        return render_template("publications/delete-publication-success.html")