aboutsummaryrefslogtreecommitdiff
"""Functions for handling genotypes."""
from typing import Optional
from datetime import datetime

import MySQLdb as mdb
from MySQLdb.cursors import Cursor, DictCursor
from flask import current_app as app

from gn_libs.mysqldb import debug_query

def genocode_by_population(
        conn: mdb.Connection, population_id: int) -> tuple[dict, ...]:
    """Fetch every `GenoCode` row recorded for a population.

    The rows are selected by matching `InbredSetId` against `population_id`
    and each one is returned as a plain `dict`, collected into a tuple. An
    empty tuple is returned when no row matches.
    """
    with conn.cursor(cursorclass=DictCursor) as cursor:
        cursor.execute("SELECT * FROM GenoCode WHERE InbredSetId=%s",
                       (population_id,))
        rows = cursor.fetchall()
        # Copy each row into a plain dict so callers get a uniform type.
        return tuple(map(dict, rows))


def genotype_markers_count(conn: mdb.Connection, species_id: int) -> int:
    """Count the rows in `Geno` with a non-NULL `Name` for a species.

    The count is computed by the database (`COUNT(Name)`) over rows whose
    `SpeciesId` equals `species_id`, and is returned as an `int`.
    """
    count_query = (
        "SELECT COUNT(Name) AS markers_count FROM Geno WHERE SpeciesId=%s")
    with conn.cursor(cursorclass=DictCursor) as cursor:
        cursor.execute(count_query, (species_id,))
        count_row = cursor.fetchone()
        return int(count_row["markers_count"])


def genotype_markers(
        conn: mdb.Connection,
        species_id: int,
        offset: int = 0,
        limit: Optional[int] = None
) -> tuple[dict, ...]:
    """Retrieve markers from the database.

    Selects all columns of the `Geno` rows whose `SpeciesId` equals
    `species_id`.

    Parameters:
        conn: Open database connection used to create the cursor.
        species_id: Value matched against `Geno.SpeciesId`.
        offset: Number of rows to skip. Only used when `limit` is in effect.
        limit: Maximum number of rows to return. When falsy (`None` or `0`)
            or not greater than zero, no `LIMIT`/`OFFSET` clause is added and
            every matching row is returned.

    Returns:
        A tuple with one plain `dict` per row; empty if nothing matches.

    Raises:
        ValueError/TypeError: if `limit` or `offset` cannot be converted to
            an integer when pagination is requested.
    """
    _query = "SELECT * FROM Geno WHERE SpeciesId=%s"
    _params: tuple[int, ...] = (species_id,)
    if limit and limit > 0:
        # Bind the pagination values as query parameters instead of splicing
        # them into the SQL text, and coerce them to `int` so that nothing
        # but a number can ever reach the LIMIT/OFFSET clause.
        _query = _query + " LIMIT %s OFFSET %s"
        _params = _params + (int(limit), int(offset))

    with conn.cursor(cursorclass=DictCursor) as cursor:
        cursor.execute(_query, _params)
        # Hand the cursor to the project's query-debugging helper along with
        # the application logger.
        debug_query(cursor, app.logger)
        return tuple(dict(row) for row in cursor.fetchall())


def genotype_dataset(
        conn: mdb.Connection,
        species_id: int,
        population_id: int,
        dataset_id: Optional[int] = None
) -> Optional[dict]:
    """Fetch the genotype dataset (`GenoFreeze` row) of a population.

    The lookup joins `Species`, `InbredSet` and `GenoFreeze`, filtering on the
    given species and population. When `dataset_id` is truthy the result is
    further narrowed to that specific `GenoFreeze.Id`.

    Only the first matching row is read: a population is expected to have at
    most one genotype dataset. Returns that row as a `dict`, or `None` when
    there is no match.
    """
    where_clause = "WHERE s.Id=%s AND iset.Id=%s"
    args: tuple = (species_id, population_id)
    if dataset_id:
        # Optional extra filter on the dataset's own identifier.
        where_clause += " AND gf.Id=%s"
        args += (dataset_id,)

    sql = (
        "SELECT gf.* FROM Species AS s INNER JOIN InbredSet AS iset "
        "ON s.Id=iset.SpeciesId INNER JOIN GenoFreeze AS gf "
        "ON iset.Id=gf.InbredSetId "
        + where_clause)

    with conn.cursor(cursorclass=DictCursor) as cursor:
        cursor.execute(sql, args)
        debug_query(cursor, app.logger)
        row = cursor.fetchone()
        return dict(row) if row else None


def save_new_dataset(
        cursor: Cursor,
        population_id: int,
        name: str,
        fullname: str,
        shortname: str
) -> dict:
    """Insert a new genotype dataset row into `GenoFreeze`.

    The row is created with today's date (local time, ISO format) as its
    `CreateTime`, `public` set to 2, `confidentiality` set to 0 and no
    `AuthorisedUsers`. The statement is executed on the given cursor; this
    function does not commit.

    Returns a dict of the inserted column values plus an `Id` key holding the
    cursor's `lastrowid` after the insert.
    """
    today = datetime.now().date().isoformat()
    record = {
        "InbredSetId": population_id,
        "Name": name,
        "FullName": fullname,
        "ShortName": shortname,
        "CreateTime": today,
        "public": 2,
        "confidentiality": 0,
        "AuthorisedUsers": None
    }
    insert_query = (
        "INSERT INTO GenoFreeze("
        "Name, FullName, ShortName, CreateTime, public, InbredSetId, "
        "confidentiality, AuthorisedUsers"
        ") VALUES ("
        "%(Name)s, %(FullName)s, %(ShortName)s, %(CreateTime)s, %(public)s, "
        "%(InbredSetId)s, %(confidentiality)s, %(AuthorisedUsers)s"
        ")")
    cursor.execute(insert_query, record)

    # Return a copy extended with the new row's identifier.
    saved = dict(record)
    saved["Id"] = cursor.lastrowid
    return saved