From 95d2b868adebbc7ebbc2435f9184c30c014ec513 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Wed, 3 Jan 2024 16:41:33 +0300 Subject: Parse founder_geno files. Generalise parsing files. * Add tests for parsing "founder_geno" files * Extract common file parsing structure out to more general function * Use generic function to parse "founder_geno" file in test --- r_qtl/r_qtl2.py | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) (limited to 'r_qtl') diff --git a/r_qtl/r_qtl2.py b/r_qtl/r_qtl2.py index 2256609..4dac24b 100644 --- a/r_qtl/r_qtl2.py +++ b/r_qtl/r_qtl2.py @@ -105,6 +105,35 @@ def with_transposed(zfile: ZipFile, except StopIteration: pass +def make_process_data_geno(cdata) -> tuple[ + Callable[[dict], dict], + Callable[[str, tuple[str, ...], tuple[str, ...]], + tuple[dict, ...]]]: + """Build functions to process genotype data.""" + def replace_genotype_codes(val): + return cdata["genotypes"].get(val, val) + + def replace_na_strings(val): + nastrings = cdata.get("na.strings") + if bool(nastrings): + return (None if val in nastrings else val) + return val + def __non_transposed__(row: dict) -> dict: + return { + key: thread_op(value, replace_genotype_codes, replace_na_strings) + for key,value in row.items() + } + def __transposed__(id_key: str, + ids: tuple[str, ...], + vals: tuple[str, ...]) -> tuple[dict, ...]: + return tuple( + dict(zip( + [id_key, vals[0]], + (thread_op(item, replace_genotype_codes, replace_na_strings) + for item in items))) + for items in zip(ids, vals[1:])) + return (__non_transposed__, __transposed__) + def genotype_data(zfile: ZipFile, cdata: dict) -> Iterator[dict]: """Load the genotype file, making use of the control data.""" def replace_genotype_codes(val): @@ -176,3 +205,28 @@ def phenotype_data(zfile: ZipFile, cdata: dict) -> Iterator[dict]: for items in zip(ids, vals[1:])) for row in with_transposed(zfile, "pheno", cdata, __merge__): yield row + +def __default_process_value_transposed__( + id_key: str, + ids: tuple[str, ...], + vals: tuple[str, ...]) -> tuple[dict, ...]: + """Default values processor for transposed files.""" + return tuple( + dict(zip([id_key, vals[0]], items)) for items in zip(ids, vals[1:])) + +def file_data(zfile: ZipFile, + member_key: str, + cdata: dict, + process_value: Callable[[dict], dict] = lambda val: val, + process_transposed_value: Callable[ + [str, tuple[str, ...], tuple[str, ...]], + tuple[dict, ...]] = __default_process_value_transposed__) -> Iterator[dict]: + """Load data from files in R/qtl2 zip bundle.""" + if not cdata.get(f"{member_key}_transposed", False): + for row in with_non_transposed(zfile, member_key, cdata, process_value): + yield row + return + + for row in with_transposed( + zfile, member_key, cdata, process_transposed_value): + yield row -- cgit v1.2.3