diff options
Diffstat (limited to 'gn3/db/genotypes.py')
-rw-r--r-- | gn3/db/genotypes.py | 184 |
1 files changed, 184 insertions, 0 deletions
diff --git a/gn3/db/genotypes.py b/gn3/db/genotypes.py new file mode 100644 index 0000000..8f18cac --- /dev/null +++ b/gn3/db/genotypes.py @@ -0,0 +1,184 @@ +"""Genotype utilities""" + +import os +import gzip +from typing import Union, TextIO + +from gn3.settings import GENOTYPE_FILES + +def build_genotype_file( + geno_name: str, base_dir: str = GENOTYPE_FILES, + extension: str = "geno"): + """Build the absolute path for the genotype file.""" + return "{}/{}.{}".format(os.path.abspath(base_dir), geno_name, extension) + +def load_genotype_samples(genotype_filename: str, file_type: str = "geno"): + """ + Load sample of samples from genotype files. + + DESCRIPTION: + Traits can contain a varied number of samples, some of which do not exist in + certain genotypes. In order to compute QTLs, GEMMAs, etc, we need to ensure + to pick only those samples that exist in the genotype under consideration + for the traits used in the computation. + + This function loads a list of samples from the genotype files for use in + filtering out unusable samples. + + + PARAMETERS: + genotype_filename: The absolute path to the genotype file to load the + samples from. + file_type: The type of file. Currently supported values are 'geno' and + 'plink'. + """ + file_type_fns = { + "geno": __load_genotype_samples_from_geno, + "plink": __load_genotype_samples_from_plink + } + return file_type_fns[file_type](genotype_filename) + +def __load_genotype_samples_from_geno(genotype_filename: str): + """ + Helper function for `load_genotype_samples` function. + + Loads samples from '.geno' files. + """ + gzipped_filename = "{}.gz".format(genotype_filename) + if os.path.isfile(gzipped_filename): + genofile: Union[TextIO, gzip.GzipFile] = gzip.open(gzipped_filename) + else: + genofile = open(genotype_filename) + + for row in genofile: + line = row.strip() + if (not line) or (line.startswith(("#", "@"))): # type: ignore[arg-type] + continue + break + + headers = line.split("\t") # type: ignore[arg-type] + if headers[3] == "Mb": + return headers[4:] + return headers[3:] + +def __load_genotype_samples_from_plink(genotype_filename: str): + """ + Helper function for `load_genotype_samples` function. + + Loads samples from '.plink' files. + """ + genofile = open(genotype_filename) + return [line.split(" ")[1] for line in genofile] + +def parse_genotype_labels(lines: list): + """ + Parse label lines into usable genotype values + + DESCRIPTION: + Reworks + https://github.com/genenetwork/genenetwork1/blob/master/web/webqtl/utility/gen_geno_ob.py#L75-L93 + """ + acceptable_labels = ["name", "filler", "type", "mat", "pat", "het", "unk"] + def __parse_label(line): + label, value = [l.strip() for l in line[1:].split(":")] + if label not in acceptable_labels: + return None + if label == "name": + return ("group", value) + return (label, value) + return tuple( + item for item in (__parse_label(line) for line in lines) + if item is not None) + +def parse_genotype_header(line: str, parlist: tuple = tuple()): + """ + Parse the genotype file header line + + DESCRIPTION: + Reworks + https://github.com/genenetwork/genenetwork1/blob/master/web/webqtl/utility/gen_geno_ob.py#L94-L114 + """ + items = [item.strip() for item in line.split("\t")] + mbmap = "Mb" in items + prgy = ((parlist + tuple(items[4:])) if mbmap + else (parlist + tuple(items[3:]))) + return ( + ("Mbmap", mbmap), + ("cm_column", items.index("cM")), + ("mb_column", None if not mbmap else items.index("Mb")), + ("prgy", prgy), + ("nprgy", len(prgy))) + +def parse_genotype_marker(line: str, geno_obj: dict, parlist: tuple): + """ + Parse a data line in a genotype file + + DESCRIPTION: + Reworks + https://github.com/genenetwork/genenetwork1/blob/master/web/webqtl/utility/gen_geno_ob.py#L143-L190 + """ + # pylint: disable=W0702 + marker_row = [item.strip() for item in line.split("\t")] + geno_table = { + geno_obj["mat"]: -1, geno_obj["pat"]: 1, geno_obj["het"]: 0, + geno_obj["unk"]: "U" + } + start_pos = 4 if geno_obj["Mbmap"] else 3 + if len(parlist) > 0: + start_pos = start_pos + 2 + + alleles = marker_row[start_pos:] + genotype = tuple( + (geno_table[allele] if allele in geno_table.keys() else "U") + for allele in alleles) + if len(parlist) > 0: + genotype = (-1, 1) + genotype + try: + cm_val = float(geno_obj["cm_column"]) + except: + if geno_obj["Mbmap"]: + cm_val = float(geno_obj["mb_column"]) + else: + cm_val = 0 + return ( + ("chr", marker_row[0]), + ("name", marker_row[1]), + ("cM", cm_val), + ("Mb", float(geno_obj["mb_column"]) if geno_obj["Mbmap"] else None), + ("genotype", genotype)) + +def build_genotype_chromosomes(geno_obj, markers): + """ + Build up the chromosomes from the given markers and partially built geno + object + """ + mrks = [dict(marker) for marker in markers] + chr_names = {marker["chr"] for marker in mrks} + return tuple(( + ("name", chr_name), ("mb_exists", geno_obj["Mbmap"]), ("cm_column", 2), + ("mb_column", geno_obj["mb_column"]), + ("loci", tuple(marker for marker in mrks if marker["chr"] == chr_name))) + for chr_name in sorted(chr_names)) + +def parse_genotype_file(filename: str, parlist: tuple = tuple()): + """ + Parse the provided genotype file into a usable pytho3 data structure. + """ + with open(filename, "r") as infile: + contents = infile.readlines() + + lines = tuple(line for line in contents if + ((not line.strip().startswith("#")) and + (not line.strip() == ""))) + labels = parse_genotype_labels( + [line for line in lines if line.startswith("@")]) + data_lines = tuple(line for line in lines if not line.startswith("@")) + header = parse_genotype_header(data_lines[0], parlist) + geno_obj = dict(labels + header) + markers = tuple( + [parse_genotype_marker(line, geno_obj, parlist) + for line in data_lines[1:]]) + chromosomes = tuple( + dict(chromosome) for chromosome in + build_genotype_chromosomes(geno_obj, markers)) + return {**geno_obj, "chromosomes": chromosomes} |