aboutsummaryrefslogtreecommitdiff
path: root/wqflask/utility/gen_geno_ob.py
blob: 24604e582837bd0c3391d424c531a2100de33b0f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
import utility.logger
logger = utility.logger.getLogger(__name__)


class genotype:
    """
    Replacement for reaper.Dataset so we can remove qtlreaper use while still generating mapping output figure
    """

    def __init__(self, filename):
        self.group = None
        self.type = "riset"
        self.prgy = []
        self.nprgy = 0
        self.mat = -1
        self.pat = 1
        self.het = 0
        self.unk = "U"
        self.filler = False
        self.mb_exists = False

        # ZS: This is because I'm not sure if some files switch the column that contains Mb/cM positions; might be unnecessary
        self.cm_column = 2
        self.mb_column = 3

        self.chromosomes = []

        self.read_file(filename)

    def __iter__(self):
        return iter(self.chromosomes)

    def __getitem__(self, index):
        return self.chromosomes[index]

    def __len__(self):
        return len(self.chromosomes)

    def read_rdata_output(self, qtl_results):
        # ZS: This is necessary because R/qtl requires centimorgan marker positions, which it normally gets from the .geno file, but that doesn't exist for HET3-ITP (which only has RData), so it needs to read in the marker cM positions from the results
        self.chromosomes = []  # ZS: Overwriting since the .geno file's contents are just placeholders

        this_chr = ""  # ZS: This is so it can track when the chromosome changes as it iterates through markers
        chr_ob = None
        for marker in qtl_results:
            locus = Locus(self)
            if (str(marker['chr']) != this_chr) and this_chr != "X":  # ZS: This is really awkward but works as a temporary fix
                if this_chr != "":
                    self.chromosomes.append(chr_ob)
                this_chr = str(marker['chr'])
                if this_chr == "20":
                    this_chr = "X"
                chr_ob = Chr(this_chr, self)
            if 'chr' in marker:
                locus.chr = str(marker['chr'])
            if 'name' in marker:
                locus.name = marker['name']
            if 'Mb' in marker:
                locus.Mb = marker['Mb']
            if 'cM' in marker:
                locus.cM = marker['cM']
            chr_ob.loci.append(locus)

        self.chromosomes.append(chr_ob)

        return self

    def read_file(self, filename):
        with open(filename, 'r') as geno_file:
            lines = geno_file.readlines()

            this_chr = ""  # ZS: This is so it can track when the chromosome changes as it iterates through markers
            chr_ob = None
            for line in lines:
                if line[0] == "#":
                    continue
                elif line[0] == "@":
                    label = line.split(":")[0][1:]
                    if label == "name":
                        self.group = line.split(":")[1].strip()
                    elif label == "filler":
                        if line.split(":")[1].strip() == "yes":
                            self.filler = True
                    elif label == "type":
                        self.type = line.split(":")[1].strip()
                    elif label == "mat":
                        self.mat = line.split(":")[1].strip()
                    elif label == "pat":
                        self.pat = line.split(":")[1].strip()
                    elif label == "het":
                        self.het = line.split(":")[1].strip()
                    elif label == "unk":
                        self.unk = line.split(":")[1].strip()
                    else:
                        continue
                elif line[:3] == "Chr":
                    header_row = line.split("\t")
                    if header_row[2] == "Mb":
                        self.mb_exists = True
                        self.mb_column = 2
                        self.cm_column = 3
                    elif header_row[3] == "Mb":
                        self.mb_exists = True
                        self.mb_column = 3
                    elif header_row[2] == "cM":
                        self.cm_column = 2

                    if self.mb_exists:
                        self.prgy = header_row[4:]
                    else:
                        self.prgy = header_row[3:]
                    self.nprgy = len(self.prgy)
                else:
                    if line.split("\t")[0] != this_chr:
                        if this_chr != "":
                            self.chromosomes.append(chr_ob)
                        this_chr = line.split("\t")[0]
                        chr_ob = Chr(line.split("\t")[0], self)
                    chr_ob.add_marker(line.split("\t"))

            self.chromosomes.append(chr_ob)


class Chr:
    def __init__(self, name, geno_ob):
        self.name = name
        self.loci = []
        self.mb_exists = geno_ob.mb_exists
        self.cm_column = geno_ob.cm_column
        self.mb_column = geno_ob.mb_column
        self.geno_ob = geno_ob

    def __iter__(self):
        return iter(self.loci)

    def __getitem__(self, index):
        return self.loci[index]

    def __len__(self):
        return len(self.loci)

    def add_marker(self, marker_row):
        self.loci.append(Locus(self.geno_ob, marker_row))


class Locus:
    def __init__(self, geno_ob, marker_row=None):
        self.chr = None
        self.name = None
        self.cM = None
        self.Mb = None
        self.genotype = []
        if marker_row:
            self.chr = marker_row[0]
            self.name = marker_row[1]
            try:
                self.cM = float(marker_row[geno_ob.cm_column])
            except:
                self.cM = float(marker_row[geno_ob.mb_column]) if geno_ob.mb_exists else 0
            try:
                self.Mb = float(marker_row[geno_ob.mb_column]) if geno_ob.mb_exists else None
            except:
                self.Mb = self.cM

            geno_table = {
                geno_ob.mat: -1,
                geno_ob.pat: 1,
                geno_ob.het: 0,
                geno_ob.unk: "U"
            }

            self.genotype = []
            if geno_ob.mb_exists:
                start_pos = 4
            else:
                start_pos = 3

            for allele in marker_row[start_pos:]:
                if allele in list(geno_table.keys()):
                    self.genotype.append(geno_table[allele])
                else:  # ZS: Some genotype appears that isn't specified in the metadata, make it unknown
                    self.genotype.append("U")