aboutsummaryrefslogtreecommitdiff
path: root/gn3/computations/wgcna.py
blob: d1f7b3226343ad55d71c6631a6b04024a4615661 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
"""module contains code to preprocess and call wgcna script"""
import os
import json
import uuid
import base64
import subprocess
from pathlib import Path


from gn3.settings import TMPDIR
from gn3.commands import run_cmd


def dump_wgcna_data(request_data: dict):
    """Serialise the request payload to a uniquely named JSON file.

    The configured ``TMPDIR`` is stored in ``request_data`` under the
    ``"TMPDIR"`` key before dumping, so the caller's dict is modified in
    place and the value ends up inside the written JSON.

    Returns:
        The path of the JSON file created inside ``TMPDIR``.
    """
    json_path = os.path.join(TMPDIR, f"{uuid.uuid4()}.json")

    # Added before dumping so that the directory is part of the payload.
    request_data["TMPDIR"] = TMPDIR

    with open(json_path, mode="w", encoding="utf-8") as json_handle:
        json.dump(request_data, json_handle)

    return json_path


def stream_cmd_output(socketio, request_data, cmd: str):
    """Run ``cmd`` in a shell and relay its output over socketio in real time.

    Every message is emitted as a ``"gn3"`` event on namespace ``"/"`` to the
    room named by ``request_data["socket_id"]``: first a notice that the
    command is being called, then one message per line of output (stderr is
    merged into stdout), and finally a single "parsing the output results"
    message once the command's output is exhausted.

    Args:
        socketio: object exposing ``emit(event, data, namespace=..., room=...)``.
        request_data: mapping that must contain the ``"socket_id"`` key.
        cmd: command line to execute; it is handed to the shell unchanged.

    Raises:
        KeyError: if ``request_data`` has no ``"socket_id"`` entry.
    """
    # xtodo  syncing and closing /edge cases
    room = request_data["socket_id"]

    def emit(message: str) -> None:
        """Send one text message to the requesting client's room."""
        socketio.emit("gn3", {"data": message}, namespace="/", room=room)

    emit(f"calling you script {cmd}")

    # NOTE: shell=True executes ``cmd`` verbatim, so callers must never build
    # it from untrusted input.
    with subprocess.Popen(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
            shell=True) as results:
        if results.stdout is None:
            return

        # readline() returns b"" at EOF, which ends the iteration.
        for line in iter(results.stdout.readline, b""):
            # errors="replace": stray non-UTF-8 bytes in the command output
            # must not abort the stream half-way through.
            emit(line.decode("utf-8", errors="replace").rstrip())

        # Sent once, after the whole output has been relayed (previously this
        # was wrongly emitted after every single line).
        emit("parsing the output results")


def process_image(image_loc: str) -> bytes:
    """Return the base64 encoding of the file stored at ``image_loc``.

    A missing file is not treated as an error: empty bytes are returned
    instead.
    """
    try:
        with open(image_loc, "rb") as handle:
            raw_bytes = handle.read()
    except FileNotFoundError:
        return b""

    return base64.b64encode(raw_bytes)


def compose_wgcna_cmd(rscript_path: str, temp_file_path: str):
    """Build the ``Rscript`` command line for a WGCNA script.

    ``rscript_path`` is resolved against the ``scripts`` directory located
    three levels above this module's file; ``temp_file_path`` is appended
    unchanged as the script's only argument.
    """
    # (todo):issue relative paths to abs paths
    base_dir = Path(__file__).absolute().parent.parent.parent
    script_location = base_dir / f"scripts/{rscript_path}"
    return f"Rscript {script_location} {temp_file_path}"


def call_wgcna_script(rscript_path: str, request_data: dict):
    """Dump the request, run the WGCNA R script and collect its results.

    The request is written to a JSON file, the script is run on that file
    through ``run_cmd`` and the same file is then read back (presumably the
    script writes its results into it -- confirm against the R script).

    Returns:
        * the ``run_cmd`` result unchanged when its ``"code"`` is not ``0``;
        * otherwise a dict holding the parsed JSON under ``"data"`` (with the
          base64 image text added at ``["output"]["image_data"]``) merged
          with the ``run_cmd`` result;
        * ``{"output": "output file not found"}`` when a
          ``FileNotFoundError`` is raised along the way.
    """
    json_path = dump_wgcna_data(request_data)
    wgcna_cmd = compose_wgcna_cmd(rscript_path, json_path)

    # Output is not streamed to the client; the command runs to completion.
    try:
        cmd_results = run_cmd(wgcna_cmd)

        with open(json_path, "r", encoding="utf-8") as results_file:
            if cmd_results["code"] == 0:
                parsed = json.load(results_file)
                output_section = parsed["output"]
                encoded_image = process_image(output_section["imageLoc"])
                # JSON can only carry text, so the base64 bytes are turned
                # into an ASCII string; consumers decode it to get the image.
                output_section["image_data"] = encoded_image.decode("ascii")

                response = {"data": parsed}
                response.update(cmd_results)
                return response

            return cmd_results
    except FileNotFoundError:
        # relook  at handling errors gn3
        return {"output": "output file not found"}