aboutsummaryrefslogtreecommitdiff
path: root/r_qtl/r_qtl2.py
blob: 16bb65257e0d4a94949371b1b1c44da5b509f278 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
"""The R/qtl2 parsing and processing code."""
import io
import csv
import json
from zipfile import ZipFile
from functools import reduce
from typing import Iterator, Iterable, Callable

import yaml

from quality_control.parsing import take

from r_qtl.errors import InvalidFormat

def thread_op(value, *functions):
    """Pass `value` through each of `functions` in turn.

    The first function receives `value`; every following function receives
    the result of the one before it. The final result is returned. With no
    functions, `value` is returned unchanged.
    """
    result = value
    for func in functions:
        result = func(result)
    return result

def control_data(zfile: ZipFile) -> dict:
    """Load the control file found inside the zip archive.

    The control file is the single archive member whose name ends with
    ".yaml" or ".json". JSON members are parsed with `json.loads`, YAML
    members with `yaml.safe_load`.

    Raises `InvalidFormat` if the archive holds no such member, or more than
    one.
    """
    candidates = [name
                  for name in zfile.namelist()
                  if name.endswith((".yaml", ".json"))]
    if not candidates:
        raise InvalidFormat("Expected a json or yaml control file.")

    if len(candidates) > 1:
        raise InvalidFormat("Found more than one possible control file.")

    control_file = candidates[0]
    content = zfile.read(control_file)
    if control_file.endswith(".json"):
        return json.loads(content)
    return yaml.safe_load(content)

def with_non_transposed(zfile: ZipFile,
                        member_key: str,
                        cdata: dict,
                        func: Callable[[dict], dict] = lambda val: val) -> Iterator[dict]:
    """Yield the rows of a non-transposed R/qtl2 file, one dict per row.

    The archive member named by `cdata[member_key]` is opened as text and
    parsed with `csv.DictReader`, using `cdata["sep"]` (default ",") as the
    delimiter. Lines starting with `cdata["comment.char"]` (default "#") are
    dropped before parsing. Each parsed row is passed through `func` and the
    result is yielded.
    """
    comment_prefix = cdata.get("comment.char", "#")
    with zfile.open(cdata[member_key]) as member:
        data_lines = (text_line
                      for text_line in io.TextIOWrapper(member)
                      if not text_line.startswith(comment_prefix))
        records = csv.DictReader(data_lines, delimiter=cdata.get("sep", ","))
        yield from (func(record) for record in records)

def __make_organise_by_id__(id_key):
    """Build a reducer (for use with `reduce`) that groups items by
    `item[id_key]`.

    The returned function takes the accumulator dict and one item, and
    returns a new accumulator in which the entry for the item's identifier
    holds the previous entry's fields overlaid with the item's fields. The
    accumulator passed in is not modified."""
    def __organiser__(acc, item):
        identifier = item[id_key]
        merged = dict(acc.get(identifier, {}))
        merged.update(item)
        updated = dict(acc)
        updated[identifier] = merged
        return updated
    return __organiser__

def __batch_of_n__(iterable: Iterable, num):
    """Yield successive batches taken from `iterable` with `take(iterable, num)`.

    Iteration stops at the first batch that comes back empty; that empty
    batch is not yielded."""
    batch = take(iterable, num)
    while len(batch) > 0:
        yield batch
        batch = take(iterable, num)

def genotype_data(zfile: ZipFile, cdata: dict) -> Iterator[dict]:
    """Load the genotype file, making use of the control data.

    The archive member named by `cdata["geno"]` is read and one dict per row
    (non-transposed layout) or per sample (transposed layout, selected by a
    truthy `cdata["geno_transposed"]`) is yielded.

    Every value has its genotype code replaced through `cdata["genotypes"]`
    (values with no mapping are kept as they are), and is then turned into
    `None` if it is one of `cdata["na.strings"]`.

    In the transposed layout the first non-comment line supplies the
    identifier column name followed by the sample identifiers. Each later
    line supplies a marker name followed by one value per sample. Lines
    starting with `cdata["comment.char"]` (default "#") are skipped in both
    layouts, and fields are split on `cdata["sep"]` (default ",").

    Raises `KeyError` if `cdata` has no "geno" entry, or has no "genotypes"
    entry by the time the first value is processed.
    """
    comment_char = cdata.get("comment.char", "#")
    separator = cdata.get("sep", ",")
    nastrings = cdata.get("na.strings")
    if isinstance(nastrings, str):
        # A lone string must be treated as one NA token. Using `in` on the
        # string itself would do substring matching ("A" in "NA") and would
        # raise TypeError for values already replaced by integer codes.
        nastrings = (nastrings,)

    def replace_genotype_codes(val):
        return cdata["genotypes"].get(val, val)

    def replace_na_strings(val):
        if bool(nastrings):
            return None if val in nastrings else val
        return val

    def __clean__(val):
        # Genotype codes are replaced first, then NA strings.
        return replace_na_strings(replace_genotype_codes(val))

    if not cdata.get("geno_transposed", False):
        yield from with_non_transposed(
            zfile,
            "geno",
            cdata,
            lambda row: {key: __clean__(value) for key, value in row.items()})
        return

    by_sample: dict = {}
    with zfile.open(cdata["geno"]) as genofile:
        lines = (line.strip().split(separator)
                 for line in io.TextIOWrapper(genofile)
                 if not line.startswith(comment_char))
        id_line = next(lines, None)
        if id_line is None:
            # No header line at all (empty file or comments only).
            return
        id_key, samples = id_line[0], id_line[1:]
        for line in lines:
            marker = line[0]
            for sample, value in zip(samples, line[1:]):
                # Sample identifiers go through the same cleaning as values,
                # as they do in the non-transposed layout.
                item = {id_key: __clean__(sample), marker: __clean__(value)}
                # Grow each sample's row in place; rebuilding the whole
                # mapping for every cell is quadratic in the file size.
                by_sample.setdefault(item[id_key], {}).update(item)

    yield from by_sample.values()

def map_data(zfile: ZipFile, map_type: str, cdata: dict) -> tuple[dict, ...]:
    """Read the genetic or physical map file to get the genome mapping data.

    `map_type` must be "genetic-map" (file named by `cdata["gmap"]`) or
    "physical-map" (file named by `cdata["pmap"]`). The result is a tuple
    with one dict per record in the map file.

    When `cdata["gmap_transposed"]` / `cdata["pmap_transposed"]` is falsy or
    absent, the file is read row-wise through `with_non_transposed`.
    Otherwise each line holds one field name followed by that field's value
    for every record, and the lines are transposed back into per-record
    dicts. Lines starting with `cdata["comment.char"]` (default "#") and
    blank lines are skipped; fields are split on `cdata["sep"]` (default
    ",") and stripped of surrounding whitespace.
    """
    assert map_type in ("genetic-map", "physical-map"), "Invalid map type"
    map_file_key = {
        "genetic-map": "gmap",
        "physical-map": "pmap"
    }[map_type]
    transposed_key = {
        "genetic-map": "gmap_transposed",
        "physical-map": "pmap_transposed"
    }[map_type]
    if not cdata.get(transposed_key, False):
        return tuple(with_non_transposed(zfile, map_file_key, cdata))

    comment_char = cdata.get("comment.char", "#")
    separator = cdata.get("sep", ",")
    with zfile.open(cdata[map_file_key]) as mapfile:
        # Blank lines are dropped: one would add a header with no values,
        # and the transposing zip() below would then produce no records.
        lines = [[field.strip() for field in line.strip().split(separator)]
                 for line in io.TextIOWrapper(mapfile)
                 if line.strip() and not line.startswith(comment_char)]

    headers = tuple(line[0] for line in lines)
    # zip(*columns) turns the per-field lines into per-record value tuples.
    return tuple(dict(zip(headers, record))
                 for record in zip(*(line[1:] for line in lines)))