diff options
author | zsloan | 2022-03-22 19:02:15 +0000 |
---|---|---|
committer | zsloan | 2022-03-22 19:02:15 +0000 |
commit | a49da43ba00245cf23a2b72c314127986f567f28 (patch) | |
tree | ce64e83370c52add94927bc050febf5d242722db /wqflask/maintenance/gen_ind_genofiles.py | |
parent | 68ac19153b128f60b660e11365e5fd4304c95300 (diff) | |
parent | 32cb57b82db328bc84753af9d25e9aaa1bd31152 (diff) | |
download | genenetwork2-a49da43ba00245cf23a2b72c314127986f567f28.tar.gz |
Merge remote-tracking branch 'origin/testing' into feature/add_rqtl_pairscan
Diffstat (limited to 'wqflask/maintenance/gen_ind_genofiles.py')
-rw-r--r-- | wqflask/maintenance/gen_ind_genofiles.py | 249 |
1 files changed, 249 insertions, 0 deletions
diff --git a/wqflask/maintenance/gen_ind_genofiles.py b/wqflask/maintenance/gen_ind_genofiles.py new file mode 100644 index 00000000..8b958efa --- /dev/null +++ b/wqflask/maintenance/gen_ind_genofiles.py @@ -0,0 +1,249 @@ +#!/usr/bin/env python3 +"""A script that generates the genotype files for groups of individuals, using an existing strain genotype file as a basis + +Example commands: +python3 gen_ind_genofiles.py + /home/zas1024/gn2-zach/genotype_files/genotype/ + /home/zas1024/gn2-zach/new_geno/ + BXD-Micturition.geno + BXD.json +python3 gen_ind_genofiles.py + /home/zas1024/gn2-zach/genotype_files/genotype + /home/zas1024/gn2-zach/new_geno/ + BXD-Micturition.geno + BXD.2.geno BXD.4.geno BXD.5.geno + +""" + +import json +import os +import sys +from typing import List + +import MySQLdb + +def conn(): + return MySQLdb.Connect(db=os.environ.get("DB_NAME"), + user=os.environ.get("DB_USER"), + passwd=os.environ.get("DB_PASS"), + host=os.environ.get("DB_HOST")) + +def main(args): + + # Directory in which .geno files are located + geno_dir = args[1] + + # Directory in which to output new files + out_dir = args[2] + + # The individuals group that we want to generate a .geno file for + target_file = geno_dir + args[3] + + # The source group(s) we're generating the .geno files from + # This can be passed as either a specific .geno file (or set of files as multiple arguments), + # or as a JSON file containing a set of .geno files (and their corresponding file names and sample lists) + geno_json = {} + source_files = [] + if ".json" in args[4]: + geno_json = json.load(open(geno_dir + args[4], "r")) + par_f1s = { + "mat": geno_json['mat'], + "pat": geno_json['pat'], + "f1s": geno_json['f1s'] + } + + # List of file titles and locations from JSON + source_files = [{'title': genofile['title'], 'location': geno_dir + genofile['location']} for genofile in geno_json['genofile']] + else: + par_f1s = {} + # List of files directly taken from command line arguments, with titles just set to the filename + for group in args[4:]: + file_name = geno_dir + group + ".geno" if ".geno" not in group else group + source_files.append({'title': file_name[:-5], 'location': file_name}) + + if len(source_files) > 1: + # Generate a JSON file pointing to the new target genotype files, in situations where there are multiple source .geno files + target_json_loc = out_dir + ".".join(args[3].split(".")[:-1]) + ".json" + target_json = {'genofile': []} + + # Generate the output .geno files + for source_file in source_files: + filename, samples = generate_new_genofile(source_file['location'], target_file, par_f1s, out_dir) + + target_json['genofile'].append({ + 'location': filename.split("/")[-1], + 'title': source_file['title'], + 'sample_list': samples + }) + + json.dump(target_json, open(target_json_loc, "w")) + +def get_strain_for_sample(sample): + query = ( + "SELECT CaseAttributeXRefNew.Value " + "FROM CaseAttributeXRefNew, Strain " + "WHERE CaseAttributeXRefNew.CaseAttributeId=11 " + "AND CaseAttributeXRefNew.StrainId = Strain.Id " + "AND Strain.Name = %(name)s" ) + + with conn().cursor() as cursor: + cursor.execute(query, {"name": sample.strip()}) + return cursor.fetchone()[0] + +def generate_new_genofile(source_genofile, target_genofile, par_f1s, out_dir): + source_samples = group_samples(source_genofile) + source_genotypes = strain_genotypes(source_genofile) + target_samples = group_samples(target_genofile) + strain_pos_map = map_strain_pos_to_target_group(source_samples, target_samples, par_f1s) + + if len(source_genofile.split("/")[-1].split(".")) > 2: + # The number in the source genofile; for example 4 in BXD.4.geno + source_num = source_genofile.split("/")[-1].split(".")[-2] + target_filename = ".".join(target_genofile.split("/")[-1].split(".")[:-1]) + "." + source_num + ".geno" + else: + target_filename = ".".join(target_genofile.split("/")[-1].split(".")[:-1]) + ".geno" + + file_location = out_dir + target_filename + + with open(file_location, "w") as fh: + for metadata in ["name", "type", "mat", "pat", "het", "unk"]: + fh.write("@" + metadata + ":" + source_genotypes[metadata] + "\n") + + header_line = ["Chr", "Locus", "cM", "Mb"] + target_samples + fh.write("\t".join(header_line)) + + for marker in source_genotypes['markers']: + line_items = [ + marker['Chr'], + marker['Locus'], + marker['cM'], + marker['Mb'] + ] + + for pos in strain_pos_map: + if isinstance(pos, int): + line_items.append(marker['genotypes'][pos]) + else: + if pos in ["mat", "pat"]: + line_items.append(source_genotypes[pos]) + elif pos == "f1s": + line_items.append("H") + else: + line_items.append("U") + + fh.write("\t".join(line_items) + "\n") + + return file_location, target_samples + +def map_strain_pos_to_target_group(source_samples, target_samples, par_f1s): + """ + Retrieve corresponding strain position for each sample in the target group + + This is so the genotypes from the base genofile can be mapped to the samples in the target group + + For example: + Base strains: BXD1, BXD2, BXD3 + Target samples: BXD1_1, BXD1_2, BXD2_1, BXD3_1, BXD3_2, BXD3_3 + Returns: [0, 0, 1, 2, 2, 2] + """ + pos_map = [] + for sample in target_samples: + sample_strain = get_strain_for_sample(sample) + if sample_strain in source_samples: + pos_map.append(source_samples.index(sample_strain)) + else: + val = "U" + for key in par_f1s.keys(): + if sample_strain in par_f1s[key]: + val = key + pos_map.append(val) + + return pos_map + +def group_samples(target_file: str) -> List: + """ + Get the group samples from its "dummy" .geno file (which still contains the sample list) + """ + + sample_list = [] + with open(target_file, "r") as target_geno: + for i, line in enumerate(target_geno): + # Skip header lines + if line[0] in ["#", "@"] or not len(line): + continue + + line_items = line.split("\t") + sample_list = [item for item in line_items if item not in ["Chr", "Locus", "Mb", "cM"]] + break + + return sample_list + +def strain_genotypes(strain_genofile: str) -> List: + """ + Read genotypes from source strain .geno file + + :param strain_genofile: string of genofile filename + :return: a list of dictionaries representing each marker's genotypes + + Example output: [ + { + 'Chr': '1', + 'Locus': 'marker1', + 'Mb': '10.0', + 'cM': '8.0', + 'genotypes': [('BXD1', 'B'), ('BXD2', 'D'), ('BXD3', 'H'), ...] + }, + ... + ] + """ + + geno_dict = {} + + geno_start_col = None + header_columns = [] + sample_list = [] + markers = [] + with open(strain_genofile, "r") as source_geno: + for i, line in enumerate(source_geno): + if line[0] == "@": + metadata_type = line[1:].split(":")[0] + if metadata_type in ['name', 'type', 'mat', 'pat', 'het', 'unk']: + geno_dict[metadata_type] = line.split(":")[1].strip() + + continue + + # Skip other header lines + if line[0] == "#" or not len(line): + continue + + line_items = line.split("\t") + if "Chr" in line_items: # Header row + # Get the first column index containing genotypes + header_columns = line_items + for j, item in enumerate(line_items): + if item not in ["Chr", "Locus", "Mb", "cM"]: + geno_start_col = j + break + + sample_list = line_items[geno_start_col:] + if not geno_start_col: + print("Check .geno file - expected columns not found") + sys.exit() + else: # Marker rows + this_marker = { + 'Chr': line_items[header_columns.index("Chr")], + 'Locus': line_items[header_columns.index("Locus")], + 'Mb': line_items[header_columns.index("Mb")], + 'cM': line_items[header_columns.index("cM")], + 'genotypes': [item.strip() for item in line_items][geno_start_col:] + } + + markers.append(this_marker) + + geno_dict['markers'] = markers + + return geno_dict + +if __name__ == "__main__": + main(sys.argv) + |