"""Endpoints handling species."""
from markupsafe import escape
from pymonad.either import Left, Right, Either
from gn_libs.mysqldb import database_connection
from flask import (flash,
request,
redirect,
Blueprint,
current_app as app)
from uploader.population import popbp
from uploader.platforms import platformsbp
from uploader.flask_extensions import url_for
from uploader.ui import make_template_renderer
from uploader.oauth2.client import oauth2_get, oauth2_post
from uploader.authorisation import require_login, require_token
from uploader.datautils import order_by_family, enumerate_sequence
from uploader.population.models import (populations_by_species,
population_by_species_and_id)
from .models import (all_species,
save_species,
species_by_id,
update_species,
species_families)
# Blueprint that groups the species endpoints defined in this module.
speciesbp = Blueprint("species", __name__)
# Nest the population and platform blueprints under the species blueprint.
# Both are mounted with the "/" prefix, so their own route rules decide the
# final paths (endpoints become e.g. "species.populations.<view>").
speciesbp.register_blueprint(popbp, url_prefix="/")
speciesbp.register_blueprint(platformsbp, url_prefix="/")
# Module-local `render_template` built by `make_template_renderer("species")`.
# NOTE(review): presumably this adds species-specific template context --
# confirm against `uploader.ui.make_template_renderer`.
render_template = make_template_renderer("species")
@speciesbp.route("/", methods=["GET"])
@require_login
def list_species():
    """Render the page listing every species found in the database.

    The species are fetched with `all_species`, passed through
    `enumerate_sequence`, and handed to the "species/list-species.html"
    template as `allspecies`.
    """
    sql_uri = app.config["SQL_URI"]
    with database_connection(sql_uri) as conn:
        numbered_species = enumerate_sequence(all_species(conn))
        return render_template(
            "species/list-species.html", allspecies=numbered_species)
@speciesbp.route("/<int:species_id>", methods=["GET"])
@require_login
def view_species(species_id: int):
    """View details of a particular species and menus to act upon it.

    Parameters
    ----------
    species_id: An identifier for the species to display.

    Behaviour
    ---------
    * If the species exists and the `population_id` query argument identifies
      one of its populations, redirect to that population's page.
    * If the species exists but no such population is found, render the
      species page together with the species' populations.
    * If the species does not exist, flash an error and redirect to the
      index page (when the `streamlined_ui` query argument is set) or to the
      species list otherwise.
    """
    streamlined_ui = request.args.get("streamlined_ui")
    with database_connection(app.config["SQL_URI"]) as conn:
        species = species_by_id(conn, species_id)
        if species:
            population = population_by_species_and_id(
                conn, species_id, request.args.get("population_id"))
            if population:
                return redirect(url_for("species.populations.view_population",
                                        species_id=species_id,
                                        population_id=population["Id"]))
            return render_template(
                "species/view-species.html",
                species=species,
                activelink="view-species",
                populations=populations_by_species(conn, species["SpeciesId"]))

    flash("Could not find a species with the given identifier.",
          "alert-danger")
    # The fallback must not be "species.view_species": that endpoint needs a
    # `species_id`, and the only one available here is the one that was just
    # shown not to exist. Send the user to the species list instead.
    return redirect(url_for("base.index"
                            if streamlined_ui
                            else "species.list_species"))
@speciesbp.route("/create", methods=["GET", "POST"])
@require_login
def create_species():
    """Create a new species.

    GET renders the species-creation form, passing along the species families
    and the optional `return_to` query argument.

    POST validates the submitted form:

    * the common name, scientific name and family are required,
    * the scientific name must consist of two or three space-separated,
      non-empty parts,
    * neither the scientific name nor the (optional) taxonomy ID may already
      be present in the `Species` table.

    On any validation failure the problems are flashed and the user is
    redirected back to the form. On success the species is saved and the user
    is redirected to the endpoint named by the `return_to` form field if one
    was given, or to the new species' page otherwise.
    """
    # We can use uniprot's API to fetch the details with something like
    # https://rest.uniprot.org/taxonomy/<taxonID> e.g.
    # https://rest.uniprot.org/taxonomy/6239
    with (database_connection(app.config["SQL_URI"]) as conn,
          conn.cursor() as cursor):
        if request.method == "GET":
            return render_template("species/create-species.html",
                                   families=species_families(conn),
                                   return_to=(
                                       request.args.get("return_to") or ""),
                                   activelink="create-species")

        error = False
        # An empty taxonomy ID is treated as "not provided".
        taxon_id = request.form.get("species_taxonomy_id", "").strip() or None

        common_name = request.form.get("common_name", "").strip()
        if not common_name:
            flash("The common species name MUST be provided.", "alert-danger")
            error = True

        scientific_name = request.form.get("scientific_name", "").strip()
        if not scientific_name:
            flash("The species' scientific name MUST be provided.",
                  "alert-danger")
            error = True
        else:
            # Only check the format and uniqueness of a name that was actually
            # supplied; a missing name has already been reported above.
            parts = tuple(name.strip() for name in scientific_name.split(" "))
            if len(parts) not in (2, 3) or not all(parts):
                flash("The scientific name you provided is invalid.",
                      "alert-danger")
                error = True

            cursor.execute(
                "SELECT * FROM Species WHERE FullName=%s", (scientific_name,))
            if cursor.fetchone():
                flash(
                    "A species already exists with the provided scientific "
                    "name.",
                    "alert-danger")
                error = True

        family = request.form.get("species_family", "").strip()
        if not family:
            flash("The species' family MUST be selected.", "alert-danger")
            error = True

        if taxon_id:
            cursor.execute(
                "SELECT * FROM Species WHERE TaxonomyId=%s", (taxon_id,))
            if cursor.fetchone():
                flash("A species already exists with the provided taxonomy ID.",
                      "alert-danger")
                error = True

        if error:
            # Hand the submitted values back as query arguments.
            return redirect(url_for("species.create_species",
                                    common_name=common_name,
                                    scientific_name=scientific_name,
                                    taxon_id=taxon_id))

        species = save_species(
            conn, common_name, scientific_name, family, taxon_id)
        flash(
            f"You have successfully added species "
            f"'{escape(species['scientific_name'])} "
            f"({escape(species['common_name'])})'.",
            "alert-success")

        # `return_to` is optional: default to "" so a form submitted without
        # the field does not crash on `None.strip()`.
        # NOTE(review): `return_to` is a client-supplied endpoint name; an
        # unknown endpoint will make `url_for` fail -- consider validating it.
        return_to = (request.form.get("return_to") or "").strip()
        if return_to:
            return redirect(
                url_for(return_to, species_id=species["species_id"]))
        return redirect(
            url_for("species.view_species", species_id=species["species_id"]))
@speciesbp.route("/<int:species_id>/edit-extra", methods=["GET", "POST"])
@require_login
@require_token
def edit_species_extra(token: dict, species_id: int):# pylint: disable=[unused-argument]
    """Edit a species' extra details.

    Both the form (GET) and the update itself (POST) are only available to
    users holding the "system:species:edit-extra-info" privilege on the
    system resource. Any authorisation failure is flashed and the user is
    redirected to the species' page.

    Parameters
    ----------
    token: A JWT token used for authorisation.
    species_id: An identifier for the species being edited.
    """
    def __failure__(res):
        """Report a failed authorisation step and go back to the species."""
        app.logger.debug(
            "There was an error in the attempt to edit the species: %s", res)
        flash(res, "alert-danger")
        return redirect(url_for("species.view_species", species_id=species_id))

    def __system_resource_uuid__(resources) -> Either:
        """Pick out the ID of the single system resource, if there is one."""
        sys_res = [
            resource for resource in resources
            if resource["resource_category"]["resource_category_key"] == "system"
        ]
        if len(sys_res) != 1:
            return Left("Could not find/identify a valid system resource.")
        return Right(sys_res[0]["resource_id"])

    def __check_privileges__(authorisations) -> Either:
        """Verify the user may edit species extra details."""
        if len(authorisations) != 1:
            return Left("Got authorisations for more than a single resource!")
        auths = next(iter(authorisations.values()))
        authorised = any(
            privilege["privilege_id"] == "system:species:edit-extra-info"
            for role in auths["roles"]
            for privilege in role["privileges"])
        if authorised:
            return Right(authorised)
        return Left("You are not authorised to edit species extra details.")

    def __authorise__() -> Either:
        """Run the full authorisation chain against the auth server."""
        return oauth2_get("auth/user/resources").then(
            __system_resource_uuid__
        ).then(
            lambda resource_id: oauth2_post(
                "auth/resource/authorisation",
                json={"resource-ids": [resource_id]})
        ).then(__check_privileges__)

    def __render_form__(conn, species):
        """Render the edit form; only called once the user is authorised."""
        all_the_species = all_species(conn)
        family_order = tuple(
            item[0] for item in order_by_family(all_the_species)
            if item[0][1] is not None)
        return render_template(
            "species/edit-species.html",
            species=species,
            families=species_families(conn),
            family_order=family_order,
            # `default=0` keeps `max` from raising `ValueError` when no
            # species has an `OrderId` yet.
            max_order_id=max(
                (row["OrderId"] for row in all_the_species
                 if row["OrderId"] is not None),
                default=0),
            activelink="edit-species")

    def __save_changes__(conn):
        """Persist the submitted details; only called once authorised."""
        try:
            family_order_id = int(request.form["species_familyorderid"])
            order_id = int(request.form["species_orderid"])
        except ValueError:
            # Report non-numeric input instead of failing with a server error.
            flash("The family order and the species order MUST be whole "
                  "numbers.",
                  "alert-danger")
            return redirect(url_for("species.edit_species_extra",
                                    species_id=species_id))

        update_species(conn,
                       species_id,
                       request.form["species_name"],
                       request.form["species_fullname"],
                       request.form["species_family"],
                       family_order_id,
                       order_id)
        flash("Updated species successfully.", "alert-success")
        return redirect(url_for("species.edit_species_extra",
                                species_id=species_id))

    with database_connection(app.config["SQL_URI"]) as conn:
        species = species_by_id(conn, species_id)

        # The side-effecting step goes in `.either(...)`, which invokes
        # exactly one of its two callbacks, rather than in `.then(...)`.
        if species and request.method == "GET":
            return __authorise__().either(
                __failure__, lambda _authorised: __render_form__(conn, species))

        if species and request.method == "POST":
            # The update is guarded by the same privilege check as the form,
            # so the form cannot be bypassed with a hand-crafted POST.
            return __authorise__().either(
                __failure__, lambda _authorised: __save_changes__(conn))

        flash("Species with the given identifier was not found!",
              "alert-danger")
        return redirect(url_for("species.list_species"))