about summary refs log tree commit diff
path: root/gn3/api
diff options
context:
space:
mode:
author Alexander Kabui 2025-01-11 08:33:51 +0300
committer GitHub 2025-01-11 08:33:51 +0300
commit e501df4458080cd4a39f3124c7e13dacd9d9f28a (patch)
tree 6556b96aece75fddf563753b5e089a6608fbd88c /gn3/api
parent 677d68c4b56585b29227efb9821ca674951411ca (diff)
parent 200deff652bfae364d6e15ff2bceefdc1686f158 (diff)
download genenetwork3-e501df4458080cd4a39f3124c7e13dacd9d9f28a.tar.gz
Merge pull request #206 from genenetwork/enhancements/streaming-modularization
Enhancements/streaming modularization
Diffstat (limited to 'gn3/api')
-rw-r--r-- gn3/api/rqtl.py 53
-rw-r--r-- gn3/api/rqtl2.py 16
-rw-r--r-- gn3/api/streaming.py 26
3 files changed, 30 insertions, 65 deletions
diff --git a/gn3/api/rqtl.py b/gn3/api/rqtl.py
index 81bc5e4..e9ea1b9 100644
--- a/gn3/api/rqtl.py
+++ b/gn3/api/rqtl.py
@@ -2,7 +2,6 @@
 
 import os
 import uuid
-import subprocess
 from pathlib import Path
 
 from flask import Blueprint
@@ -16,13 +15,15 @@ from gn3.computations.rqtl import (
     process_rqtl_pairscan,
     process_perm_output,
 )
+from gn3.computations.streaming import run_process, enable_streaming
 from gn3.fs_helpers import assert_path_exists, get_tmpdir
 
 rqtl = Blueprint("rqtl", __name__)
 
 
 @rqtl.route("/compute", methods=["POST"])
-def compute():
+@enable_streaming
+def compute(stream_output_file):
     """Given at least a geno_file and pheno_file, generate and
     run the rqtl_wrapper script and return the results as JSON
 
@@ -31,17 +32,6 @@ def compute():
     phenofile = request.form["pheno_file"]
     assert_path_exists(genofile)
     assert_path_exists(phenofile)
-
-    run_id = request.args.get("id")
-    with open(
-        os.path.join(current_app.config.get("TMPDIR"), f"{run_id}.txt"),
-        "w+",
-        encoding="utf-8",
-    ):
-        # TODO thos should  be refactored
-        pass
-    # Split kwargs by those with values and boolean ones
-    # that just convert to True/False
     kwargs = ["covarstruct", "model", "method", "nperm", "scale", "control"]
     boolean_kwargs = ["addcovar", "interval", "pstrata", "pairscan"]
     all_kwargs = kwargs + boolean_kwargs
@@ -75,7 +65,6 @@ def compute():
     )
 
     rqtl_output = {}
-    #  get the stdout file
     run_id = request.args.get("id", str(uuid.uuid4()))
     if not os.path.isfile(
         os.path.join(
@@ -83,9 +72,7 @@ def compute():
         )
     ):
         pass
-    stream_ouput_file = os.path.join(current_app.config.get("TMPDIR"), f"{run_id}.txt")
-
-    run_process(rqtl_cmd.get("rqtl_cmd").split(), stream_ouput_file, run_id)
+    run_process(rqtl_cmd.get("rqtl_cmd").split(), stream_output_file, run_id)
 
     if "pairscan" in rqtl_bool_kwargs:
         rqtl_output["results"] = process_rqtl_pairscan(
@@ -102,35 +89,3 @@ def compute():
             rqtl_output["significant"],
         ) = process_perm_output(rqtl_cmd.get("output_file"))
     return jsonify(rqtl_output)
-
-
-def run_process(cmd, output_file, run_id):
-    """Function to execute an external process and
-       capture the stdout in a file
-      input:
-           cmd: the command to execute as a list of args.
-           output_file: abs file path to write the stdout.
-           run_id: unique id to identify the process
-
-      output:
-          Dict with the results o either success or failure.
-    """
-    try:
-        # phase: execute the  rscript cmd
-        with subprocess.Popen(
-            cmd,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.STDOUT,
-        ) as process:
-            for line in iter(process.stdout.readline, b""):
-                # phase: capture the stdout for eaching line allowing read and write
-                with open(output_file, "a+", encoding="utf-8") as file_handler:
-                    file_handler.write(line.decode("utf-8"))
-            process.wait()
-        if process.returncode == 0:
-            return {"msg": "success", "code": 0, "run_id": run_id}
-        return {"msg": "error occurred", "error": "Process failed",
-                "code": process.returncode, "run_id": run_id}
-    except subprocess.CalledProcessError as error:
-        return {"msg": "error occurred",
-                "error": str(error), "run_id": run_id}
diff --git a/gn3/api/rqtl2.py b/gn3/api/rqtl2.py
index a6ac411..e26b9ed 100644
--- a/gn3/api/rqtl2.py
+++ b/gn3/api/rqtl2.py
@@ -45,19 +45,3 @@ def compute():
     return jsonify({"msg": "fail",
                     "error": "Process failed",
                     "run_id": run_id})
-
-
-@rqtl2.route("/stream/<identifier>",  methods=["GET"])
-def stream(identifier="output"):
-    """ This endpoints streams stdout from a file expects
-    the indetifier to be the file """
-    output_file = os.path.join(current_app.config.get("TMPDIR"),
-                               f"{identifier}.txt")
-    seek_position = int(request.args.get("peak", 0))
-    with open(output_file, encoding="utf-8") as file_handler:
-        # read to the last position default to 0
-        file_handler.seek(seek_position)
-        results = {"data": file_handler.readlines(),
-                   "run_id": identifier,
-                   "pointer": file_handler.tell()}
-        return jsonify(results)
diff --git a/gn3/api/streaming.py b/gn3/api/streaming.py
new file mode 100644
index 0000000..2b6b431
--- /dev/null
+++ b/gn3/api/streaming.py
@@ -0,0 +1,26 @@
+""" File contains endpoint for computational streaming"""
+import os
+from flask import current_app
+from flask import jsonify
+from flask import Blueprint
+from flask import request
+
+streaming = Blueprint("stream", __name__)
+
+
+@streaming.route("/<identifier>",  methods=["GET"])
+def stream(identifier):
+    """Return lines of TMPDIR/<identifier>.txt as JSON, starting at the
+    offset given by the `peak` query argument (default 0). The response
+    holds `data` (lines read), `run_id` and `pointer` (offset after the
+    read). Presumably the file is written by a compute endpoint (api/rqtl)."""
+    output_file = os.path.join(current_app.config.get("TMPDIR"),
+                               f"{identifier}.txt")
+    seek_position = int(request.args.get("peak", 0))
+    with open(output_file, encoding="utf-8") as file_handler:
+        # resume at the client's offset (ideally a prior `pointer` value)
+        file_handler.seek(seek_position)
+        results = {"data": file_handler.readlines(),
+                   "run_id": identifier,
+                   "pointer": file_handler.tell()}
+        return jsonify(results)