aboutsummaryrefslogtreecommitdiff
path: root/uploader/phenotypes/models.py
blob: c3b6dfb385b66e30ddd37e94f8f94ea1fccc3941 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
"""Database and utility functions for phenotypes."""
from typing import Optional

import MySQLdb as mdb
from MySQLdb.cursors import DictCursor

from uploader.db_utils import debug_query

def datasets_by_population(
        conn: mdb.Connection,
        species_id: int,
        population_id: int
) -> tuple[dict, ...]:
    """Return every phenotype dataset attached to a population.

    Joins `Species`, `InbredSet` and `PublishFreeze`, keeping the rows where
    `Species.Id` equals `species_id` and `InbredSet.Id` equals
    `population_id`.  Each result is a plain `dict` holding the species'
    `SpeciesId` column together with all the `PublishFreeze` columns.  The
    tuple is empty when nothing matches.
    """
    _query = (
        "SELECT s.SpeciesId, pf.* FROM Species AS s "
        "INNER JOIN InbredSet AS iset ON s.Id=iset.SpeciesId "
        "INNER JOIN PublishFreeze AS pf ON iset.Id=pf.InbredSetId "
        "WHERE s.Id=%s AND iset.Id=%s;")
    with conn.cursor(cursorclass=DictCursor) as cursor:
        cursor.execute(_query, (species_id, population_id))
        rows = cursor.fetchall()
    # Copy each row into a fresh dict so callers do not share cursor-owned
    # objects.
    return tuple(map(dict, rows))


def dataset_by_id(conn: mdb.Connection,
                  species_id: int,
                  population_id: int,
                  dataset_id: int) -> dict:
    """Fetch the details of a single phenotype dataset.

    The dataset is looked up by `PublishFreeze.Id` (`dataset_id`) and must
    belong to the population `InbredSet.Id` (`population_id`) of the species
    `Species.Id` (`species_id`).

    Returns a `dict` with the species' `SpeciesId` column plus all the
    `PublishFreeze` columns, or an empty `dict` when no such dataset exists.
    """
    with conn.cursor(cursorclass=DictCursor) as cursor:
        cursor.execute(
            "SELECT s.SpeciesId, pf.* FROM Species AS s "
            "INNER JOIN InbredSet AS iset ON s.Id=iset.SpeciesId "
            "INNER JOIN PublishFreeze AS pf ON iset.Id=pf.InbredSetId "
            "WHERE s.Id=%s AND iset.Id=%s AND pf.Id=%s",
            (species_id, population_id, dataset_id))
        row = cursor.fetchone()
        # `fetchone()` yields `None` when the query matches nothing, and
        # `dict(None)` would fail with an opaque `TypeError`.  Hand back an
        # empty dict instead so callers can simply test for truthiness.
        return dict(row) if row is not None else {}


def phenotypes_count(conn: mdb.Connection,
                     population_id: int,
                     dataset_id: int) -> int:
    """Return how many phenotypes the dataset contains.

    Counts the `Phenotype` rows reachable through `PublishXRef` for the
    population `population_id` (`PublishXRef.InbredSetId`) and the dataset
    `dataset_id` (`PublishFreeze.Id`).
    """
    _query = (
        "SELECT COUNT(*) AS total_phenos FROM Phenotype AS pheno "
        "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId "
        "INNER JOIN PublishFreeze AS pf ON pxr.InbredSetId=pf.InbredSetId "
        "WHERE pxr.InbredSetId=%s AND pf.Id=%s")
    with conn.cursor(cursorclass=DictCursor) as cursor:
        cursor.execute(_query, (population_id, dataset_id))
        result = cursor.fetchone()
    return int(result["total_phenos"])


def dataset_phenotypes(conn: mdb.Connection,
                       population_id: int,
                       dataset_id: int,
                       offset: int = 0,
                       limit: Optional[int] = None) -> tuple[dict, ...]:
    """Fetch the phenotypes that belong to a dataset.

    Selects every `Phenotype` column together with `PublishXRef.Id` and
    `InbredSet.InbredSetCode` for the population `population_id`
    (`PublishXRef.InbredSetId`) and the dataset `dataset_id`
    (`PublishFreeze.Id`).

    Paging is applied only when `limit` is truthy; in that case `offset` rows
    are skipped and at most `limit` rows are returned.  With a falsy `limit`
    (`None` or `0`) every matching row is returned and `offset` is ignored.

    Raises `ValueError`/`TypeError` if paging is requested with values that
    cannot be converted to `int`.
    """
    _query = (
        "SELECT pheno.*, pxr.Id, ist.InbredSetCode FROM Phenotype AS pheno "
        "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId "
        "INNER JOIN PublishFreeze AS pf ON pxr.InbredSetId=pf.InbredSetId "
        "INNER JOIN InbredSet AS ist ON pf.InbredSetId=ist.Id "
        "WHERE pxr.InbredSetId=%s AND pf.Id=%s")
    _params: tuple = (population_id, dataset_id)
    if limit:
        # Bind the paging values as integers rather than formatting them into
        # the SQL text: anything that is not a number is rejected here and can
        # never reach the database as part of the statement.
        _query += " LIMIT %s OFFSET %s"
        _params += (int(limit), int(offset))
    # NOTE(review): the statement has no ORDER BY, so which rows land on a
    # given page depends on the order the server happens to return them in.
    with conn.cursor(cursorclass=DictCursor) as cursor:
        cursor.execute(_query, _params)
        debug_query(cursor)
        return tuple(dict(row) for row in cursor.fetchall())


def phenotypes_data(conn: mdb.Connection,
                    population_id: int,
                    dataset_id: int,
                    offset: int = 0,
                    limit: Optional[int] = None) -> tuple[dict, ...]:
    """Fetch the data for the phenotypes in a dataset.

    Returns the joined `PublishXRef`, `PublishData` and `Strain` columns for
    the population `population_id` (`PublishFreeze.InbredSetId`) and the
    dataset `dataset_id` (`PublishFreeze.Id`), ordered by `PublishXRef.DataId`
    and then by `Strain.Id`.

    Paging is applied only when `limit` is truthy; in that case `offset` rows
    are skipped and at most `limit` rows are returned.  With a falsy `limit`
    (`None` or `0`) every matching row is returned and `offset` is ignored.

    Raises `ValueError`/`TypeError` if paging is requested with values that
    cannot be converted to `int`.
    """
    # TODO: This query isn't exactly right, it misses some data.
    # Intended chain:
    #   Phenotype -> PublishXRef -> PublishData -> Strain -> PublishFreeze
    _query = ("SELECT pxr.*, pd.*, str.* FROM PublishFreeze AS pf "
              "INNER JOIN PublishXRef AS pxr ON pf.InbredSetId=pxr.InbredSetId "
              "INNER JOIN PublishData AS pd ON pxr.DataId=pd.Id "
              "INNER JOIN Strain AS str ON pd.StrainId=str.Id "
              "WHERE pf.InbredSetId=%s AND pf.Id=%s "
              "ORDER BY pxr.DataId ASC, str.Id ASC")
    _params: tuple = (population_id, dataset_id)
    if limit:
        # Bind the paging values as integers rather than formatting them into
        # the SQL text: anything that is not a number is rejected here and can
        # never reach the database as part of the statement.
        _query += " LIMIT %s OFFSET %s"
        _params += (int(limit), int(offset))
    with conn.cursor(cursorclass=DictCursor) as cursor:
        cursor.execute(_query, _params)
        debug_query(cursor)
        return tuple(dict(row) for row in cursor.fetchall())


def phenotypes_se(conn: mdb.Connection, dataset_id: int) -> tuple[dict, ...]:
    """Fetch the standard errors for the phenotypes.

    Placeholder: no query is run yet, so both arguments are unused and the
    result is always an empty tuple.
    """
    return ()


def phenotypes_sample_counts(conn: mdb.Connection, dataset_id: int) -> tuple[dict, ...]:
    """Fetch the sample counts for the phenotypes.

    Placeholder: no query is run yet, so both arguments are unused and the
    result is always an empty tuple.
    """
    return ()