1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
# Example command: env GN2_PROFILE=/usr/local/guix-profiles/gn-latest-20220122 TMPDIR=/export/local/home/zas1024/gn2-zach/tmp WEBSERVER_MODE=DEBUG LOG_LEVEL=DEBUG SERVER_PORT=5002 GENENETWORK_FILES=/export/local/home/zas1024/gn2-zach/genotype_files SQL_URI=mysql://webqtlout:webqtlout@localhost/db_webqtl ./bin/genenetwork2 ./etc/default_settings.py -c ./maintenance/gen_ind_genofiles.py
import sys
from typing import List
import MySQLdb
from wqflask import app
def conn():
return MySQLdb.Connect(db=app.config.get("DB_NAME"),
user=app.config.get("DB_USER"),
passwd=app.config.get("DB_PASS"),
host=app.config.get("DB_HOST"))
def main(args):
# The file of the "main" .geno file for the group in question
# For example: BXD.geno or BXD.6.geno if converting to BXD individual genofiles
source_genofile = args[1]
# The target individuals/samples group(s) we're generating the .geno files for
# This can be passed as either a specific .geno file, or as a JSON file
# containing a set of .geno files (and their corresponding file names and sample lists)
if ".json" in args[2]:
target_groups = json.load(args[2])['genofile']
else:
target_groups = [args[2]]
# Generate the output .geno files
generate_new_genofiles(strain_genotypes(source_genofile), target_groups)
def get_strain_for_sample(sample):
query = (
"SELECT CaseAttributeXRefNew.Value "
"FROM CaseAttributeXRefNew, Strain "
"WHERE CaseAttributeXRefNew.CaseAttributeId=11 "
"AND CaseAttributeXRef.New.StrainId = Strain.Id "
"AND Strain.Name = %(name)s" )
with conn.cursor() as cursor:
return cursor.execute(query, {"name": name}).fetchone()[0]
def group_samples(target_group: str) -> List:
"""
Get the group samples from its "dummy" .geno file (which still contains the sample list)
"""
# Allow for inputting the target group as either the group name or .geno file
file_location = app.config.get("GENENETWORK_FILES") + "/genotype/" + target_group
if ".geno" not in target_group:
file_location += ".geno"
sample_list = []
with open(file_location, "r") as target_geno:
for i, line in enumerate(target_geno):
# Skip header lines
if line[0] in ["#", "@"] or not len(line):
continue
line_items = line.split("\t")
sample_list = [item for item in line_items if item not in ["Chr", "Locus", "Mb", "cM"]]
break
return sample_list
def strain_genotypes(strain_genofile: str) -> List:
"""
Read genotypes from source strain .geno file
:param strain_genofile: string of genofile filename
:return: a list of dictionaries representing each marker's genotypes
Example output: [
{
'Chr': '1',
'Locus': 'marker1',
'Mb': '10.0',
'cM': '8.0',
'genotypes': [('BXD1', 'B'), ('BXD2', 'D'), ('BXD3', 'H'), ...]
},
...
]
"""
file_location = app.config.get("GENENETWORK_FILES") + "/genotype/" + strain_genofile
geno_start_col = None
header_columns = []
sample_list = []
marker_genotypes = []
with open(file_location, "r") as source_geno:
for i, line in enumerate(source_geno):
# Skip header lines
if line[0] in ["#", "@"] or not len(line):
continue
line_items = line.split("\t")
if "Chr" in line_items: # Header row
# Get the first column index containing genotypes
header_columns = line_items
for j, item in enumerate(line_items):
if item not in ["Chr", "Locus", "Mb", "cM"]:
geno_start_col = j
break
sample_list = line_items[geno_start_col:]
if not geno_start_col:
print("Check .geno file - expected columns not found")
sys.exit()
else: # Marker rows
this_marker = {
'Chr': line_items[header_columns.index("Chr")],
'Locus': line_items[header_columns.index("Locus")],
'Mb': line_items[header_columns.index("Mb")],
'cM': line_items[header_columns.index("cM")],
'genotypes': list(zip(sample_list, [item.strip() for item in line_items][geno_start_col:]))
}
marker_genotypes.append(this_marker)
return marker_genotypes
if __name__ == "__main__":
print("command line arguments:\n\t%s" % sys.argv)
main(sys.argv)
|