aboutsummaryrefslogtreecommitdiff
path: root/gn3/db/__init__.py
blob: 1eb7b120e38749f8c05ab2db718a72676f5d0f90 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# pylint: disable=[R0902, R0903]
"""Module that exposes common db operations"""
from typing import Optional, Dict, Any
from dataclasses import dataclass, asdict, astuple
from typing_extensions import Protocol
from MySQLdb import escape_string

from gn3.db.phenotypes import Phenotype
from gn3.db.phenotypes import PublishXRef
from gn3.db.phenotypes import Publication

from gn3.db.phenotypes import phenotype_mapping
from gn3.db.phenotypes import publish_x_ref_mapping
from gn3.db.phenotypes import publication_mapping

# Lookup from a table name to that table's mapping object (imported from
# gn3.db.phenotypes).  The query helpers in this module call ``.get(field)``
# and ``field in ...`` on the mapping to translate a dataclass field name
# into the column name that is written into the generated SQL; fields
# without an entry are left out of the query.
TABLEMAP = {
    "Phenotype": phenotype_mapping,
    "PublishXRef": publish_x_ref_mapping,
    "Publication": publication_mapping,
}

# Lookup from a table name to the class used to wrap a row of that table.
# ``fetchone`` instantiates the class positionally from the fetched row, so
# the keys here are expected to mirror the keys of TABLEMAP.
DATACLASSMAP = {
    "Phenotype": Phenotype,
    "PublishXRef": PublishXRef,
    "Publication": Publication,
}


class Dataclass(Protocol):
    """Structural type matching any dataclass instance.

    The ``dataclasses`` module offers no common base class to annotate
    against, so this protocol keys on the ``__dataclass_fields__`` attribute
    that the ``@dataclass`` decorator sets on every class it processes.
    """
    # Field name -> dataclasses.Field descriptor, populated by @dataclass.
    __dataclass_fields__: Dict[str, Any]


def update(conn: Any,
           table: str,
           data: Dataclass,
           where: Dataclass) -> Optional[int]:
    """Run an UPDATE on a table.

    Only fields that are not ``None`` *and* have an entry in
    ``TABLEMAP[table]`` take part in the statement; each such field is
    translated to its column name through that mapping.  Values are handed
    to the driver as query parameters rather than being spliced into the SQL
    text, so quoting/escaping is done by the driver.

    Args:
        conn: Database connection whose ``cursor()`` can be used as a context
            manager.  The ``%s`` placeholders assume a driver using the
            "format" paramstyle (e.g. MySQLdb, which this module imports).
        table: Name of the table to update; must be a key of ``TABLEMAP``.
        data: Dataclass instance holding the new column values.
        where: Dataclass instance holding the equality conditions, which are
            combined with ``AND``.

    Returns:
        ``cursor.rowcount`` after executing the statement, or ``None`` when
        there is nothing to set or no condition to restrict the update (no
        query is executed in that case).

    Raises:
        KeyError: If ``table`` is not a key of ``TABLEMAP``.
    """
    columns = TABLEMAP[table]
    # Keep (column, value) pairs together so placeholder order and parameter
    # order cannot drift apart.
    assignments = [(columns.get(field), value)
                   for field, value in asdict(data).items()
                   if value is not None and field in columns]
    conditions = [(columns.get(field), value)
                  for field, value in asdict(where).items()
                  if value is not None and field in columns]
    # Decide on the pairs that will really be used: falsy-but-valid values
    # such as 0 or "" still count, while an empty SET or WHERE clause (which
    # would be a syntax error) is rejected before touching the database.
    if not (assignments and conditions):
        return None
    set_clause = ", ".join(f"{column} = %s" for column, _ in assignments)
    where_clause = " AND ".join(f"{column} = %s" for column, _ in conditions)
    sql = f"UPDATE {table} SET {set_clause} WHERE {where_clause}"
    params = tuple(value for _, value in assignments + conditions)
    with conn.cursor() as cursor:
        cursor.execute(sql, params)
        return cursor.rowcount


def fetchone(conn: Any,
             table: str,
             where: Dataclass) -> Optional[Dataclass]:
    """Run a SELECT on a table. Returns only one result!

    Only fields of ``where`` that are not ``None`` *and* have an entry in
    ``TABLEMAP[table]`` become equality conditions (joined with ``AND``);
    each field is translated to its column name through that mapping.
    Values are handed to the driver as query parameters rather than being
    spliced into the SQL text.

    Args:
        conn: Database connection whose ``cursor()`` can be used as a context
            manager.  The ``%s`` placeholders assume a driver using the
            "format" paramstyle (e.g. MySQLdb, which this module imports).
        table: Name of the table to query; must be a key of ``TABLEMAP`` and
            of ``DATACLASSMAP``.
        where: Dataclass instance holding the equality conditions.

    Returns:
        An instance of ``DATACLASSMAP[table]`` built positionally from the
        first row returned, or ``None`` when there is no usable condition
        (no query is executed) or when no row matches.

    Raises:
        KeyError: If ``table`` is not a key of ``TABLEMAP``.
    """
    columns = TABLEMAP[table]
    conditions = [(columns.get(field), value)
                  for field, value in asdict(where).items()
                  if value is not None and field in columns]
    # Without at least one condition the WHERE clause would be empty, which
    # is a syntax error; falsy-but-valid values such as 0 still count.
    if not conditions:
        return None
    where_clause = " AND ".join(f"{column} = %s" for column, _ in conditions)
    sql = f"SELECT * FROM {table} WHERE {where_clause}"
    params = tuple(value for _, value in conditions)
    with conn.cursor() as cursor:
        cursor.execute(sql, params)
        row = cursor.fetchone()
    # DB-API cursors return None when no row is available; unpacking that
    # would raise TypeError instead of honouring the Optional return type.
    if row is None:
        return None
    return DATACLASSMAP[table](*row)