about summary refs log tree commit diff
path: root/gn3/db/genotypes.py
diff options
context:
space:
mode:
Diffstat (limited to 'gn3/db/genotypes.py')
-rw-r--r--gn3/db/genotypes.py184
1 files changed, 184 insertions, 0 deletions
diff --git a/gn3/db/genotypes.py b/gn3/db/genotypes.py
new file mode 100644
index 0000000..8f18cac
--- /dev/null
+++ b/gn3/db/genotypes.py
@@ -0,0 +1,184 @@
+"""Genotype utilities"""
+
+import os
+import gzip
+from typing import Union, TextIO
+
+from gn3.settings import GENOTYPE_FILES
+
+def build_genotype_file(
+        geno_name: str, base_dir: str = GENOTYPE_FILES,
+        extension: str = "geno"):
+    """Build the absolute path for the genotype file."""
+    return "{}/{}.{}".format(os.path.abspath(base_dir), geno_name, extension)
+
+def load_genotype_samples(genotype_filename: str, file_type: str = "geno"):
+    """
+    Load sample of samples from genotype files.
+
+    DESCRIPTION:
+    Traits can contain a varied number of samples, some of which do not exist in
+    certain genotypes. In order to compute QTLs, GEMMAs, etc, we need to ensure
+    to pick only those samples that exist in the genotype under consideration
+    for the traits used in the computation.
+
+    This function loads a list of samples from the genotype files for use in
+    filtering out unusable samples.
+
+
+    PARAMETERS:
+    genotype_filename: The absolute path to the genotype file to load the
+        samples from.
+    file_type: The type of file. Currently supported values are 'geno' and
+        'plink'.
+    """
+    file_type_fns = {
+        "geno": __load_genotype_samples_from_geno,
+        "plink": __load_genotype_samples_from_plink
+    }
+    return file_type_fns[file_type](genotype_filename)
+
+def __load_genotype_samples_from_geno(genotype_filename: str):
+    """
+    Helper function for `load_genotype_samples` function.
+
+    Loads samples from '.geno' files.
+    """
+    gzipped_filename = "{}.gz".format(genotype_filename)
+    if os.path.isfile(gzipped_filename):
+        genofile: Union[TextIO, gzip.GzipFile] = gzip.open(gzipped_filename)
+    else:
+        genofile = open(genotype_filename)
+
+    for row in genofile:
+        line = row.strip()
+        if (not line) or (line.startswith(("#", "@"))): # type: ignore[arg-type]
+            continue
+        break
+
+    headers = line.split("\t") # type: ignore[arg-type]
+    if headers[3] == "Mb":
+        return headers[4:]
+    return headers[3:]
+
+def __load_genotype_samples_from_plink(genotype_filename: str):
+    """
+    Helper function for `load_genotype_samples` function.
+
+    Loads samples from '.plink' files.
+    """
+    genofile = open(genotype_filename)
+    return [line.split(" ")[1] for line in genofile]
+
+def parse_genotype_labels(lines: list):
+    """
+    Parse label lines into usable genotype values
+
+    DESCRIPTION:
+    Reworks
+    https://github.com/genenetwork/genenetwork1/blob/master/web/webqtl/utility/gen_geno_ob.py#L75-L93
+    """
+    acceptable_labels = ["name", "filler", "type", "mat", "pat", "het", "unk"]
+    def __parse_label(line):
+        label, value = [l.strip() for l in line[1:].split(":")]
+        if label not in acceptable_labels:
+            return None
+        if label == "name":
+            return ("group", value)
+        return (label, value)
+    return tuple(
+        item for item in (__parse_label(line) for line in lines)
+        if item is not None)
+
+def parse_genotype_header(line: str, parlist: tuple = tuple()):
+    """
+    Parse the genotype file header line
+
+    DESCRIPTION:
+    Reworks
+    https://github.com/genenetwork/genenetwork1/blob/master/web/webqtl/utility/gen_geno_ob.py#L94-L114
+    """
+    items = [item.strip() for item in line.split("\t")]
+    mbmap = "Mb" in items
+    prgy = ((parlist + tuple(items[4:])) if mbmap
+            else (parlist + tuple(items[3:])))
+    return (
+        ("Mbmap", mbmap),
+        ("cm_column", items.index("cM")),
+        ("mb_column", None if not mbmap else items.index("Mb")),
+        ("prgy", prgy),
+        ("nprgy", len(prgy)))
+
+def parse_genotype_marker(line: str, geno_obj: dict, parlist: tuple):
+    """
+    Parse a data line in a genotype file
+
+    DESCRIPTION:
+    Reworks
+    https://github.com/genenetwork/genenetwork1/blob/master/web/webqtl/utility/gen_geno_ob.py#L143-L190
+    """
+    # pylint: disable=W0702
+    marker_row = [item.strip() for item in line.split("\t")]
+    geno_table = {
+        geno_obj["mat"]: -1, geno_obj["pat"]: 1, geno_obj["het"]: 0,
+        geno_obj["unk"]: "U"
+    }
+    start_pos = 4 if geno_obj["Mbmap"] else 3
+    if len(parlist) > 0:
+        start_pos = start_pos + 2
+
+    alleles = marker_row[start_pos:]
+    genotype = tuple(
+        (geno_table[allele] if allele in geno_table.keys() else "U")
+        for allele in alleles)
+    if len(parlist) > 0:
+        genotype = (-1, 1) + genotype
+    try:
+        cm_val = float(geno_obj["cm_column"])
+    except:
+        if geno_obj["Mbmap"]:
+            cm_val = float(geno_obj["mb_column"])
+        else:
+            cm_val = 0
+    return (
+        ("chr", marker_row[0]),
+        ("name", marker_row[1]),
+        ("cM", cm_val),
+        ("Mb", float(geno_obj["mb_column"]) if geno_obj["Mbmap"] else None),
+        ("genotype", genotype))
+
+def build_genotype_chromosomes(geno_obj, markers):
+    """
+    Build up the chromosomes from the given markers and partially built geno
+    object
+    """
+    mrks = [dict(marker) for marker in markers]
+    chr_names = {marker["chr"] for marker in mrks}
+    return tuple((
+        ("name", chr_name), ("mb_exists", geno_obj["Mbmap"]), ("cm_column", 2),
+        ("mb_column", geno_obj["mb_column"]),
+        ("loci", tuple(marker for marker in mrks if marker["chr"] == chr_name)))
+                 for chr_name in sorted(chr_names))
+
+def parse_genotype_file(filename: str, parlist: tuple = tuple()):
+    """
+    Parse the provided genotype file into a usable pytho3 data structure.
+    """
+    with open(filename, "r") as infile:
+        contents = infile.readlines()
+
+    lines = tuple(line for line in contents if
+                  ((not line.strip().startswith("#")) and
+                   (not line.strip() == "")))
+    labels = parse_genotype_labels(
+        [line for line in lines if line.startswith("@")])
+    data_lines = tuple(line for line in lines if not line.startswith("@"))
+    header = parse_genotype_header(data_lines[0], parlist)
+    geno_obj = dict(labels + header)
+    markers = tuple(
+        [parse_genotype_marker(line, geno_obj, parlist)
+         for line in data_lines[1:]])
+    chromosomes = tuple(
+        dict(chromosome) for chromosome in
+        build_genotype_chromosomes(geno_obj, markers))
+    return {**geno_obj, "chromosomes": chromosomes}