1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
"""Functions for handling genotypes."""
from typing import Optional
from datetime import datetime
import MySQLdb as mdb
from MySQLdb.cursors import Cursor, DictCursor
from uploader.db_utils import debug_query
def genocode_by_population(
conn: mdb.Connection, population_id: int) -> tuple[dict, ...]:
"""Get the allele/genotype codes."""
with conn.cursor(cursorclass=DictCursor) as cursor:
cursor.execute("SELECT * FROM GenoCode WHERE InbredSetId=%s",
(population_id,))
return tuple(dict(item) for item in cursor.fetchall())
def genotype_markers_count(conn: mdb.Connection, species_id: int) -> int:
"""Find the total count of the genotype markers for a species."""
with conn.cursor(cursorclass=DictCursor) as cursor:
cursor.execute(
"SELECT COUNT(Name) AS markers_count FROM Geno WHERE SpeciesId=%s",
(species_id,))
return int(cursor.fetchone()["markers_count"])
def genotype_markers(
conn: mdb.Connection,
species_id: int,
offset: int = 0,
limit: Optional[int] = None
) -> tuple[dict, ...]:
"""Retrieve markers from the database."""
_query = "SELECT * FROM Geno WHERE SpeciesId=%s"
if bool(limit) and limit > 0:# type: ignore[operator]
_query = _query + f" LIMIT {limit} OFFSET {offset}"
with conn.cursor(cursorclass=DictCursor) as cursor:
cursor.execute(_query, (species_id,))
debug_query(cursor)
return tuple(dict(row) for row in cursor.fetchall())
def genotype_dataset(
conn: mdb.Connection,
species_id: int,
population_id: int,
dataset_id: Optional[int] = None
) -> Optional[dict]:
"""Retrieve genotype datasets from the database.
Apparently, you should only ever have one genotype dataset for a population.
"""
_query = (
"SELECT gf.* FROM Species AS s INNER JOIN InbredSet AS iset "
"ON s.Id=iset.SpeciesId INNER JOIN GenoFreeze AS gf "
"ON iset.Id=gf.InbredSetId "
"WHERE s.Id=%s AND iset.Id=%s")
_params = (species_id, population_id)
if bool(dataset_id):
_query = _query + " AND gf.Id=%s"
_params = _params + (dataset_id,)# type: ignore[assignment]
with conn.cursor(cursorclass=DictCursor) as cursor:
cursor.execute(_query, _params)
debug_query(cursor)
result = cursor.fetchone()
if bool(result):
return dict(result)
return None
def save_new_dataset(
cursor: Cursor,
population_id: int,
name: str,
fullname: str,
shortname: str
) -> dict:
"""Save a new genotype dataset into the database."""
params = {
"InbredSetId": population_id,
"Name": name,
"FullName": fullname,
"ShortName": shortname,
"CreateTime": datetime.now().date().isoformat(),
"public": 2,
"confidentiality": 0,
"AuthorisedUsers": None
}
cursor.execute(
"INSERT INTO GenoFreeze("
"Name, FullName, ShortName, CreateTime, public, InbredSetId, "
"confidentiality, AuthorisedUsers"
") VALUES ("
"%(Name)s, %(FullName)s, %(ShortName)s, %(CreateTime)s, %(public)s, "
"%(InbredSetId)s, %(confidentiality)s, %(AuthorisedUsers)s"
")",
params)
return {**params, "Id": cursor.lastrowid}
|