aboutsummaryrefslogtreecommitdiff
path: root/uploader/genotypes/models.py
blob: 44c98b12ac43de573399c4d910c0e3363f099a9f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
"""Functions for handling genotypes."""
from typing import Optional
from datetime import datetime

import MySQLdb as mdb
from MySQLdb.cursors import Cursor, DictCursor

from uploader.db_utils import debug_query

def genocode_by_population(
        conn: mdb.Connection, population_id: int) -> tuple[dict, ...]:
    """Fetch every `GenoCode` row linked to a population.

    Rows are matched on `GenoCode.InbredSetId`; each row is handed back as a
    plain `dict`, and the whole result is packed into a tuple.
    """
    _query = "SELECT * FROM GenoCode WHERE InbredSetId=%s"
    with conn.cursor(cursorclass=DictCursor) as cursor:
        cursor.execute(_query, (population_id,))
        rows = cursor.fetchall()
        return tuple(dict(row) for row in rows)


def genotype_markers_count(conn: mdb.Connection, species_id: int) -> int:
    """Count the `Geno` rows for a species.

    The count is taken over the `Name` column of the rows whose `SpeciesId`
    matches `species_id`, and is returned as an `int`.
    """
    _query = "SELECT COUNT(Name) AS markers_count FROM Geno WHERE SpeciesId=%s"
    with conn.cursor(cursorclass=DictCursor) as cursor:
        cursor.execute(_query, (species_id,))
        row = cursor.fetchone()
        return int(row["markers_count"])


def genotype_markers(
        conn: mdb.Connection,
        species_id: int,
        offset: int = 0,
        limit: Optional[int] = None
) -> tuple[dict, ...]:
    """Retrieve markers from the database.

    Parameters:
        conn: An open database connection.
        species_id: Only rows whose `Geno.SpeciesId` matches are returned.
        offset: Number of rows to skip. Only takes effect when `limit` is a
            positive number.
        limit: Maximum number of rows to return. `None`, zero or a negative
            value means "no limit", in which case `offset` is ignored too.

    Returns:
        A tuple with one `dict` per matching row.

    Raises:
        ValueError/TypeError: if `limit` or `offset` cannot be converted to
            an `int` when paging is requested.
    """
    _query = "SELECT * FROM Geno WHERE SpeciesId=%s"
    _params: tuple[int, ...] = (species_id,)
    if limit and limit > 0:
        # Bind the paging values as query parameters (coerced to `int`)
        # instead of formatting them into the SQL text, so that nothing other
        # than a number can ever end up in the LIMIT/OFFSET clause.
        _query = _query + " LIMIT %s OFFSET %s"
        _params = _params + (int(limit), int(offset))

    with conn.cursor(cursorclass=DictCursor) as cursor:
        cursor.execute(_query, _params)
        debug_query(cursor)
        return tuple(dict(row) for row in cursor.fetchall())


def genotype_dataset(
        conn: mdb.Connection,
        species_id: int,
        population_id: int,
        dataset_id: Optional[int] = None
) -> Optional[dict]:
    """Fetch a single `GenoFreeze` row for a species/population pair.

    The lookup joins `Species`, `InbredSet` and `GenoFreeze`. When a truthy
    `dataset_id` is given, the match is further restricted to that
    `GenoFreeze.Id`. Only the first row of the result is read: it is returned
    as a `dict`, or `None` is returned when there is no such row.

    Apparently, you should only ever have one genotype dataset for a population.
    """
    _filter_by_dataset = bool(dataset_id)
    _query = (
        "SELECT gf.* FROM Species AS s INNER JOIN InbredSet AS iset "
        "ON s.Id=iset.SpeciesId INNER JOIN GenoFreeze AS gf "
        "ON iset.Id=gf.InbredSetId "
        "WHERE s.Id=%s AND iset.Id=%s"
    ) + (" AND gf.Id=%s" if _filter_by_dataset else "")
    _params = (
        (species_id, population_id, dataset_id)
        if _filter_by_dataset
        else (species_id, population_id))

    with conn.cursor(cursorclass=DictCursor) as cursor:
        cursor.execute(_query, _params)
        debug_query(cursor)
        row = cursor.fetchone()
        return dict(row) if bool(row) else None


def save_new_dataset(
        cursor: Cursor,
        population_id: int,
        name: str,
        fullname: str,
        shortname: str
) -> dict:
    """Insert a row into `GenoFreeze` and return the values that were saved.

    The new row gets the current date (ISO format) as `CreateTime`, `public`
    set to 2, `confidentiality` set to 0 and no `AuthorisedUsers`. The
    returned dict holds the inserted values plus `Id`, which is read from
    `cursor.lastrowid` after the insert. No commit is issued here.
    """
    created_on = datetime.now().date().isoformat()
    dataset = dict(
        InbredSetId=population_id,
        Name=name,
        FullName=fullname,
        ShortName=shortname,
        CreateTime=created_on,
        public=2,
        confidentiality=0,
        AuthorisedUsers=None)
    _query = (
        "INSERT INTO GenoFreeze("
        "Name, FullName, ShortName, CreateTime, public, InbredSetId, "
        "confidentiality, AuthorisedUsers"
        ") VALUES ("
        "%(Name)s, %(FullName)s, %(ShortName)s, %(CreateTime)s, %(public)s, "
        "%(InbredSetId)s, %(confidentiality)s, %(AuthorisedUsers)s"
        ")")
    cursor.execute(_query, dataset)
    return dict(dataset, Id=cursor.lastrowid)