about summary refs log tree commit diff
path: root/gn3/api/rqtl.py
diff options
context:
space:
mode:
authorAlexander Kabui2025-01-11 08:33:51 +0300
committerGitHub2025-01-11 08:33:51 +0300
commite501df4458080cd4a39f3124c7e13dacd9d9f28a (patch)
tree6556b96aece75fddf563753b5e089a6608fbd88c /gn3/api/rqtl.py
parent677d68c4b56585b29227efb9821ca674951411ca (diff)
parent200deff652bfae364d6e15ff2bceefdc1686f158 (diff)
downloadgenenetwork3-e501df4458080cd4a39f3124c7e13dacd9d9f28a.tar.gz
Merge pull request #206 from genenetwork/enhancements/streaming-modularization
Enhancements/streaming modularization
Diffstat (limited to 'gn3/api/rqtl.py')
-rw-r--r--gn3/api/rqtl.py53
1 files changed, 4 insertions, 49 deletions
diff --git a/gn3/api/rqtl.py b/gn3/api/rqtl.py
index 81bc5e4..e9ea1b9 100644
--- a/gn3/api/rqtl.py
+++ b/gn3/api/rqtl.py
@@ -2,7 +2,6 @@
 
 import os
 import uuid
-import subprocess
 from pathlib import Path
 
 from flask import Blueprint
@@ -16,13 +15,15 @@ from gn3.computations.rqtl import (
     process_rqtl_pairscan,
     process_perm_output,
 )
+from gn3.computations.streaming import run_process, enable_streaming
 from gn3.fs_helpers import assert_path_exists, get_tmpdir
 
 rqtl = Blueprint("rqtl", __name__)
 
 
 @rqtl.route("/compute", methods=["POST"])
-def compute():
+@enable_streaming
+def compute(stream_output_file):
     """Given at least a geno_file and pheno_file, generate and
     run the rqtl_wrapper script and return the results as JSON
 
@@ -31,17 +32,6 @@ def compute():
     phenofile = request.form["pheno_file"]
     assert_path_exists(genofile)
     assert_path_exists(phenofile)
-
-    run_id = request.args.get("id")
-    with open(
-        os.path.join(current_app.config.get("TMPDIR"), f"{run_id}.txt"),
-        "w+",
-        encoding="utf-8",
-    ):
-        # TODO thos should  be refactored
-        pass
-    # Split kwargs by those with values and boolean ones
-    # that just convert to True/False
     kwargs = ["covarstruct", "model", "method", "nperm", "scale", "control"]
     boolean_kwargs = ["addcovar", "interval", "pstrata", "pairscan"]
     all_kwargs = kwargs + boolean_kwargs
@@ -75,7 +65,6 @@ def compute():
     )
 
     rqtl_output = {}
-    #  get the stdout file
     run_id = request.args.get("id", str(uuid.uuid4()))
     if not os.path.isfile(
         os.path.join(
@@ -83,9 +72,7 @@ def compute():
         )
     ):
         pass
-    stream_ouput_file = os.path.join(current_app.config.get("TMPDIR"), f"{run_id}.txt")
-
-    run_process(rqtl_cmd.get("rqtl_cmd").split(), stream_ouput_file, run_id)
+    run_process(rqtl_cmd.get("rqtl_cmd").split(), stream_output_file, run_id)
 
     if "pairscan" in rqtl_bool_kwargs:
         rqtl_output["results"] = process_rqtl_pairscan(
@@ -102,35 +89,3 @@ def compute():
             rqtl_output["significant"],
         ) = process_perm_output(rqtl_cmd.get("output_file"))
     return jsonify(rqtl_output)
-
-
-def run_process(cmd, output_file, run_id):
-    """Function to execute an external process and
-       capture the stdout in a file
-      input:
-           cmd: the command to execute as a list of args.
-           output_file: abs file path to write the stdout.
-           run_id: unique id to identify the process
-
-      output:
-          Dict with the results o either success or failure.
-    """
-    try:
-        # phase: execute the  rscript cmd
-        with subprocess.Popen(
-            cmd,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.STDOUT,
-        ) as process:
-            for line in iter(process.stdout.readline, b""):
-                # phase: capture the stdout for eaching line allowing read and write
-                with open(output_file, "a+", encoding="utf-8") as file_handler:
-                    file_handler.write(line.decode("utf-8"))
-            process.wait()
-        if process.returncode == 0:
-            return {"msg": "success", "code": 0, "run_id": run_id}
-        return {"msg": "error occurred", "error": "Process failed",
-                "code": process.returncode, "run_id": run_id}
-    except subprocess.CalledProcessError as error:
-        return {"msg": "error occurred",
-                "error": str(error), "run_id": run_id}