about summary refs log tree commit diff
path: root/gn3/computations/rqtl2.py
blob: 3676194fe31630e0249f8121665adf484b11b94d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
"""Module contains functions to parse and process rqtl2 input and output"""
import os
import csv
import uuid
import json
from pathlib import Path
from typing import List
from typing import Dict
from typing import Any

def generate_rqtl2_files(data, workspace_dir):
    """Prepare data  and generate necessary CSV  files
    required to write to control_file
    """
    file_to_name_map = {
        "geno_file": "geno_data",
        "pheno_file": "pheno_data",
        "geno_map_file": "geno_map_data",
        "physical_map_file": "physical_map_data",
        "phenocovar_file": "phenocovar_data",
    }
    parsed_files = {}
    for file_name, data_key in file_to_name_map.items():
        if data_key in data:
            file_path = write_to_csv(
                workspace_dir, f"{file_name}.csv", data[data_key])
            if file_path:
                parsed_files[file_name] = file_path
    return {**data, **parsed_files}


def write_to_csv(work_dir, file_name, data: list[dict],
                 headers=None, delimiter=","):
    """Functions to write data list  to csv file
    if headers is not provided use the keys for first boject.
    """
    if not data:
        return ""
    if headers is None:
        headers = data[0].keys()
    file_path = os.path.join(work_dir, file_name)
    with open(file_path, "w", encoding="utf-8") as file_handler:
        writer = csv.DictWriter(file_handler, fieldnames=headers,
                                delimiter=delimiter)
        writer.writeheader()
        for row in data:
            writer.writerow(row)
        # return the relative file to the workspace see rqtl2 docs
        return file_name


def validate_required_keys(required_keys: list, data: dict) -> tuple[bool, str]:
    """Check for missing keys in data object"""
    missing_keys = [key for key in required_keys if key not in data]
    if missing_keys:
        return False, f"Required key(s) missing: {', '.join(missing_keys)}"
    return True, ""


def compose_rqtl2_cmd(# pylint: disable=[too-many-positional-arguments]
        rqtl_path, input_file, output_file, workspace_dir, data, config):
    """Compose the command for running the R/QTL2 analysis."""
    # pylint: disable=R0913
    params = {
        "input_file": input_file,
        "directory": workspace_dir,
        "output_file": output_file,
        "nperm": data.get("nperm", 0),
        "method": data.get("method", "HK"),
        "threshold": data.get("threshold", 1),
        "cores": config.get('MULTIPROCESSOR_PROCS', 1)
    }
    rscript_path = config.get("RSCRIPT", os.environ.get("RSCRIPT", "Rscript"))
    return f"{rscript_path} { rqtl_path } " + " ".join(
        [f"--{key} {val}" for key, val in params.items()])


def create_file(file_path):
    """Utility function to create file given a file_path"""
    try:
        with open(file_path, "x", encoding="utf-8") as _file_handler:
            return True, f"File created at {file_path}"
    except FileExistsError:
        return False, "File Already Exists"


def prepare_files(tmpdir):
    """Prepare necessary files and workspace dir  for computation."""
    workspace_dir = os.path.join(tmpdir, str(uuid.uuid4()))
    Path(workspace_dir).mkdir(parents=False, exist_ok=True)
    input_file = os.path.join(
        workspace_dir, f"rqtl2-input-{uuid.uuid4()}.json")
    output_file = os.path.join(
        workspace_dir, f"rqtl2-output-{uuid.uuid4()}.json")

    # to ensure streaming api has access to file  even after computation ends
    # .. Create the log file outside the workspace_dir
    log_file = os.path.join(tmpdir, f"rqtl2-log-{uuid.uuid4()}")
    for file_path in [input_file, output_file, log_file]:
        create_file(file_path)
    return workspace_dir, input_file, output_file, log_file


def write_input_file(input_file, workspace_dir, data):
    """
    Write input data to a json file to be passed
    as input to the rqtl2 script
    """
    with open(input_file, "w+", encoding="UTF-8") as file_handler:
        # todo choose a better variable name
        rqtl2_files = generate_rqtl2_files(data, workspace_dir)
        json.dump(rqtl2_files, file_handler)


def read_output_file(output_path: str) -> dict:
    """function to read output file json generated from rqtl2
    see rqtl2_wrapper.R script for the expected output
    """
    with open(output_path, "r", encoding="utf-8") as file_handler:
        results = json.load(file_handler)
        return results


def process_permutation(data):
    """ This function processses output data from the output results.
    input: data object  extracted from the output_file
    returns:
        dict: A dict containing
            * phenotypes array
            * permutations as dict with keys as permutation_id
            * significance_results with keys as threshold values
    """

    perm_file = data.get("permutation_file")
    with open(perm_file, "r", encoding="utf-8") as file_handler:
        reader = csv.reader(file_handler)
        phenotypes = next(reader)[1:]
        perm_results = {_id: float(val) for _id, val, *_ in reader}
        _, significance = fetch_significance_results(data.get("significance_file"))
        return {
        "phenotypes": phenotypes,
        "perm_results": perm_results,
        "significance": significance,
    }


def fetch_significance_results(file_path: str):
    """
    Processes the 'significance_file' from the given data object to extract
    phenotypes and significance values.
    thresholds  values are: (0.05, 0.01)
    Args:
        file_path (str): file_Path for the significance output

    Returns:
        tuple: A tuple containing
            *  phenotypes (list): List of phenotypes
            *  significances (dict): A dictionary where keys
               ...are threshold values and values are lists
               of significant results corresponding to each threshold.
    """
    with open(file_path, "r", encoding="utf-8") as file_handler:
        reader = csv.reader(file_handler)
        results = {}
        phenotypes = next(reader)[1:]
        for line in reader:
            threshold, significance = line[0], line[1:]
            results[threshold] = significance
        return (phenotypes, results)


def process_scan_results(qtl_file_path: str, map_file_path: str) -> List[Dict[str, Any]]:
    """Function to process genome scanning results and obtain marker_name, Lod score,
    marker_position, and chromosome.
    Args:
        qtl_file_path (str): Path to the QTL scan results CSV file.
        map_file_path (str): Path to the map file from the script.

    Returns:
        List[Dict[str, str]]: A list of dictionaries containing the marker data.
    """
    map_data = {}
    # read the genetic map
    with open(map_file_path, "r", encoding="utf-8") as file_handler:
        reader = csv.reader(file_handler)
        next(reader)
        for line in reader:
            marker, chr_, cm_, mb_ = line
            cm: float | None = float(cm_) if cm_ and cm_ != "NA" else None
            mb: float | None = float(mb_) if mb_ and mb_ != "NA" else None
            map_data[marker] = {"chr": chr_, "cM": cm, "Mb": mb}

    # Process QTL scan results and merge the positional data
    results = []
    with open(qtl_file_path, "r", encoding="utf-8") as file_handler:
        reader = csv.reader(file_handler)
        next(reader)
        for line in reader:
            marker = line[0]
            lod_score = line[1]
            results.append({
                "name": marker,
                "lod_score": float(lod_score),
                **map_data.get(marker, {})  # Add chromosome and positions if available
            })
    return results


def process_qtl2_results(output_file: str) -> Dict[str, Any]:
    """Function provides abstraction for processing all QTL2 mapping results.

    Args: * File path to to the output generated

    Returns:
        Dict[str, any]: A dictionary containing both QTL
            and permutation results along with input data.
    """
    results  = read_output_file(output_file)
    qtl_results = process_scan_results(results["scan_file"],
                                       results["map_file"])
    permutation_results = process_permutation(results) if results["permutations"] > 0 else {}
    return {
        **results,
        "qtl_results": qtl_results,
        "permutation_results": permutation_results
    }