aboutsummaryrefslogtreecommitdiff
path: root/uploader/db_utils.py
blob: 357b3da022c932fed8f291a40dc54e5021739490 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
"""module contains all db related stuff"""
import logging
import traceback
import contextlib
from urllib.parse import urlparse
from typing import Any, Tuple, Optional, Iterator, Callable

import MySQLdb as mdb
from redis import Redis
from flask import current_app as app

def parse_db_url(db_url) -> Tuple:
    """
    Parse SQL_URI configuration variable.
    """
    parsed_db = urlparse(db_url)
    return (parsed_db.hostname, parsed_db.username,
            parsed_db.password, parsed_db.path[1:], parsed_db.port)


@contextlib.contextmanager
def database_connection(db_url: str) -> Iterator[mdb.Connection]:
    """function to create db connector"""
    host, user, passwd, db_name, db_port = parse_db_url(db_url)
    connection = mdb.connect(
        host, user, passwd, db_name, port=(db_port or 3306))
    try:
        yield connection
        connection.commit()
    except mdb.Error as _mdb_err:
        logging.error(traceback.format_exc())
        connection.rollback()
    finally:
        connection.close()

def with_db_connection(func: Callable[[mdb.Connection], Any]) -> Any:
    """Call `func` with a MySQDdb database connection."""
    with database_connection(app.config["SQL_URI"]) as conn:
        return func(conn)

def with_redis_connection(func: Callable[[Redis], Any]) -> Any:
    """Call `func` with a redis connection."""
    redisuri = app.config["REDIS_URL"]
    with Redis.from_url(redisuri, decode_responses=True) as rconn:
        return func(rconn)