aboutsummaryrefslogtreecommitdiff
path: root/r_qtl/r_qtl2.py
blob: 02217ee0f22239baacc967cf038fda9b75cf6255 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
"""The R/qtl2 parsing and processing code."""
import io
import csv
import json
from zipfile import ZipFile
from functools import reduce, partial
from typing import Iterator, Iterable, Callable

import yaml

from quality_control.parsing import take

from r_qtl.errors import InvalidFormat

def thread_op(value, *functions):
    """Pass `value` through each of `functions` in turn.

    The first function receives `value`; every later function receives the
    result of the one before it. The final result is returned. With no
    functions, `value` is returned unchanged.
    """
    result = value
    for func in functions:
        result = func(result)
    return result

def control_data(zfile: ZipFile) -> dict:
    """Read and parse the control file of the bundle in `zfile`.

    The control file is the single member of the zip file whose name ends
    with `.yaml` or `.json`.

    Raises `InvalidFormat` if there is no such member, or more than one.
    """
    candidates = tuple(
        name for name in zfile.namelist()
        if name.endswith((".yaml", ".json")))
    if len(candidates) == 0:
        raise InvalidFormat("Expected a json or yaml control file.")

    if len(candidates) > 1:
        raise InvalidFormat("Found more than one possible control file.")

    control_file = candidates[0]
    if control_file.endswith(".json"):
        return json.loads(zfile.read(control_file))
    return yaml.safe_load(zfile.read(control_file))

def with_non_transposed(zfile: ZipFile,
                        member_key: str,
                        cdata: dict,
                        process_value: Callable[
                            [dict], dict] = lambda val: val) -> Iterator[dict]:
    """Yield the processed rows of a non-transposed file in the bundle.

    Lines starting with the comment character (`comment.char` in the control
    data, `#` by default) are skipped. The remaining lines are read as
    delimited text (`sep` in the control data, `,` by default) with the first
    of them as the header.

    Arguments:
    zfile: A zipfile object from opening a R/qtl2 bundle.
    member_key: The key in `cdata` whose value names the member file to read.
    cdata: The control data from the R/qtl2 bundle read from the JSON/YAML file.
    process_value: A function applied to each row (a dict) before it is
        yielded. Defaults to returning the row unchanged.
    """
    def is_data_line(text):
        prefix = cdata.get("comment.char", "#")
        return not text.startswith(prefix)

    with zfile.open(cdata[member_key]) as member:
        data_lines = (text for text in io.TextIOWrapper(member)
                      if is_data_line(text))
        records = csv.DictReader(data_lines, delimiter=cdata.get("sep", ","))
        yield from map(process_value, records)

def __make_organise_by_id__(id_key):
    """Build a reducer (for use with `reduce`) that groups items by the value
    found under `id_key`.

    The reducer takes the accumulated mapping and an item, and returns a new
    mapping in which the entry for the item's identifier holds the previous
    entry (if any) updated with the item's fields. The accumulator passed in
    is not modified."""
    def __organiser__(acc, item):
        identifier = item[id_key]
        merged = dict(acc.get(identifier, {}))
        merged.update(item)
        updated = dict(acc)
        updated[identifier] = merged
        return updated
    return __organiser__

def __batch_of_n__(iterable: Iterable, num):
    """Yield successive batches of `num` items or less from the `iterable`.

    Each batch is whatever `take` returns for the next `num` items. The
    generator stops at the first empty batch.
    """
    # Get a single iterator up front so that every `take` call continues from
    # where the previous one stopped. Passing a re-iterable object (e.g. a
    # list) straight to `take` on each pass could hand back the same leading
    # items every time -- presumably `take` does not keep a position for such
    # inputs; verify against its implementation.
    iterator = iter(iterable)
    while True:
        items = take(iterator, num)
        if not items:
            break
        yield items

def with_transposed(zfile: ZipFile,
                    member_key: str,
                    cdata: dict,
                    process_value: Callable[
                        [str, tuple[str, ...], tuple[str, ...]],
                        tuple[dict, ...]]) -> Iterator[dict]:
    """Process transposed file values

    Lines starting with the comment character (`comment.char` in the control
    data, `#` by default) are skipped. Every other line is split on the
    separator (`sep` in the control data, `,` by default) and its fields are
    stripped of surrounding whitespace.

    The first remaining line provides the identifier key (its first field)
    and the headers (the rest of its fields). Each later line is passed to
    `process_value` together with the identifier key and the headers. The
    dicts it returns are merged by the value they hold under the identifier
    key, and one merged dict per identifier is yielded, in order of first
    appearance. Nothing is yielded for a file with no non-comment lines.

    Arguments:
    zfile: A zipfile object from opening a R/qtl2 bundle.
    member_key: A key to retrieve the member file to process from the file.
    cdata: The control data from the R/qtl2 bundle read from the JSON/YAML file.
    process_value: A function to process the values from the file. It is
        called as `process_value(id_key, headers, fields)` and must return
        dicts that each contain `id_key`.
    """
    separator = cdata.get("sep", ",")
    # Honour the control file's comment character, as is done for
    # non-transposed files, rather than assuming "#".
    comment_char = cdata.get("comment.char", "#")
    with zfile.open(cdata[member_key]) as innerfile:
        lines = (
            tuple(field.strip() for field in line.strip().split(separator))
            for line in io.TextIOWrapper(innerfile)
            if not line.startswith(comment_char))
        id_line = next(lines, None)
        if id_line is None:
            # No header line at all: there is nothing to yield.
            return
        id_key, headers = id_line[0], id_line[1:]

        # Merge in place: building a fresh copy of the whole mapping for
        # every row makes the work grow quadratically with the number of
        # identifiers.
        organised: dict = {}
        for line in lines:
            for row in process_value(id_key, headers, line):
                organised.setdefault(row[id_key], {}).update(row)
        yield from organised.values()

def make_process_data_geno(cdata) -> tuple[
        Callable[[dict], dict],
        Callable[[str, tuple[str, ...], tuple[str, ...]],
                 tuple[dict, ...]]]:
    """Build functions to process genotype data.

    Arguments:
    cdata: The control data from the R/qtl2 bundle read from the JSON/YAML file.

    Returns a pair of functions: the first processes a row (a dict) from a
    non-transposed file, the second processes a line of fields from a
    transposed file.

    Both apply the same two steps to every value, the identifier included.
    First the value is looked up in the `genotypes` mapping of the control
    data; values with no entry are kept as they are. Then the result becomes
    `None` if it is one of the control data's `na.strings`.

    Control data without a `genotypes` entry leaves values unrecoded, and a
    `na.strings` entry given as a single string is treated as a one-item
    collection.
    """
    def replace_genotype_codes(val):
        # Tolerate a missing/empty `genotypes` entry instead of failing with
        # a KeyError on the first value processed.
        return (cdata.get("genotypes") or {}).get(val, val)

    def replace_na_strings(val):
        nastrings = cdata.get("na.strings")
        if isinstance(nastrings, str):
            # A lone string must be compared whole: `val in "NA"` would be a
            # substring test, and fails outright for recoded (int) values.
            nastrings = (nastrings,)
        if bool(nastrings):
            return (None if val in nastrings else val)
        return val

    def __process__(val):
        return replace_na_strings(replace_genotype_codes(val))

    def __non_transposed__(row: dict) -> dict:
        return {key: __process__(value) for key, value in row.items()}

    def __transposed__(id_key: str,
                       ids: tuple[str, ...],
                       vals: tuple[str, ...]) -> tuple[dict, ...]:
        # `vals[0]` names the line; each remaining value is paired with the
        # identifier at the same position in `ids`.
        return tuple(
            dict(zip(
                [id_key, vals[0]],
                (__process__(item) for item in items)))
            for items in zip(ids, vals[1:]))
    return (__non_transposed__, __transposed__)

def replace_cross_info(val, cdata: dict):
    """
    Translate `val` using the `cross_info` mapping in the control data.

    Returns the mapped value when `cdata` has a non-empty `cross_info` entry
    containing `val`; otherwise returns `val` unchanged.
    """
    mapping = cdata.get("cross_info", False)
    return mapping.get(val, val) if mapping else val

def make_process_data_covar(cdata) -> tuple[
        Callable[[dict], dict],
        Callable[[str, tuple[str, ...], tuple[str, ...]],
                 tuple[dict, ...]]]:
    """Build the value processors for covar files.

    Returns a pair of functions: the first handles a row (a dict) from a
    non-transposed file, the second handles a line of fields from a
    transposed file. Both pass every value through the `sex` mapping of the
    control data and then through `replace_cross_info`; values with no entry
    in a mapping are left as they are.
    """
    rep_cross_info = partial(replace_cross_info, cdata=cdata)

    def replace_sex_code(val):
        sex_info = cdata.get("sex", False)
        return sex_info.get(val, val) if sex_info else val

    def recode(val):
        return rep_cross_info(replace_sex_code(val))

    def non_transposed(row: dict) -> dict:
        return {key: recode(value) for key, value in row.items()}

    def transposed(id_key: str,
                   ids: tuple[str, ...],
                   vals: tuple[str, ...]) -> tuple[dict, ...]:
        # `vals[0]` names the line; the rest pair up with `ids` by position.
        return tuple(
            dict(zip((id_key, vals[0]), map(recode, pair)))
            for pair in zip(ids, vals[1:]))

    return (non_transposed, transposed)

def __default_process_value_transposed__(
        id_key: str,
        ids: tuple[str, ...],
        vals: tuple[str, ...]) -> tuple[dict, ...]:
    """Turn one line of a transposed file into one dict per identifier,
    without changing any value.

    `vals[0]` is used as the key for the values in `vals[1:]`, which are
    paired by position with `ids`. Each resulting dict holds the identifier
    under `id_key` and the matching value under `vals[0]`."""
    return tuple(
        {id_key: identifier, vals[0]: value}
        for identifier, value in zip(ids, vals[1:]))

def file_data(zfile: ZipFile,
              member_key: str,
              cdata: dict,
              process_value: Callable[[dict], dict] = lambda val: val,
              process_transposed_value: Callable[
                  [str, tuple[str, ...], tuple[str, ...]],
                  tuple[dict, ...]] = __default_process_value_transposed__) -> Iterator[dict]:
    """Yield the rows of the file(s) named under `member_key` in `cdata`.

    When `cdata[member_key]` is a list, each listed file is loaded in turn
    and its rows are yielded one file after the other. A single file is read
    as transposed when `cdata` has a truthy `<member_key>_transposed` entry,
    and as non-transposed otherwise.

    Arguments:
    zfile: A zipfile object from opening a R/qtl2 bundle.
    member_key: The key in `cdata` naming the member file(s) to load.
    cdata: The control data from the R/qtl2 bundle read from the JSON/YAML file.
    process_value: Row processor used for non-transposed files.
    process_transposed_value: Line processor used for transposed files.
    """
    member = cdata[member_key]
    if isinstance(member, list):
        for single_file in member:
            # Recurse with control data that names just this one file.
            yield from file_data(
                zfile, member_key, {**cdata, member_key: single_file},
                process_value, process_transposed_value)
        return

    if cdata.get(f"{member_key}_transposed", False):
        yield from with_transposed(
            zfile, member_key, cdata, process_transposed_value)
    else:
        yield from with_non_transposed(
            zfile, member_key, cdata, process_value)

def cross_information(zfile: ZipFile, cdata: dict) -> Iterator[dict]:
    """Yield the cross information rows of the bundle.

    The rows come from the file named under `file` in the control data's
    `cross_info` entry when there is one, and from the `covar` file
    otherwise. Values are processed with the functions built by
    `make_process_data_covar`, then passed through `replace_cross_info`, and
    the column named by the `covar` entry of the control data's `sex` section
    is left out of every row.
    """
    cross_info_spec = cdata.get("cross_info", {})
    excluded_fields = (cdata.get("sex", {}).get("covar", ""),)
    if "file" in cross_info_spec:
        # Expose the dedicated cross-info file under a key of its own so it
        # can be loaded through `file_data` like any other member.
        file_key = "gnqc_cross_info_file"
        effective_cdata = {**cdata, file_key: cross_info_spec["file"]}
    else:
        file_key = "covar"
        effective_cdata = {**cdata}

    row_processor, line_processor = make_process_data_covar(cdata)
    translate = partial(replace_cross_info, cdata=cdata)
    for row in file_data(
            zfile, file_key, effective_cdata, row_processor, line_processor):
        yield {
            key: translate(value)
            for key, value in row.items()
            if key not in excluded_fields}