diff options
Diffstat (limited to 'uploader/species')
-rw-r--r-- | uploader/species/__init__.py | 2 | ||||
-rw-r--r-- | uploader/species/models.py | 152 | ||||
-rw-r--r-- | uploader/species/views.py | 200 |
3 files changed, 354 insertions, 0 deletions
diff --git a/uploader/species/__init__.py b/uploader/species/__init__.py new file mode 100644 index 0000000..83f2165 --- /dev/null +++ b/uploader/species/__init__.py @@ -0,0 +1,2 @@ +"""Package to handle creation and management of species.""" +from .views import speciesbp diff --git a/uploader/species/models.py b/uploader/species/models.py new file mode 100644 index 0000000..51f941c --- /dev/null +++ b/uploader/species/models.py @@ -0,0 +1,152 @@ +"""Database functions for species.""" +import math +from typing import Optional +from functools import reduce + +import MySQLdb as mdb +from MySQLdb.cursors import DictCursor + +def all_species(conn: mdb.Connection) -> tuple: + "Retrieve the species from the database." + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute( + "SELECT Id AS SpeciesId, SpeciesName, LOWER(Name) AS Name, " + "MenuName, FullName, TaxonomyId, Family, FamilyOrderId, OrderId " + "FROM Species ORDER BY FamilyOrderId ASC, OrderID ASC") + return tuple(cursor.fetchall()) + + return tuple() + +def order_species_by_family(species: tuple[dict, ...]) -> list: + """Order the species by their family""" + def __family_order_id__(item): + orderid = item["FamilyOrderId"] + return math.inf if orderid is None else orderid + def __order__(ordered, current): + _key = (__family_order_id__(current), current["Family"]) + return { + **ordered, + _key: ordered.get(_key, tuple()) + (current,) + } + ordered = reduce(__order__, species, {})# type: ignore[var-annotated] + return sorted(tuple(ordered.items()), key=lambda item: item[0][0]) + + +def species_by_id(conn: mdb.Connection, speciesid) -> dict: + "Retrieve the species from the database by id." + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute( + "SELECT Id AS SpeciesId, SpeciesName, LOWER(Name) AS Name, " + "MenuName, FullName, TaxonomyId, Family, FamilyOrderId, OrderId " + "FROM Species WHERE SpeciesId=%s", + (speciesid,)) + return cursor.fetchone() + + +def save_species(conn: mdb.Connection, + common_name: str, + scientific_name: str, + family: str, + taxon_id: Optional[str] = None) -> dict: + """ + Save a new species to the database. + + Parameters + ---------- + conn: A connection to the MariaDB database. + taxon_id: The taxonomy identifier for the new species. + common_name: The species' common name. + scientific_name; The species' scientific name. + """ + genus, species_name = scientific_name.split(" ") + families = species_families(conn) + with conn.cursor() as cursor: + cursor.execute("SELECT MAX(OrderId) FROM Species") + species = { + "common_name": common_name, + "common_name_lower": common_name.lower(), + "menu_name": f"{common_name} ({genus[0]}. {species_name.lower()})", + "scientific_name": scientific_name, + "family": family, + "family_order": families[family], + "taxon_id": taxon_id, + "species_order": cursor.fetchone()[0] + 5 + } + cursor.execute( + "INSERT INTO Species(" + "SpeciesName, Name, MenuName, FullName, Family, FamilyOrderId, " + "TaxonomyId, OrderId" + ") VALUES (" + "%(common_name)s, %(common_name_lower)s, %(menu_name)s, " + "%(scientific_name)s, %(family)s, %(family_order)s, %(taxon_id)s, " + "%(species_order)s" + ")", + species) + species_id = cursor.lastrowid + cursor.execute("UPDATE Species SET SpeciesId=%s WHERE Id=%s", + (species_id, species_id)) + return { + **species, + "species_id": species_id + } + + +def update_species(# pylint: disable=[too-many-arguments] + conn: mdb.Connection, + species_id: int, + common_name: str, + scientific_name: str, + family: str, + family_order: int, + species_order: int +): + """Update a species' details. + + Parameters + ---------- + conn: A connection to the MariaDB database. + species_id: The species identifier + + Key-Word Arguments + ------------------ + common_name: A layman's name for the species + scientific_name: A binomial nomenclature name for the species + family: The grouping under which the species falls + family_order: The ordering for the "family" above + species_order: The ordering of this species in relation to others + """ + with conn.cursor(cursorclass=DictCursor) as cursor: + genus, species_name = scientific_name.split(" ") + species = { + "species_id": species_id, + "common_name": common_name, + "common_name_lower": common_name.lower(), + "menu_name": f"{common_name} ({genus[0]}. {species_name.lower()})", + "scientific_name": scientific_name, + "family": family, + "family_order": family_order, + "species_order": species_order + } + cursor.execute( + "UPDATE Species SET " + "SpeciesName=%(common_name)s, " + "Name=%(common_name_lower)s, " + "MenuName=%(menu_name)s, " + "FullName=%(scientific_name)s, " + "Family=%(family)s, " + "FamilyOrderId=%(family_order)s, " + "OrderId=%(species_order)s " + "WHERE Id=%(species_id)s", + species) + + +def species_families(conn: mdb.Connection) -> dict: + """Retrieve the families under which species are grouped.""" + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute( + "SELECT DISTINCT(Family), FamilyOrderId FROM Species " + "WHERE Family IS NOT NULL") + return { + fam["Family"]: fam["FamilyOrderId"] + for fam in cursor.fetchall() + } diff --git a/uploader/species/views.py b/uploader/species/views.py new file mode 100644 index 0000000..10715a5 --- /dev/null +++ b/uploader/species/views.py @@ -0,0 +1,200 @@ +"""Endpoints handling species.""" +from pymonad.either import Left, Right, Either +from flask import (flash, + request, + url_for, + redirect, + Blueprint, + current_app as app) + +from uploader.population import popbp +from uploader.platforms import platformsbp +from uploader.ui import make_template_renderer +from uploader.db_utils import database_connection +from uploader.oauth2.client import oauth2_get, oauth2_post +from uploader.authorisation import require_login, require_token +from uploader.datautils import order_by_family, enumerate_sequence + +from .models import (all_species, + save_species, + species_by_id, + update_species, + species_families) + + +speciesbp = Blueprint("species", __name__) +speciesbp.register_blueprint(popbp, url_prefix="/") +speciesbp.register_blueprint(platformsbp, url_prefix="/") +render_template = make_template_renderer("species") + + +@speciesbp.route("/", methods=["GET"]) +@require_login +def list_species(): + """List and display all the species in the database.""" + with database_connection(app.config["SQL_URI"]) as conn: + return render_template("species/list-species.html", + allspecies=enumerate_sequence(all_species(conn))) + +@speciesbp.route("/<int:species_id>", methods=["GET"]) +@require_login +def view_species(species_id: int): + """View details of a particular species and menus to act upon it.""" + with database_connection(app.config["SQL_URI"]) as conn: + species = species_by_id(conn, species_id) + if bool(species): + return render_template("species/view-species.html", + species=species, + activelink="view-species") + flash("Could not find a species with the given identifier.", + "alert-danger") + return redirect(url_for("species.view_species")) + +@speciesbp.route("/create", methods=["GET", "POST"]) +@require_login +def create_species(): + """Create a new species.""" + # We can use uniprot's API to fetch the details with something like + # https://rest.uniprot.org/taxonomy/<taxonID> e.g. + # https://rest.uniprot.org/taxonomy/6239 + with (database_connection(app.config["SQL_URI"]) as conn, + conn.cursor() as cursor): + if request.method == "GET": + return render_template("species/create-species.html", + families=species_families(conn), + activelink="create-species") + + error = False + taxon_id = request.form.get("species_taxonomy_id", "").strip() or None + + common_name = request.form.get("common_name", "").strip() + if not bool(common_name): + flash("The common species name MUST be provided.", "alert-danger") + error = True + + scientific_name = request.form.get("scientific_name", "").strip() + if not bool(scientific_name): + flash("The species' scientific name MUST be provided.", + "alert-danger") + error = True + + parts = tuple(name.strip() for name in scientific_name.split(" ")) + if len(parts) != 2 or not all(bool(name) for name in parts): + flash("The scientific name you provided is invalid.", "alert-danger") + error = True + + cursor.execute( + "SELECT * FROM Species WHERE FullName=%s", (scientific_name,)) + res = cursor.fetchone() + if bool(res): + flash("A species already exists with the provided scientific name.", + "alert-danger") + error = True + + family = request.form.get("species_family", "").strip() + if not bool(family): + flash("The species' family MUST be selected.", "alert-danger") + error = True + + if bool(taxon_id): + cursor.execute( + "SELECT * FROM Species WHERE TaxonomyId=%s", (taxon_id,)) + res = cursor.fetchone() + if bool(res): + flash("A species already exists with the provided scientific name.", + "alert-danger") + error = True + + if error: + return redirect(url_for("species.create_species", + common_name=common_name, + scientific_name=scientific_name, + taxon_id=taxon_id)) + + species = save_species( + conn, common_name, scientific_name, family, taxon_id) + flash("Species saved successfully!", "alert-success") + return redirect(url_for("species.view_species", species_id=species["species_id"])) + + +@speciesbp.route("/<int:species_id>/edit-extra", methods=["GET", "POST"]) +@require_login +@require_token +#def edit_species(species_id: int): +def edit_species_extra(token: dict, species_id: int):# pylint: disable=[unused-argument] + """Edit a species' details. + + Parameters + ---------- + token: A JWT token used for authorisation. + species_id: An identifier for the species being edited. + """ + def __failure__(res): + app.logger.debug( + "There was an error in the attempt to edit the species: %s", res) + flash(res, "alert-danger") + return redirect(url_for("species.view_species", species_id=species_id)) + + def __system_resource_uuid__(resources) -> Either: + sys_res = [ + resource for resource in resources + if resource["resource_category"]["resource_category_key"] == "system" + ] + if len(sys_res) != 1: + return Left("Could not find/identify a valid system resource.") + return Right(sys_res[0]["resource_id"]) + + def __check_privileges__(authorisations): + if len(authorisations.items()) != 1: + return Left("Got authorisations for more than a single resource!") + + auths = tuple(authorisations.items())[0][1] + authorised = "system:species:edit-extra-info" in tuple( + privilege["privilege_id"] + for role in auths["roles"] + for privilege in role["privileges"]) + if authorised: + return Right(authorised) + return Left("You are not authorised to edit species extra details.") + + with database_connection(app.config["SQL_URI"]) as conn: + species = species_by_id(conn, species_id) + all_the_species = all_species(conn) + families = species_families(conn) + family_order = tuple( + item[0] for item in order_by_family(all_the_species) + if item[0][1] is not None) + if bool(species) and request.method == "GET": + return oauth2_get("auth/user/resources").then( + __system_resource_uuid__ + ).then( + lambda resource_id: oauth2_post( + "auth/resource/authorisation", + json={"resource-ids": [resource_id]}) + ).then(__check_privileges__).then( + lambda authorisations: render_template( + "species/edit-species.html", + species=species, + families=families, + family_order=family_order, + max_order_id = max( + row["OrderId"] for row in all_the_species + if row["OrderId"] is not None), + activelink="edit-species") + ).either(__failure__, lambda res: res) + + if bool(species) and request.method == "POST": + update_species(conn, + species_id, + request.form["species_name"], + request.form["species_fullname"], + request.form["species_family"], + int(request.form["species_familyorderid"]), + int(request.form["species_orderid"])) + flash("Updated species successfully.", "alert-success") + return redirect(url_for("species.edit_species_extra", + species_id=species_id)) + + flash("Species with the given identifier was not found!", + "alert-danger") + return redirect(url_for("species.list_species")) |