aboutsummaryrefslogtreecommitdiff
path: root/wqflask/maintenance/gen_ind_genofiles.py
diff options
context:
space:
mode:
Diffstat (limited to 'wqflask/maintenance/gen_ind_genofiles.py')
-rw-r--r--wqflask/maintenance/gen_ind_genofiles.py253
1 files changed, 0 insertions, 253 deletions
diff --git a/wqflask/maintenance/gen_ind_genofiles.py b/wqflask/maintenance/gen_ind_genofiles.py
deleted file mode 100644
index b755c648..00000000
--- a/wqflask/maintenance/gen_ind_genofiles.py
+++ /dev/null
@@ -1,253 +0,0 @@
-#!/usr/bin/env python3
-"""A script that generates the genotype files for groups of individuals, using an existing strain genotype file as a basis
-
-Example commands:
-python3 gen_ind_genofiles.py
- /home/zas1024/gn2-zach/genotype_files/genotype/
- /home/zas1024/gn2-zach/new_geno/
- BXD-Micturition.geno
- BXD.json
-python3 gen_ind_genofiles.py
- /home/zas1024/gn2-zach/genotype_files/genotype
- /home/zas1024/gn2-zach/new_geno/
- BXD-Micturition.geno
- BXD.2.geno BXD.4.geno BXD.5.geno
-
-"""
-
-import json
-import os
-import sys
-from typing import List
-
-import MySQLdb
-
-def conn():
- return MySQLdb.Connect(db=os.environ.get("DB_NAME"),
- user=os.environ.get("DB_USER"),
- passwd=os.environ.get("DB_PASS"),
- host=os.environ.get("DB_HOST"))
-
-def main(args):
-
- # Directory in which .geno files are located
- geno_dir = args[1]
-
- # Directory in which to output new files
- out_dir = args[2]
-
- # The individuals group that we want to generate a .geno file for
- target_file = geno_dir + args[3]
-
- # The source group(s) we're generating the .geno files from
- # This can be passed as either a specific .geno file (or set of files as multiple arguments),
- # or as a JSON file containing a set of .geno files (and their corresponding file names and sample lists)
- geno_json = {}
- source_files = []
- if ".json" in args[4]:
- geno_json = json.load(open(geno_dir + args[4], "r"))
- par_f1s = {
- "mat": geno_json['mat'],
- "pat": geno_json['pat'],
- "f1s": geno_json['f1s']
- }
-
- # List of file titles and locations from JSON
- source_files = [{'title': genofile['title'], 'location': geno_dir + genofile['location']} for genofile in geno_json['genofile']]
- else:
- par_f1s = {}
- # List of files directly taken from command line arguments, with titles just set to the filename
- for group in args[4:]:
- file_name = geno_dir + group + ".geno" if ".geno" not in group else geno_dir + group
- source_files.append({'title': file_name[:-5], 'location': file_name})
-
- if len(source_files) > 1:
- # Generate a JSON file pointing to the new target genotype files, in situations where there are multiple source .geno files
- target_json_loc = out_dir + ".".join(args[3].split(".")[:-1]) + ".json"
- target_json = {'genofile': []}
-
- # Generate the output .geno files
- for source_file in source_files:
- filename, samples = generate_new_genofile(source_file['location'], target_file, par_f1s, out_dir)
-
- target_json['genofile'].append({
- 'location': filename.split("/")[-1],
- 'title': source_file['title'],
- 'sample_list': samples
- })
-
- json.dump(target_json, open(target_json_loc, "w"))
- else:
- filename, samples = generate_new_genofile(source_files[0]['location'], target_file, par_f1s, out_dir)
-
-def get_strain_for_sample(sample):
- query = (
- "SELECT CaseAttributeXRefNew.Value "
- "FROM CaseAttributeXRefNew, Strain "
- "WHERE CaseAttributeXRefNew.CaseAttributeId=11 "
- "AND CaseAttributeXRefNew.StrainId = Strain.Id "
- "AND Strain.Name = %(name)s" )
-
- with conn().cursor() as cursor:
- cursor.execute(query, {"name": sample.strip()})
- strain = cursor.fetchone()[0]
- return strain
-
-def generate_new_genofile(source_genofile, target_genofile, par_f1s, out_dir):
- source_samples = group_samples(source_genofile)
- source_genotypes = strain_genotypes(source_genofile)
- target_samples = group_samples(target_genofile)
- strain_pos_map = map_strain_pos_to_target_group(source_samples, target_samples, par_f1s)
-
- if len(source_genofile.split("/")[-1].split(".")) > 2:
- # The number in the source genofile; for example 4 in BXD.4.geno
- source_num = source_genofile.split("/")[-1].split(".")[-2]
- target_filename = ".".join(target_genofile.split("/")[-1].split(".")[:-1]) + "." + source_num + ".geno"
- else:
- target_filename = ".".join(target_genofile.split("/")[-1].split(".")[:-1]) + ".geno"
-
- file_location = out_dir + target_filename
-
- with open(file_location, "w") as fh:
- for metadata in ["name", "type", "mat", "pat", "het", "unk"]:
- fh.write("@" + metadata + ":" + source_genotypes[metadata] + "\n")
-
- header_line = ["Chr", "Locus", "cM", "Mb"] + target_samples
- fh.write("\t".join(header_line) + "\n")
-
- for marker in source_genotypes['markers']:
- line_items = [
- marker['Chr'],
- marker['Locus'],
- marker['cM'],
- marker['Mb']
- ]
-
- for pos in strain_pos_map:
- if isinstance(pos, int):
- line_items.append(marker['genotypes'][pos])
- else:
- if pos in ["mat", "pat"]:
- line_items.append(source_genotypes[pos])
- elif pos == "f1s":
- line_items.append("H")
- else:
- line_items.append("U")
-
- fh.write("\t".join(line_items) + "\n")
-
- return file_location, target_samples
-
-def map_strain_pos_to_target_group(source_samples, target_samples, par_f1s):
- """
- Retrieve corresponding strain position for each sample in the target group
-
- This is so the genotypes from the base genofile can be mapped to the samples in the target group
-
- For example:
- Base strains: BXD1, BXD2, BXD3
- Target samples: BXD1_1, BXD1_2, BXD2_1, BXD3_1, BXD3_2, BXD3_3
- Returns: [0, 0, 1, 2, 2, 2]
- """
- pos_map = []
- for sample in target_samples:
- sample_strain = get_strain_for_sample(sample)
- if sample_strain in source_samples:
- pos_map.append(source_samples.index(sample_strain))
- else:
- val = "U"
- for key in par_f1s.keys():
- if sample_strain in par_f1s[key]:
- val = key
- pos_map.append(val)
-
- return pos_map
-
-def group_samples(target_file: str) -> List:
- """
- Get the group samples from its "dummy" .geno file (which still contains the sample list)
- """
-
- sample_list = []
- with open(target_file, "r") as target_geno:
- for i, line in enumerate(target_geno):
- # Skip header lines
- if line[0] in ["#", "@"] or not len(line):
- continue
-
- line_items = line.split()
-
- sample_list = [item for item in line_items if item not in ["Chr", "Locus", "Mb", "cM"]]
- break
-
- return sample_list
-
-def strain_genotypes(strain_genofile: str) -> List:
- """
- Read genotypes from source strain .geno file
-
- :param strain_genofile: string of genofile filename
- :return: a list of dictionaries representing each marker's genotypes
-
- Example output: [
- {
- 'Chr': '1',
- 'Locus': 'marker1',
- 'Mb': '10.0',
- 'cM': '8.0',
- 'genotypes': [('BXD1', 'B'), ('BXD2', 'D'), ('BXD3', 'H'), ...]
- },
- ...
- ]
- """
-
- geno_dict = {}
-
- geno_start_col = None
- header_columns = []
- sample_list = []
- markers = []
- with open(strain_genofile, "r") as source_geno:
- for i, line in enumerate(source_geno):
- if line[0] == "@":
- metadata_type = line[1:].split(":")[0]
- if metadata_type in ['name', 'type', 'mat', 'pat', 'het', 'unk']:
- geno_dict[metadata_type] = line.split(":")[1].strip()
-
- continue
-
- # Skip other header lines
- if line[0] == "#" or not len(line):
- continue
-
- line_items = line.split("\t")
- if "Chr" in line_items: # Header row
- # Get the first column index containing genotypes
- header_columns = line_items
- for j, item in enumerate(line_items):
- if item not in ["Chr", "Locus", "Mb", "cM"]:
- geno_start_col = j
- break
-
- sample_list = line_items[geno_start_col:]
- if not geno_start_col:
- print("Check .geno file - expected columns not found")
- sys.exit()
- else: # Marker rows
- this_marker = {
- 'Chr': line_items[header_columns.index("Chr")],
- 'Locus': line_items[header_columns.index("Locus")],
- 'Mb': line_items[header_columns.index("Mb")],
- 'cM': line_items[header_columns.index("cM")],
- 'genotypes': [item.strip() for item in line_items][geno_start_col:]
- }
-
- markers.append(this_marker)
-
- geno_dict['markers'] = markers
-
- return geno_dict
-
-if __name__ == "__main__":
- main(sys.argv)
-