aboutsummaryrefslogtreecommitdiff
path: root/wqflask/maintenance/gen_ind_genofiles.py
blob: 6e818945246bda01fbfb233423ea3942e15973ae (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# Example command: env GN2_PROFILE=/usr/local/guix-profiles/gn-latest-20220122 TMPDIR=/export/local/home/zas1024/gn2-zach/tmp WEBSERVER_MODE=DEBUG LOG_LEVEL=DEBUG SERVER_PORT=5002 GENENETWORK_FILES=/export/local/home/zas1024/gn2-zach/genotype_files SQL_URI=mysql://webqtlout:webqtlout@localhost/db_webqtl ./bin/genenetwork2 ./etc/default_settings.py -c ./maintenance/gen_ind_genofiles.py

import sys
from typing import List

import MySQLdb

from wqflask import app

def conn():
    return MySQLdb.Connect(db=app.config.get("DB_NAME"),
                           user=app.config.get("DB_USER"),
                           passwd=app.config.get("DB_PASS"),
                           host=app.config.get("DB_HOST"))

def main(args):

    # The file of the "main" .geno file for the group in question
    # For example: BXD.geno or BXD.6.geno if converting to BXD individual genofiles
    source_genofile = args[1] 

    # The target individuals/samples group(s) we're generating the .geno files for
    # This can be passed as either a specific .geno file, or as a JSON file
    # containing a set of .geno files (and their corresponding file names and sample lists)
    if ".json" in args[2]:
        target_groups = json.load(args[2])['genofile']
    else:
        target_groups = [args[2]]

    # Generate the output .geno files
    generate_new_genofiles(strain_genotypes(source_genofile), target_groups)

def get_strain_for_sample(sample):
    query = (
        "SELECT CaseAttributeXRefNew.Value "
        "FROM CaseAttributeXRefNew, Strain "
        "WHERE CaseAttributeXRefNew.CaseAttributeId=11 "
        "AND CaseAttributeXRef.New.StrainId = Strain.Id "
        "AND Strain.Name = %(name)s" )

    with conn.cursor() as cursor:
        return cursor.execute(query, {"name": name}).fetchone()[0]

def group_samples(target_group: str) -> List:
    """
    Get the group samples from its "dummy" .geno file (which still contains the sample list)
    """

    # Allow for inputting the target group as either the group name or .geno file
    file_location = app.config.get("GENENETWORK_FILES") + "/genotype/" + target_group
    if ".geno" not in target_group:
        file_location += ".geno"

    sample_list = []
    with open(file_location, "r") as target_geno:
        for i, line in enumerate(target_geno):
            # Skip header lines
            if line[0] in ["#", "@"] or not len(line):
                continue
    
            line_items = line.split("\t")
            sample_list = [item for item in line_items if item not in ["Chr", "Locus", "Mb", "cM"]]
            break

    return sample_list

def strain_genotypes(strain_genofile: str) -> List:
    """
    Read genotypes from source strain .geno file

    :param strain_genofile: string of genofile filename
    :return: a list of dictionaries representing each marker's genotypes

    Example output: [
        {
            'Chr': '1',
            'Locus': 'marker1',
            'Mb': '10.0',
            'cM': '8.0',
            'genotypes': [('BXD1', 'B'), ('BXD2', 'D'), ('BXD3', 'H'), ...]
        },
        ...
    ]
    """

    file_location = app.config.get("GENENETWORK_FILES") + "/genotype/" + strain_genofile

    geno_start_col = None
    header_columns = []
    sample_list = []
    marker_genotypes = []
    with open(file_location, "r") as source_geno:
        for i, line in enumerate(source_geno):
            # Skip header lines
            if line[0] in ["#", "@"] or not len(line):
                continue

            line_items = line.split("\t")

            if "Chr" in line_items: # Header row
                # Get the first column index containing genotypes
                header_columns = line_items
                for j, item in enumerate(line_items):
                    if item not in ["Chr", "Locus", "Mb", "cM"]:
                        geno_start_col = j
                        break

                sample_list = line_items[geno_start_col:]
                if not geno_start_col:
                    print("Check .geno file - expected columns not found")
                    sys.exit()
            else: # Marker rows
                this_marker = {
                    'Chr': line_items[header_columns.index("Chr")],
                    'Locus': line_items[header_columns.index("Locus")],
                    'Mb': line_items[header_columns.index("Mb")],
                    'cM': line_items[header_columns.index("cM")],
                    'genotypes': list(zip(sample_list, [item.strip() for item in line_items][geno_start_col:]))
                }
                marker_genotypes.append(this_marker)

    return marker_genotypes
            
if __name__ == "__main__":
    print("command line arguments:\n\t%s" % sys.argv)
    main(sys.argv)