1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
"""Module contains streaming procedures for genenetwork. """
import os
import subprocess
from functools import wraps
from flask import current_app, request
def read_file(file_path):
"""Add utility function to read files"""
with open(file_path, "r", encoding="UTF-8") as file_handler:
return file_handler.read()
def run_process(cmd, log_file, run_id):
"""Function to execute an external process and
capture the stdout in a file
input:
cmd: the command to execute as a list of args.
log_file: abs file path to write the stdout.
run_id: unique id to identify the process
output:
Dict with the results for either success or failure.
"""
try:
# phase: execute the rscript cmd
with subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
) as process:
for line in iter(process.stdout.readline, b""):
# phase: capture the stdout for each line allowing read and write
with open(log_file, "a+", encoding="utf-8") as file_handler:
file_handler.write(line.decode("utf-8"))
process.wait()
return {"msg": "success" if process.returncode == 0 else "Process failed",
"run_id": run_id,
"log" : read_file(log_file),
"code": process.returncode}
except subprocess.CalledProcessError as error:
return {"msg": "error occurred",
"code": error.returncode,
"error": str(error),
"run_id": run_id,
"log" : read_file(log_file)}
def enable_streaming(func):
"""Decorator function to enable streaming for an endpoint
Note: should only be used in an app context
"""
@wraps(func)
def decorated_function(*args, **kwargs):
run_id = request.args.get("id")
stream_output_file = os.path.join(current_app.config.get("TMPDIR"),
f"{run_id}.txt")
with open(stream_output_file, "w+", encoding="utf-8",
) as file_handler:
file_handler.write("File created for streaming\n"
)
return func(stream_output_file, *args, **kwargs)
return decorated_function
|