aboutsummaryrefslogtreecommitdiff
path: root/scripts/find_dbs_with_NAs.py
blob: a4ff3ee545a5bc24ecb0494e6dc09972cb6831a5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import os
from typing import Tuple
from urllib.parse import urlparse
import MySQLdb as mdb
from MySQLdb.cursors import DictCursor

def parse_db_url(sql_uri: str) -> Tuple:
    """function to parse SQL_URI"""
    parsed_db = urlparse(sql_uri)
    return (
        parsed_db.hostname, parsed_db.username, parsed_db.password,
        parsed_db.path[1:], parsed_db.port)

sql_uri = os.environ.get("SQL_URI")
host, user, passwd, db_name, port = parse_db_url(sql_uri)
conn = mdb.connect(db=db_name,
                   user=user,
                   passwd=passwd,
                   host=host,
                   port=port)

query = (
    "SELECT psf.Id, psf.FullName "
    "FROM ProbeSetXRef AS psx "
    "INNER JOIN ProbeSetFreeze AS psf ON psx.ProbeSetFreezeId = psf.Id "
    "INNER JOIN ProbeFreeze AS pf ON psf.ProbeFreezeId = pf.Id "
    "INNER JOIN InbredSet AS ibs ON pf.InbredSetId = ibs.Id "
    "WHERE psx.Locus IS NULL AND "
    "ibs.MappingMethodId = 1"
)

dataset_NAs_counts = {}
dataset_name_dict = {} # Mapping of IDs to names
with conn.cursor(cursorclass=DictCursor) as cursor:
    cursor.execute(query)
    for row in cursor.fetchall():
        dset_id = str(row['Id'])
        if dset_id in dataset_NAs_counts:
            dataset_NAs_counts[dset_id] += 1
        else:
            dataset_NAs_counts[dset_id] = 1
        if dset_id not in dataset_name_dict:
            dataset_name_dict[dset_id] = str(row['FullName'])

sorted_NAs_counts = {k: v for k, v in sorted(dataset_NAs_counts.items(), key=lambda item: item[1], reverse=True)}

output_path = os.path.join(os.environ.get("TMPDIR"), "filtered_NAs_list.csv")
with open(output_path, "w") as out_file:
    out_file.write("ID\tFullName\tCount\n")
    for dset_id in sorted_NAs_counts:
        out_file.write("\t".join([str(dset_id), str(dataset_name_dict[dset_id]), str(sorted_NAs_counts[dset_id])]) + "\n")