aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--r_qtl/r_qtl2.py40
1 files changed, 39 insertions, 1 deletions
diff --git a/r_qtl/r_qtl2.py b/r_qtl/r_qtl2.py
index 9c9d67b..508d3eb 100644
--- a/r_qtl/r_qtl2.py
+++ b/r_qtl/r_qtl2.py
@@ -1,13 +1,20 @@
"""The R/qtl2 parsing and processing code."""
+import io
+import csv
import json
import yaml
from pathlib import Path
-from typing import List, Union
+from functools import reduce
+from typing import Any, List, Union, Iterator
from zipfile import ZipFile, ZipInfo, is_zipfile
from quality_control.debug import __pk__
from r_qtl.errors import InvalidFormat
+def thread_op(value, *functions):
+ """Thread the `value` through the sequence of `functions`."""
+ return reduce(lambda result, func: func(result), functions, value)
+
def control_data(zfile: ZipFile) -> dict:
"""Retrieve the control file from the zip file info."""
files = tuple(filename
@@ -24,6 +31,37 @@ def control_data(zfile: ZipFile) -> dict:
if files[0].endswith(".json")
else yaml.safe_load(zfile.read(files[0])))
+def genotype_metadata(zfile: ZipFile, cdata: dict) -> dict:
+ """Read Individual ID key and the marker names."""
+ # TODO: Handle transposed files
+ line_num = 0
+ with zfile.open(cdata["geno"]) as genofile:
+ for line in filter(lambda line: not line.startswith("#"),
+ io.TextIOWrapper(genofile)):
+ line_parts = line.strip().split(cdata.get("sep", ","))
+ return {
+ "individual_id_key": line_parts[0].strip(),
+ "markers": tuple(marker.strip() for marker in line_parts[1:])
+ }
+
+def genotype_data(zfile: ZipFile, cdata: dict) -> Iterator[dict]:
+ """Load the genotype file, making use of the control data."""
+ # TODO: Handle transposed files
+ with zfile.open(cdata["geno"]) as genofile:
+ reader = csv.DictReader(filter(lambda line: not line.startswith("#"),
+ io.TextIOWrapper(genofile)),
+ delimiter=cdata.get("sep", ","))
+ for row in reader:
+ yield {
+ key: thread_op(
+ value,
+ # replace genotype codes
+ lambda val: cdata["genotypes"].get(val, val),
+ # replace N/A strings
+ lambda val: (None if val in cdata["na.strings"] else val))
+ for key,value
+ in row.items()
+ }
def read_r_qtl2_files(filepath: Path):
"""Read R/qtl2 format zip files."""