diff options
Diffstat (limited to 'uploader/publications')
| -rw-r--r-- | uploader/publications/datatables.py | 52 | ||||
| -rw-r--r-- | uploader/publications/models.py | 39 | ||||
| -rw-r--r-- | uploader/publications/pubmed.py | 7 | ||||
| -rw-r--r-- | uploader/publications/views.py | 169 |
4 files changed, 240 insertions, 27 deletions
diff --git a/uploader/publications/datatables.py b/uploader/publications/datatables.py new file mode 100644 index 0000000..e07fafd --- /dev/null +++ b/uploader/publications/datatables.py @@ -0,0 +1,52 @@ +"""Fetch data for datatables.""" +import logging +from typing import Optional + +from MySQLdb.cursors import DictCursor + +from gn_libs.mysqldb import Connection, debug_query + +logger = logging.getLogger(__name__) + +def fetch_publications( + conn: Connection, + search: Optional[str] = None, + offset: int = 0, + limit: int = -1 +) -> tuple[dict, int, int, int]: + """Fetch publications from the database.""" + _query = "SELECT * FROM Publication" + _count_query = "SELECT COUNT(*) FROM Publication" + _params = None + _where_clause = "" + _limit_clause = "" + if search is not None and bool(search): + _where_clause = ("WHERE PubMed_ID LIKE %s " + "OR Authors LIKE %s " + "OR Title LIKE %s") + _params = (f"%{search}%",) * 3 + + if limit > 0: + _limit_clause = f"LIMIT {limit} OFFSET {offset}" + + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute("SELECT COUNT(*) FROM Publication") + _total_rows = int(cursor.fetchone()["COUNT(*)"]) + + cursor.execute(f"{_count_query} {_where_clause}", _params) + debug_query(cursor, logger) + _result = cursor.fetchone() + _total_filtered = int(_result["COUNT(*)"] if bool(_result) else 0) + + cursor.execute(f"{_query} {_where_clause} {_limit_clause}", _params) + debug_query(cursor, logger) + _current_filtered = tuple( + {**dict(row), "index": idx} + for idx, row + in enumerate(cursor.fetchall(), start=offset+1)) + + return ( + _current_filtered, + len(_current_filtered), + _total_filtered, + _total_rows) diff --git a/uploader/publications/models.py b/uploader/publications/models.py index 7d2862d..dcfa02b 100644 --- a/uploader/publications/models.py +++ b/uploader/publications/models.py @@ -30,6 +30,7 @@ def create_new_publications( conn: Connection, publications: tuple[dict, ...] ) -> tuple[dict, ...]: + """Create new publications in the database.""" if len(publications) > 0: with conn.cursor(cursorclass=DictCursor) as cursor: cursor.executemany( @@ -42,17 +43,13 @@ def create_new_publications( "%(pubmed_id)s, %(abstract)s, %(authors)s, %(title)s, " "%(journal)s, %(volume)s, %(pages)s, %(month)s, %(year)s" ") " - "ON DUPLICATE KEY UPDATE " - "Abstract=VALUES(Abstract), Authors=VALUES(Authors), " - "Title=VALUES(Title), Journal=VALUES(Journal), " - "Volume=VALUES(Volume), Pages=VALUES(pages), " - "Month=VALUES(Month), Year=VALUES(Year) " "RETURNING *"), publications) return tuple({ - **row, "PublicationId": row["Id"] + **row, "publication_id": row["Id"] } for row in cursor.fetchall()) - return tuple() + + return tuple() def update_publications(conn: Connection , publications: tuple[dict, ...]) -> tuple[dict, ...]: @@ -74,12 +71,25 @@ def update_publications(conn: Connection , publications: tuple[dict, ...]) -> tu return tuple() -def fetch_publications(conn: Connection) -> Iterable[dict]: - """Fetch publications from the database.""" - with conn.cursor(cursorclass=DictCursor) as cursor: - cursor.execute("SELECT * FROM Publication") - for row in cursor.fetchall(): - yield dict(row) +def delete_publications(conn: Connection , publications: tuple[dict, ...]): + """Delete multiple publications""" + publications = tuple(pub for pub in publications if bool(pub)) + if len(publications) > 0: + _pub_ids = tuple(pub["Id"] for pub in publications) + _paramstr = ", ".join(["%s"] * len(_pub_ids)) + _phenos_query = ( + "SELECT PublicationId, COUNT(PhenotypeId) FROM PublishXRef " + f"WHERE PublicationId IN ({_paramstr}) GROUP BY PublicationId;") + + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute(_phenos_query, _pub_ids) + _linked_phenos = cursor.fetchall() + if len(_linked_phenos) > 0: + raise Exception(# pylint: disable=[broad-exception-raised] + "Cannot delete publications with linked phenotypes.") + + cursor.execute( + f"DELETE FROM Publication WHERE Id IN ({_paramstr})", _pub_ids) def fetch_publication_by_id(conn: Connection, publication_id: int) -> dict: @@ -87,7 +97,8 @@ def fetch_publication_by_id(conn: Connection, publication_id: int) -> dict: with conn.cursor(cursorclass=DictCursor) as cursor: cursor.execute("SELECT * FROM Publication WHERE Id=%s", (publication_id,)) - return dict(cursor.fetchone()) + _res = cursor.fetchone() + return dict(_res) if _res else {} def fetch_publication_phenotypes( diff --git a/uploader/publications/pubmed.py b/uploader/publications/pubmed.py index ed9b652..2531c4a 100644 --- a/uploader/publications/pubmed.py +++ b/uploader/publications/pubmed.py @@ -29,9 +29,7 @@ def __journal__(journal: etree.Element) -> dict: } def __author__(author: etree.Element) -> str: - return "%s %s" % ( - author.find("LastName").text, - author.find("Initials").text) + return f'{author.find("LastName").text} {author.find("Initials").text}' def __pages__(pagination: etree.Element) -> str: @@ -88,7 +86,8 @@ def fetch_publications(pubmed_ids: tuple[int, ...]) -> tuple[dict, ...]: "db": "pubmed", "retmode": "xml", "id": ",".join(str(item) for item in pubmed_ids) - }) + }, + timeout=(9.13, 20)) if response.status_code == 200: return __process_pubmed_publication_data__(response.text) diff --git a/uploader/publications/views.py b/uploader/publications/views.py index 85d3aef..805d6f0 100644 --- a/uploader/publications/views.py +++ b/uploader/publications/views.py @@ -2,13 +2,26 @@ import json from gn_libs.mysqldb import database_connection -from flask import Blueprint, render_template, current_app as app +from flask import ( + flash, + request, + url_for, + redirect, + Blueprint, + render_template, + current_app as app) from uploader.authorisation import require_login +from uploader.route_utils import redirect_to_next -from .models import fetch_publications +from .models import ( + delete_publications, + update_publications, + fetch_publication_by_id, + create_new_publications, + fetch_publication_phenotypes) -from gn_libs.debug import __pk__ +from .datatables import fetch_publications pubbp = Blueprint("publications", __name__) @@ -17,18 +30,156 @@ pubbp = Blueprint("publications", __name__) @require_login def index(): """Index page for publications.""" - with database_connection(app.config["SQL_URI"]) as conn: - return render_template("publications/index.html") + return render_template("publications/index.html") @pubbp.route("/list", methods=["GET"]) @require_login def list_publications(): + """Fetch publications that fulfill a specific search, or all of them, if + there is no search term.""" + # request breakdown: + # https://datatables.net/manual/server-side + _page = int(request.args.get("draw")) + _length = int(request.args.get("length") or '-1') + _start = int(request.args.get("start") or '0') + _search = request.args["search[value]"] with database_connection(app.config["SQL_URI"]) as conn: + _publications, _current_rows, _totalfiltered, _totalrows = fetch_publications( + conn, + _search, + offset=_start, + limit=_length) + return json.dumps({ - "publications": tuple({ - **row, "index": idx - } for idx,row in enumerate( - fetch_publications(conn), start=1)), + "draw": _page, + "recordsTotal": _totalrows, + "recordsFiltered": _totalfiltered, + "publications": _publications, "status": "success" }) + + +@pubbp.route("/view/<int:publication_id>", methods=["GET"]) +@require_login +def view_publication(publication_id: int): + """View more details on a particular publication.""" + with database_connection(app.config["SQL_URI"]) as conn: + publication = fetch_publication_by_id(conn, publication_id) + + if not bool(publication): + flash("Requested publication was not found!", "alert-warning") + return redirect(url_for('publications.index')) + + return render_template( + "publications/view-publication.html", + publication=publication, + linked_phenotypes=tuple(fetch_publication_phenotypes( + conn, publication_id))) + + +@pubbp.route("/create", methods=["GET", "POST"]) +@require_login +def create_publication(): + """Create a new publication.""" + if request.method == "GET": + return render_template("publications/create-publication.html") + form = request.form + authors = form.get("publication-authors").encode("utf8") + if authors is None or authors == "": + flash("The publication's author(s) MUST be provided!", "alert alert-danger") + return redirect(url_for("publications.create", **request.args)) + + with database_connection(app.config["SQL_URI"]) as conn: + publications = create_new_publications(conn, ({ + "pubmed_id": form.get("pubmed-id") or None, + "abstract": form.get("publication-abstract").encode("utf8") or None, + "authors": authors, + "title": form.get("publication-title").encode("utf8") or None, + "journal": form.get("publication-journal").encode("utf8") or None, + "volume": form.get("publication-volume").encode("utf8") or None, + "pages": form.get("publication-pages").encode("utf8") or None, + "month": (form.get("publication-month") or "").encode("utf8").capitalize() or None, + "year": form.get("publication-year").encode("utf8") or None + },)) + flash("New publication created!", "alert alert-success") + return redirect(url_for( + request.args.get("return_to") or "publications.view_publication", + publication_id=publications[0]["publication_id"], + **request.args)) + + flash("Publication creation failed!", "alert alert-danger") + app.logger.debug("Failed to create the new publication.", exc_info=True) + return redirect(url_for("publications.create_publication")) + + +@pubbp.route("/edit/<int:publication_id>", methods=["GET", "POST"]) +@require_login +def edit_publication(publication_id: int): + """Edit a publication's details.""" + with database_connection(app.config["SQL_URI"]) as conn: + if request.method == "GET": + return render_template( + "publications/edit-publication.html", + publication=fetch_publication_by_id(conn, publication_id), + linked_phenotypes=tuple(fetch_publication_phenotypes( + conn, publication_id)), + publication_id=publication_id) + + form = request.form + _pub = update_publications(conn, ({ + "publication_id": publication_id, + "pubmed_id": form.get("pubmed-id") or None, + "abstract": form.get("publication-abstract").encode("utf8") or None, + "authors": form.get("publication-authors").encode("utf8"), + "title": form.get("publication-title").encode("utf8") or None, + "journal": form.get("publication-journal").encode("utf8") or None, + "volume": form.get("publication-volume").encode("utf8") or None, + "pages": form.get("publication-pages").encode("utf8") or None, + "month": (form.get("publication-month") or "").encode("utf8").capitalize() or None, + "year": form.get("publication-year").encode("utf8") or None + },)) + + if not _pub: + flash("There was an error updating the publication details.", + "alert-danger") + return redirect(url_for( + "publications.edit_publication", publication_id=publication_id)) + + flash("Successfully updated the publication details.", + "alert-success") + return redirect_to_next({ + "uri": "publications.view_publication", + "publication_id": publication_id + }) + + +@pubbp.route("/delete/<int:publication_id>", methods=["GET", "POST"]) +@require_login +def delete_publication(publication_id: int): + """Delete a particular publication.""" + with database_connection(app.config["SQL_URI"]) as conn: + publication = fetch_publication_by_id(conn, publication_id) + linked_phenotypes=tuple(fetch_publication_phenotypes( + conn, publication_id)) + + if not bool(publication): + flash("Requested publication was not found!", "alert-warning") + return redirect(url_for('publications.index')) + + if len(linked_phenotypes) > 0: + flash("Cannot delete publication with linked phenotypes!", + "alert-warning") + return redirect(url_for( + "publications.view_publication", publication_id=publication_id)) + + if request.method == "GET": + return render_template( + "publications/delete-publication.html", + publication=publication, + linked_phenotypes=linked_phenotypes, + publication_id=publication_id) + + delete_publications(conn, (publication,)) + flash("Deleted the publication successfully.", "alert-success") + return render_template("publications/delete-publication-success.html") |
