aboutsummaryrefslogtreecommitdiff
"""Functions for accessing the database relating to species populations."""
import MySQLdb as mdb
from MySQLdb.cursors import DictCursor

def population_by_id(conn: mdb.Connection, population_id) -> dict:
    """Fetch a single population (InbredSet row) by its identifier.

    The lookup filters on the `InbredSetId` column. The row is read through a
    `DictCursor`, so it is returned as a mapping of column name to value.
    The result is whatever `cursor.fetchone()` yields, which under the
    DB-API is `None` when no row matches.
    """
    query = "SELECT * FROM InbredSet WHERE InbredSetId=%s"
    with conn.cursor(cursorclass=DictCursor) as cursor:
        cursor.execute(query, (population_id,))
        row = cursor.fetchone()
    return row

def population_by_species_and_id(
        conn: mdb.Connection, species_id, population_id) -> dict:
    """Fetch the population with the given id within the given species.

    The lookup filters on both the `SpeciesId` and `Id` columns of
    `InbredSet`. The row is read through a `DictCursor`, so it is returned as
    a mapping of column name to value. The result is whatever
    `cursor.fetchone()` yields, which under the DB-API is `None` when no row
    matches.
    """
    query = "SELECT * FROM InbredSet WHERE SpeciesId=%s AND Id=%s"
    with conn.cursor(cursorclass=DictCursor) as cursor:
        cursor.execute(query, (species_id, population_id))
        row = cursor.fetchone()
    return row

def populations_by_species(conn: mdb.Connection, speciesid) -> tuple:
    """Retrieve every population (InbredSet row) recorded for a species.

    Rows are read through a `DictCursor`, so each element of the returned
    tuple is a mapping of column name to value. The tuple is empty when the
    species has no populations.
    """
    with conn.cursor(cursorclass=DictCursor) as cursor:
        cursor.execute(
            "SELECT * FROM InbredSet WHERE SpeciesId=%s", (speciesid,))
        populations = tuple(cursor.fetchall())
    # Single exit point: the previous trailing `return tuple()` could never
    # run, because the `with` body always returned (or raised) first.
    return populations

# Family labels that `population_families` returns for every species, ahead
# of any further families found in the database for that species.
__GENERIC_POPULATION_FAMILIES__ = (
    "Reference Populations (replicate average, SE, N)",
    "Crosses and Heterogeneous Stock (individuals)",
    "Groups Without Genotypes",
)

def population_families(conn, species_id: int) -> tuple[str, ...]:
    """Fetch the families under which a species' populations are grouped.

    The result always starts with the generic families listed in
    `__GENERIC_POPULATION_FAMILIES__`, in that order. These are followed by
    every other distinct, non-NULL `Family` value stored in `InbredSet` for
    `species_id`, in the order the database returns them.
    """
    with conn.cursor(cursorclass=DictCursor) as cursor:
        # One placeholder per generic family, so the exclusion list is passed
        # as bound parameters rather than interpolated into the SQL text.
        paramstr = ", ".join(["%s"] * len(__GENERIC_POPULATION_FAMILIES__))
        cursor.execute(
            "SELECT DISTINCT(Family) FROM InbredSet "
            "WHERE SpeciesId=%s "
            "AND Family IS NOT NULL "
            f"AND Family NOT IN ({paramstr})",
            (species_id, *__GENERIC_POPULATION_FAMILIES__))
        species_families = tuple(
            row["Family"] for row in cursor.fetchall())
        return __GENERIC_POPULATION_FAMILIES__ + species_families


def population_genetic_types(conn) -> tuple:
    """Fetch the distinct genetic types recorded for populations.

    Returns a tuple holding every distinct, non-NULL `GeneticType` value
    found in the `InbredSet` table, across all species.
    """
    with conn.cursor(cursorclass=DictCursor) as cursor:
        cursor.execute(
            "SELECT DISTINCT(GeneticType) FROM InbredSet WHERE GeneticType IS "
            "NOT NULL")
        return tuple(row["GeneticType"] for row in cursor.fetchall())


def save_population(cursor: mdb.cursors.Cursor, population_details: dict) -> dict:
    """Insert a new population (InbredSet row) and return what was saved.

    `cursor` must yield rows that can be indexed by column name (e.g. a
    `DictCursor`), since the family lookup below reads `row["Family"]`.

    `population_details` must contain `SpeciesId` and `Family`, plus every
    other value named in the INSERT statement (`InbredSetName`, `Name`,
    `FullName`, `MappingMethodId`, `GeneticType`, `InbredSetCode`,
    `Description`). `MenuOrderId`, `InbredSetId` and `public` default to 0, 0
    and 2 respectively when not supplied. `FamilyOrder` is always computed
    here: it is the existing order for a family the species already has,
    otherwise one more than the highest order currently in use.

    Returns the inserted values, with `Id`, `InbredSetId` and
    `population_id` all set to the id reported by `cursor.lastrowid`.
    """
    cursor.execute("SELECT DISTINCT(Family), FamilyOrder FROM InbredSet "
                   "WHERE SpeciesId=%s "
                   "AND Family IS NOT NULL AND Family != '' "
                   "AND FamilyOrder IS NOT NULL "
                   "ORDER BY FamilyOrder ASC",
                   (population_details["SpeciesId"],))
    order_by_family = {}
    for row in cursor.fetchall():
        order_by_family[row["Family"]] = int(row["FamilyOrder"])

    family = population_details["Family"]
    if family in order_by_family:
        family_order = order_by_family[family]
    else:
        # New family for this species: place it after all existing ones.
        family_order = max([0, *order_by_family.values()]) + 1

    # Defaults first, caller-supplied details over them, computed order last.
    params = dict(MenuOrderId=0, InbredSetId=0, public=2)
    params.update(population_details)
    params["FamilyOrder"] = family_order

    cursor.execute(
        "INSERT INTO InbredSet("
        "InbredSetId, InbredSetName, Name, SpeciesId, FullName, "
        "public, MappingMethodId, GeneticType, Family, FamilyOrder,"
        " MenuOrderId, InbredSetCode, Description"
        ") "
        "VALUES ("
        "%(InbredSetId)s, %(InbredSetName)s, %(Name)s, %(SpeciesId)s, "
        "%(FullName)s, %(public)s, %(MappingMethodId)s, %(GeneticType)s, "
        "%(Family)s, %(FamilyOrder)s, %(MenuOrderId)s, %(InbredSetCode)s, "
        "%(Description)s"
        ")",
        params)

    # Keep the InbredSetId column in step with the id of the row just added.
    inserted_id = cursor.lastrowid
    cursor.execute("UPDATE InbredSet SET InbredSetId=%s WHERE Id=%s",
                   (inserted_id, inserted_id))

    saved = dict(params)
    saved.update(
        Id=inserted_id, InbredSetId=inserted_id, population_id=inserted_id)
    return saved