aboutsummaryrefslogtreecommitdiff
path: root/gn3/db/traits.py
blob: 3134d312d730728901daa311c0958a259990b23d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
"""This contains all the necessary functions that are required to add traits
to the published database"""
from dataclasses import dataclass
from itertools import chain
from typing import Optional
from typing import Any, Dict, List, Optional


@dataclass(frozen=True)
class riset:
    """Immutable record identifying a riset.

    A riset is a group, e.g. rat HSNIH-Palmer or BXD.
    """
    # Name of the group, e.g. "BXD".
    name: str
    # Identifier of the group.
    r_id: int


@dataclass(frozen=True)
class webqtlCaseData:
    """Class for keeping track of one case data in one trait.

    Every field is optional; a field left as None is omitted from the
    string form of the record.
    """
    value: Optional[float] = None
    variance: Optional[float] = None
    count: Optional[int] = None  # Number of Individuals

    def __str__(self):
        """Return e.g. "value=1.000 variance=2.000 n_data=3".

        Only the fields that are set are included.  Fields are tested
        against None rather than for truthiness so that a legitimate 0
        value, variance or count is still reported.
        """
        parts = []
        if self.value is not None:
            parts.append(f"value={self.value:.3f}")
        if self.variance is not None:
            parts.append(f" variance={self.variance:.3f}")
        if self.count is not None:
            parts.append(f" n_data={self.count}")
        return "".join(parts)


def get_riset(data_type: str, name: str, conn: Any):
    """Look up the riset (group) that the dataset called `name` belongs to.

    `data_type` selects which freeze table is searched: "Publish", "Geno"
    or "ProbeSet".  Any other value skips the database lookup.

    Returns a `riset` built from InbredSet.Name and InbredSet.Id.  Both
    fields are None when `data_type` is not recognised or when no row
    matches `name`.  The group name "BXD300" is reported as "BXD".
    """
    # `name` is passed as a driver parameter (the same `%s` placeholder
    # style the INSERT helpers in this module use) instead of being
    # spliced into the SQL text.
    queries = {
        "Publish": ("SELECT InbredSet.Name, InbredSet.Id FROM InbredSet, "
                    "PublishFreeze WHERE PublishFreeze.InbredSetId = "
                    "InbredSet.Id AND PublishFreeze.Name = %s"),
        "Geno": ("SELECT InbredSet.Name, InbredSet.Id FROM InbredSet, "
                 "GenoFreeze WHERE GenoFreeze.InbredSetId = "
                 "InbredSet.Id AND GenoFreeze.Name = %s"),
        "ProbeSet": ("SELECT InbredSet.Name, InbredSet.Id FROM "
                     "InbredSet, ProbeSetFreeze, ProbeFreeze WHERE "
                     "ProbeFreeze.InbredSetId = InbredSet.Id AND "
                     "ProbeFreeze.Id = ProbeSetFreeze.ProbeFreezeId AND "
                     "ProbeSetFreeze.Name = %s"),
    }
    query = queries.get(data_type)
    _name, _id = None, None
    if query:
        with conn.cursor() as cursor:
            # The statement has to be executed before a row can be fetched.
            cursor.execute(query, (name,))
            row = cursor.fetchone()
        if row:
            _name, _id = row
            if _name == "BXD300":
                _name = "BXD"
    return riset(_name, _id)


def insert_publication(pubmed_id: int, publication: Optional[Dict],
                       conn: Any):
    """Creates a new publication record if it's not available.

    Looks for a Publication row whose PubMed_ID equals `pubmed_id`.  When
    none exists and `publication` is given, inserts a row whose columns are
    the keys of `publication` and whose values are its values.

    `publication` keys are used verbatim as column names, so they must come
    from trusted code; only the values are bound as query parameters.
    Nothing is inserted when `publication` is None.  Returns None.
    """
    with conn.cursor() as cursor:
        # Bind the id as a parameter instead of formatting it into the SQL.
        cursor.execute("SELECT Id FROM Publication where PubMed_ID = %s",
                       (pubmed_id,))
        existing = cursor.fetchone()
    if existing or publication is None:
        return
    # The Publication contains the fields: 'authors', 'title', 'abstract',
    # 'journal','volume','pages','month','year'
    insert_query = ("INSERT into Publication (%s) Values (%s)" %
                    (", ".join(publication.keys()),
                     ", ".join(['%s'] * len(publication))))
    with conn.cursor() as cursor:
        cursor.execute(insert_query, tuple(publication.values()))


def insert_publication_data(riset_id: int, strains: List, conn: Any):
    """Insert PublishData, PublishSE and NStrain rows for a riset's strains.

    Resolves the species of the InbredSet `riset_id`, looks up the ids of
    the strains named in `strains` for that species, and inserts one row
    per strain id into each of the three tables, keyed by the current
    maximum PublishData Id.

    The inserted value, variance and count are still hard-coded
    placeholders (see the TODO below), and the final PublishXRef lookup
    relies on names that are not defined in this function.
    """
    with conn.cursor() as cursor:
        cursor.execute("SELECT SpeciesId from InbredSet where Id=%s",
                       (riset_id,))
        species_id, *_ = cursor.fetchone()

        strain_ids: List = []
        if strains:  # an empty list would otherwise produce "IN ()"
            placeholders = ", ".join(["%s"] * len(strains))
            # Filter on the species fetched above; the Strain.SpeciesId
            # column holds a species id, not the riset id.
            cursor.execute(
                "SELECT Id FROM Strain WHERE SpeciesId=%s "
                f"AND Name in ({placeholders})",
                (species_id, *strains))
            strain_ids = list(chain(*cursor.fetchall()))

        # execute() is not required to return the cursor (PEP 249), so
        # fetch in a separate call.
        cursor.execute("SELECT max(Id) from PublishData")
        data_id, *_ = cursor.fetchone()

        # NOTE(review): pairing by position assumes the Strain rows come
        # back in the same order as `strains` -- confirm, or select
        # (Name, Id) and join on the name instead.
        for _strain, strain_id in zip(strains, strain_ids):
            # TODO: Fetch this values correctly using sql
            value, variance, trait_np = 1, 1, 1
            if value:
                cursor.execute(
                    "INSERT INTO PublishData values (%s, %s, %s)",
                    (data_id, strain_id, value))
            if variance:
                cursor.execute(
                    "INSERT INTO PublishSE values (%s, %s, %s)",
                    (data_id, strain_id, variance))
            if trait_np:
                cursor.execute(
                    "INSERT INTO NStrain values (%s, %s, %s)",
                    (data_id, strain_id, trait_np))
        # NOTE(review): `lookup_webqtldataset_name`, `phenotype_id` and
        # `publication_id` are not defined in the visible source, so this
        # tail raises NameError as written; left unchanged pending the
        # intended implementation.
        inbred_set_id = lookup_webqtldataset_name(riset_name="",
                                                  conn=conn)
        cursor.execute(
            "SELECT max(Sequence) FROM PublishXRef where "
            "InbredSetId = %d and PhenotypeId = %d and PublicationId = %d" %
            (inbred_set_id, phenotype_id, publication_id))


def insert_phenotype(phenotype: Optional[Dict], conn: Any):
    """Insert one row into the Phenotype table.

    The keys of `phenotype` are used verbatim as the column names and its
    values are bound as the query parameters, in the same order.
    """
    columns = ", ".join(phenotype.keys())
    placeholders = ", ".join("%s" for _ in phenotype)
    statement = "INSERT into Phenotype (%s) Values (%s)" % (columns,
                                                            placeholders)
    row = tuple(phenotype.values())
    with conn.cursor() as cursor:
        cursor.execute(statement, row)