"""Database functions for species.""" import math from typing import Optional from functools import reduce import MySQLdb as mdb from MySQLdb.cursors import DictCursor def all_species(conn: mdb.Connection) -> tuple: "Retrieve the species from the database." with conn.cursor(cursorclass=DictCursor) as cursor: cursor.execute( "SELECT Id AS SpeciesId, SpeciesName, LOWER(Name) AS Name, " "MenuName, FullName, TaxonomyId, Family, FamilyOrderId, OrderId " "FROM Species ORDER BY FamilyOrderId ASC, OrderID ASC") return tuple(cursor.fetchall()) return tuple() def order_species_by_family(species: tuple[dict, ...]) -> list: """Order the species by their family""" def __family_order_id__(item): orderid = item["FamilyOrderId"] return math.inf if orderid is None else orderid def __order__(ordered, current): _key = (__family_order_id__(current), current["Family"]) return { **ordered, _key: ordered.get(_key, tuple()) + (current,) } ordered = reduce(__order__, species, {}) return sorted(tuple(ordered.items()), key=lambda item: item[0][0]) def species_by_id(conn: mdb.Connection, speciesid) -> dict: "Retrieve the species from the database by id." with conn.cursor(cursorclass=DictCursor) as cursor: cursor.execute( "SELECT Id AS SpeciesId, SpeciesName, LOWER(Name) AS Name, " "MenuName, FullName, TaxonomyId, Family, FamilyOrderId, OrderId " "FROM Species WHERE SpeciesId=%s", (speciesid,)) return cursor.fetchone() def save_species(conn: mdb.Connection, common_name: str, scientific_name: str, family: str, taxon_id: Optional[str] = None) -> int: """ Save a new species to the database. Parameters ---------- conn: A connection to the MariaDB database. taxon_id: The taxonomy identifier for the new species. common_name: The species' common name. scientific_name; The species' scientific name. """ genus, species = scientific_name.split(" ") families = species_families(conn) with conn.cursor() as cursor: cursor.execute("SELECT MAX(OrderId) FROM Species") species = { "common_name": common_name, "common_name_lower": common_name.lower(), "menu_name": f"{common_name} ({genus[0]}. {species.lower()})", "scientific_name": scientific_name, "family": family, "family_order": families[family], "taxon_id": taxon_id, "species_order": cursor.fetchone()[0] + 5 } cursor.execute( "INSERT INTO Species(" "SpeciesName, Name, MenuName, FullName, Family, FamilyOrderId, " "TaxonomyId, OrderId" ") VALUES (" "%(common_name)s, %(common_name_lower)s, %(menu_name)s, " "%(scientific_name)s, %(family)s, %(family_order)s, %(taxon_id)s, " "%(species_order)s" ")", species) species_id = cursor.lastrowid cursor.execute("UPDATE Species SET SpeciesId=%s WHERE Id=%s", (species_id, species_id)) return { **species, "species_id": species_id } def species_families(conn: mdb.Connection) -> dict: """Retrieve the families under which species are grouped.""" with conn.cursor(cursorclass=DictCursor) as cursor: cursor.execute( "SELECT DISTINCT(Family), FamilyOrderId FROM Species " "WHERE Family IS NOT NULL") return { fam["Family"]: fam["FamilyOrderId"] for fam in cursor.fetchall() }