"""Functions for handling genotypes.""" from typing import Optional from datetime import datetime import MySQLdb as mdb from MySQLdb.cursors import Cursor, DictCursor from uploader.db_utils import debug_query def genocode_by_population( conn: mdb.Connection, population_id: int) -> tuple[dict, ...]: """Get the allele/genotype codes.""" with conn.cursor(cursorclass=DictCursor) as cursor: cursor.execute("SELECT * FROM GenoCode WHERE InbredSetId=%s", (population_id,)) return tuple(dict(item) for item in cursor.fetchall()) def genotype_markers_count(conn: mdb.Connection, species_id: int) -> int: """Find the total count of the genotype markers for a species.""" with conn.cursor(cursorclass=DictCursor) as cursor: cursor.execute( "SELECT COUNT(Name) AS markers_count FROM Geno WHERE SpeciesId=%s", (species_id,)) return int(cursor.fetchone()["markers_count"]) def genotype_markers( conn: mdb.Connection, species_id: int, offset: int = 0, limit: Optional[int] = None ) -> tuple[dict, ...]: """Retrieve markers from the database.""" _query = "SELECT * FROM Geno WHERE SpeciesId=%s" if bool(limit) and limit > 0:# type: ignore[operator] _query = _query + f" LIMIT {limit} OFFSET {offset}" with conn.cursor(cursorclass=DictCursor) as cursor: cursor.execute(_query, (species_id,)) debug_query(cursor) return tuple(dict(row) for row in cursor.fetchall()) def genotype_dataset( conn: mdb.Connection, species_id: int, population_id: int, dataset_id: Optional[int] = None ) -> Optional[dict]: """Retrieve genotype datasets from the database. Apparently, you should only ever have one genotype dataset for a population. """ _query = ( "SELECT gf.* FROM Species AS s INNER JOIN InbredSet AS iset " "ON s.Id=iset.SpeciesId INNER JOIN GenoFreeze AS gf " "ON iset.Id=gf.InbredSetId " "WHERE s.Id=%s AND iset.Id=%s") _params = (species_id, population_id) if bool(dataset_id): _query = _query + " AND gf.Id=%s" _params = _params + (dataset_id,)# type: ignore[assignment] with conn.cursor(cursorclass=DictCursor) as cursor: cursor.execute(_query, _params) debug_query(cursor) result = cursor.fetchone() if bool(result): return dict(result) return None def save_new_dataset( cursor: Cursor, population_id: int, name: str, fullname: str, shortname: str ) -> dict: """Save a new genotype dataset into the database.""" params = { "InbredSetId": population_id, "Name": name, "FullName": fullname, "ShortName": shortname, "CreateTime": datetime.now().date().isoformat(), "public": 2, "confidentiality": 0, "AuthorisedUsers": None } cursor.execute( "INSERT INTO GenoFreeze(" "Name, FullName, ShortName, CreateTime, public, InbredSetId, " "confidentiality, AuthorisedUsers" ") VALUES (" "%(Name)s, %(FullName)s, %(ShortName)s, %(CreateTime)s, %(public)s, " "%(InbredSetId)s, %(confidentiality)s, %(AuthorisedUsers)s" ")", params) return {**params, "Id": cursor.lastrowid}