"""Module contains functions to parse and process rqtl2 input and output"""
import os
import csv
import uuid
import json
from pathlib import Path


def generate_rqtl2_files(data, workspace_dir):
    """Write the CSV files referenced by the R/qtl2 control file.

    For each supported dataset present in `data`, write a CSV file to
    `workspace_dir` and record its path under the matching file key.
    """
file_to_name_map = {
"geno_file": "geno_data",
"pheno_file": "pheno_data",
"geno_map_file": "geno_map_data",
"pheno_map_file": "pheno_map_data",
"phenocovar_file": "phenocovar_data",
}
parsed_files = {}
for file_name, data_key in file_to_name_map.items():
if data_key in data:
file_path = write_to_csv(workspace_dir, f"{file_name}.csv", data[data_key])
if file_path:
parsed_files[file_name] = file_path
return {**data, **parsed_files}
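# A minimal usage sketch for generate_rqtl2_files (data shapes and paths are
# hypothetical, not taken from a real dataset):
#
#   data = {"crosstype": "risib",
#           "geno_data": [{"marker": "rs1", "BXD1": "B", "BXD2": "D"}]}
#   result = generate_rqtl2_files(data, "/tmp/workspace")
#   # writes /tmp/workspace/geno_file.csv and returns data plus
#   # {"geno_file": "/tmp/workspace/geno_file.csv"}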


def write_to_csv(work_dir, file_name, data: list[dict],
                 headers=None, delimiter=","):
    """Write a list of dicts to a CSV file.

    If `headers` is not provided, use the keys of the first object.
    """
if not data:
return ""
if headers is None:
headers = data[0].keys()
file_path = os.path.join(work_dir, file_name)
with open(file_path, "w", encoding="utf-8") as file_handler:
writer = csv.DictWriter(file_handler, fieldnames=headers,
delimiter=delimiter)
writer.writeheader()
for row in data:
writer.writerow(row)
return file_path
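# For example (hypothetical values), headers are inferred from the first row:
#
#   write_to_csv("/tmp/workspace", "pheno_file.csv",
#                [{"id": "BXD1", "pheno": 10.2}, {"id": "BXD2", "pheno": 9.8}])
#   # -> "/tmp/workspace/pheno_file.csv" with the header line "id,pheno"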


def validate_required_keys(required_keys: list, data: dict) -> tuple[bool, str]:
"""Check for missing keys in data object"""
missing_keys = [key for key in required_keys if key not in data]
if missing_keys:
return False, f"Required key(s) missing: {', '.join(missing_keys)}"
return True, ""


def compose_rqtl2_cmd(rqtl_path, input_file,
                      output_file, workspace_dir,
                      data, config):  # pylint: disable=R0913
    """Compose the command for running the R/qtl2 analysis."""
params = {
"input_file": input_file,
"directory": workspace_dir,
"output_file": output_file,
"nperm": data.get("nperm", 0),
"threshold": data.get("threshold", 1),
"cores": config.get('MULTIPROCESSOR_PROCS', 1)
}
rscript_path = config.get("RSCRIPT", "Rscript")
return f"{rscript_path} { rqtl_path } " + " ".join(
[f"--{key} {val}" for key, val in params.items()])


def create_file(file_path):
    """Utility function to create an empty file at the given file_path."""
    try:
        with open(file_path, "x", encoding="utf-8"):
            return True, f"File created at {file_path}"
    except FileExistsError:
        return False, "File already exists"


def prepare_files(tmpdir):
    """Prepare the workspace directory and the files needed for computation."""
    workspace_dir = os.path.join(tmpdir, str(uuid.uuid4()))
    Path(workspace_dir).mkdir(parents=False, exist_ok=True)
    input_file = os.path.join(workspace_dir, f"rqtl2-input-{uuid.uuid4()}.json")
    output_file = os.path.join(workspace_dir, f"rqtl2-output-{uuid.uuid4()}.json")
    # Create the log file outside workspace_dir so the streaming API still
    # has access to it after the computation ends.
    log_file = os.path.join(tmpdir, f"rqtl2-log-{uuid.uuid4()}")
for file_path in [input_file, output_file, log_file]:
create_file(file_path)
return workspace_dir, input_file, output_file, log_file
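# The resulting layout (UUIDs abbreviated, tmpdir hypothetical):
#
#   <tmpdir>/<uuid>/rqtl2-input-<uuid>.json    input_file, inside workspace_dir
#   <tmpdir>/<uuid>/rqtl2-output-<uuid>.json   output_file, inside workspace_dir
#   <tmpdir>/rqtl2-log-<uuid>                  log_file, outside workspace_dir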


def write_input_file(input_file, workspace_dir, data):
    """Write the input data, together with the paths of the generated CSV
    files, to a JSON file that is passed as input to the rqtl2 script.
    """
    with open(input_file, "w", encoding="utf-8") as file_handler:
        rqtl2_files = generate_rqtl2_files(data, workspace_dir)
        json.dump(rqtl2_files, file_handler)
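

# A minimal end-to-end sketch of how these helpers fit together; the wrapper
# script name "rqtl2_wrapper.R" and the demo data are hypothetical:
if __name__ == "__main__":
    import tempfile

    demo_data = {"crosstype": "risib",
                 "geno_data": [{"marker": "rs1", "BXD1": "B"}]}
    ok, error = validate_required_keys(["crosstype", "geno_data"], demo_data)
    if not ok:
        raise SystemExit(error)
    with tempfile.TemporaryDirectory() as tmpdir:
        workspace_dir, input_file, output_file, _log_file = prepare_files(tmpdir)
        write_input_file(input_file, workspace_dir, demo_data)
        print(compose_rqtl2_cmd("rqtl2_wrapper.R", input_file, output_file,
                                workspace_dir, demo_data, {}))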