about summary refs log tree commit diff
path: root/wqflask/maintenance/gen_ind_genofiles.py
diff options
context:
space:
mode:
authorArun Isaac2023-12-29 18:55:37 +0000
committerArun Isaac2023-12-29 19:01:46 +0000
commit204a308be0f741726b9a620d88fbc22b22124c81 (patch)
treeb3cf66906674020b530c844c2bb4982c8a0e2d39 /wqflask/maintenance/gen_ind_genofiles.py
parent83062c75442160427b50420161bfcae2c5c34c84 (diff)
downloadgenenetwork2-204a308be0f741726b9a620d88fbc22b22124c81.tar.gz
Namespace all modules under gn2.
We move all modules under a gn2 directory. This is important for
"correct" packaging and deployment as a Guix service.
Diffstat (limited to 'wqflask/maintenance/gen_ind_genofiles.py')
-rw-r--r--wqflask/maintenance/gen_ind_genofiles.py253
1 files changed, 0 insertions, 253 deletions
diff --git a/wqflask/maintenance/gen_ind_genofiles.py b/wqflask/maintenance/gen_ind_genofiles.py
deleted file mode 100644
index b755c648..00000000
--- a/wqflask/maintenance/gen_ind_genofiles.py
+++ /dev/null
@@ -1,253 +0,0 @@
-#!/usr/bin/env python3
-"""A script that generates the genotype files for groups of individuals, using an existing strain genotype file as a basis
-
-Example commands:
-python3 gen_ind_genofiles.py
-        /home/zas1024/gn2-zach/genotype_files/genotype/
-        /home/zas1024/gn2-zach/new_geno/
-        BXD-Micturition.geno
-        BXD.json
-python3 gen_ind_genofiles.py
-        /home/zas1024/gn2-zach/genotype_files/genotype
-        /home/zas1024/gn2-zach/new_geno/
-        BXD-Micturition.geno
-        BXD.2.geno BXD.4.geno BXD.5.geno
-
-"""
-
-import json
-import os
-import sys
-from typing import List
-
-import MySQLdb
-
-def conn():
-    return MySQLdb.Connect(db=os.environ.get("DB_NAME"),
-                           user=os.environ.get("DB_USER"),
-                           passwd=os.environ.get("DB_PASS"),
-                           host=os.environ.get("DB_HOST"))
-
-def main(args):
-
-    # Directory in which .geno files are located
-    geno_dir = args[1]
-
-    # Directory in which to output new files
-    out_dir = args[2]
-
-    # The individuals group that we want to generate a .geno file for
-    target_file = geno_dir + args[3]
-
-    # The source group(s) we're generating the .geno files from
-    # This can be passed as either a specific .geno file (or set of files as multiple arguments),
-    # or as a JSON file containing a set of .geno files (and their corresponding file names and sample lists)
-    geno_json = {}
-    source_files = []
-    if ".json" in args[4]:
-        geno_json = json.load(open(geno_dir + args[4], "r"))
-        par_f1s = {
-            "mat": geno_json['mat'],
-            "pat": geno_json['pat'],
-            "f1s": geno_json['f1s']
-        }
-
-        # List of file titles and locations from JSON
-        source_files = [{'title': genofile['title'], 'location': geno_dir + genofile['location']} for genofile in geno_json['genofile']]
-    else:
-        par_f1s = {}
-        # List of files directly taken from command line arguments, with titles just set to the filename
-        for group in args[4:]:
-            file_name = geno_dir + group + ".geno" if ".geno" not in group else geno_dir + group
-            source_files.append({'title': file_name[:-5], 'location': file_name})
-
-    if len(source_files) > 1:
-        # Generate a JSON file pointing to the new target genotype files, in situations where there are multiple source .geno files
-        target_json_loc = out_dir + ".".join(args[3].split(".")[:-1]) + ".json"
-        target_json = {'genofile': []}
-
-        # Generate the output .geno files
-        for source_file in source_files:
-            filename, samples = generate_new_genofile(source_file['location'], target_file, par_f1s, out_dir)
-
-            target_json['genofile'].append({
-                'location': filename.split("/")[-1],
-                'title': source_file['title'],
-                'sample_list': samples
-            })
-
-        json.dump(target_json, open(target_json_loc, "w"))
-    else:
-        filename, samples = generate_new_genofile(source_files[0]['location'], target_file, par_f1s, out_dir)
-
-def get_strain_for_sample(sample):
-    query = (
-        "SELECT CaseAttributeXRefNew.Value "
-        "FROM CaseAttributeXRefNew, Strain "
-        "WHERE CaseAttributeXRefNew.CaseAttributeId=11 "
-        "AND CaseAttributeXRefNew.StrainId = Strain.Id "
-        "AND Strain.Name = %(name)s" )
-
-    with conn().cursor() as cursor:
-        cursor.execute(query, {"name": sample.strip()})
-        strain = cursor.fetchone()[0]
-        return strain
-
-def generate_new_genofile(source_genofile, target_genofile, par_f1s, out_dir):
-    source_samples = group_samples(source_genofile)
-    source_genotypes = strain_genotypes(source_genofile)
-    target_samples = group_samples(target_genofile)
-    strain_pos_map = map_strain_pos_to_target_group(source_samples, target_samples, par_f1s)
-
-    if len(source_genofile.split("/")[-1].split(".")) > 2:
-        # The number in the source genofile; for example 4 in BXD.4.geno
-        source_num = source_genofile.split("/")[-1].split(".")[-2]
-        target_filename = ".".join(target_genofile.split("/")[-1].split(".")[:-1]) + "." + source_num + ".geno"
-    else:
-        target_filename = ".".join(target_genofile.split("/")[-1].split(".")[:-1]) + ".geno"
-
-    file_location = out_dir + target_filename
-
-    with open(file_location, "w") as fh:
-        for metadata in ["name", "type", "mat", "pat", "het", "unk"]:
-            fh.write("@" + metadata + ":" + source_genotypes[metadata] + "\n")
-
-        header_line = ["Chr", "Locus", "cM", "Mb"] + target_samples
-        fh.write("\t".join(header_line) + "\n")
-
-        for marker in source_genotypes['markers']:
-            line_items = [
-                marker['Chr'],
-                marker['Locus'],
-                marker['cM'],
-                marker['Mb']
-            ]
-
-            for pos in strain_pos_map:
-                if isinstance(pos, int):
-                    line_items.append(marker['genotypes'][pos])
-                else:
-                    if pos in ["mat", "pat"]:
-                        line_items.append(source_genotypes[pos])
-                    elif pos == "f1s":
-                        line_items.append("H")
-                    else:
-                        line_items.append("U")
-
-            fh.write("\t".join(line_items) + "\n")
-
-    return file_location, target_samples
-
-def map_strain_pos_to_target_group(source_samples, target_samples, par_f1s):
-    """
-    Retrieve corresponding strain position for each sample in the target group
-
-    This is so the genotypes from the base genofile can be mapped to the samples in the target group
-
-    For example:
-    Base strains: BXD1, BXD2, BXD3
-    Target samples: BXD1_1, BXD1_2, BXD2_1, BXD3_1, BXD3_2, BXD3_3
-    Returns: [0, 0, 1, 2, 2, 2]
-    """
-    pos_map = []
-    for sample in target_samples:
-        sample_strain = get_strain_for_sample(sample)
-        if sample_strain in source_samples:
-            pos_map.append(source_samples.index(sample_strain))
-        else:
-            val = "U"
-            for key in par_f1s.keys():
-                if sample_strain in par_f1s[key]:
-                    val = key
-            pos_map.append(val)
-
-    return pos_map
-
-def group_samples(target_file: str) -> List:
-    """
-    Get the group samples from its "dummy" .geno file (which still contains the sample list)
-    """
-
-    sample_list = []
-    with open(target_file, "r") as target_geno:
-        for i, line in enumerate(target_geno):
-            # Skip header lines
-            if line[0] in ["#", "@"] or not len(line):
-                continue
-    
-            line_items = line.split()
-
-            sample_list = [item for item in line_items if item not in ["Chr", "Locus", "Mb", "cM"]]
-            break
-
-    return sample_list
-
-def strain_genotypes(strain_genofile: str) -> List:
-    """
-    Read genotypes from source strain .geno file
-
-    :param strain_genofile: string of genofile filename
-    :return: a list of dictionaries representing each marker's genotypes
-
-    Example output: [
-        {
-            'Chr': '1',
-            'Locus': 'marker1',
-            'Mb': '10.0',
-            'cM': '8.0',
-            'genotypes': [('BXD1', 'B'), ('BXD2', 'D'), ('BXD3', 'H'), ...]
-        },
-        ...
-    ]
-    """
-
-    geno_dict = {}
-
-    geno_start_col = None
-    header_columns = []
-    sample_list = []
-    markers = []
-    with open(strain_genofile, "r") as source_geno:
-        for i, line in enumerate(source_geno):
-            if line[0] == "@":
-                metadata_type = line[1:].split(":")[0]
-                if metadata_type in ['name', 'type', 'mat', 'pat', 'het', 'unk']:
-                    geno_dict[metadata_type] = line.split(":")[1].strip()
-
-                continue
-
-            # Skip other header lines
-            if line[0] == "#" or not len(line):
-                continue
-
-            line_items = line.split("\t")
-            if "Chr" in line_items: # Header row
-                # Get the first column index containing genotypes
-                header_columns = line_items
-                for j, item in enumerate(line_items):
-                    if item not in ["Chr", "Locus", "Mb", "cM"]:
-                        geno_start_col = j
-                        break
-
-                sample_list = line_items[geno_start_col:]
-                if not geno_start_col:
-                    print("Check .geno file - expected columns not found")
-                    sys.exit()
-            else: # Marker rows
-                this_marker = {
-                    'Chr': line_items[header_columns.index("Chr")],
-                    'Locus': line_items[header_columns.index("Locus")],
-                    'Mb': line_items[header_columns.index("Mb")],
-                    'cM': line_items[header_columns.index("cM")],
-                    'genotypes': [item.strip() for item in line_items][geno_start_col:]
-                }
-
-                markers.append(this_marker)
-
-    geno_dict['markers'] = markers
-
-    return geno_dict
-            
-if __name__ == "__main__":
-    main(sys.argv)
-