diff options
Diffstat (limited to 'gn3')
| -rw-r--r-- | gn3/api/rqtl.py | 53 | ||||
| -rw-r--r-- | gn3/api/rqtl2.py | 16 | ||||
| -rw-r--r-- | gn3/api/streaming.py | 26 | ||||
| -rw-r--r-- | gn3/app.py | 2 | ||||
| -rw-r--r-- | gn3/computations/streaming.py | 54 |
5 files changed, 86 insertions, 65 deletions
diff --git a/gn3/api/rqtl.py b/gn3/api/rqtl.py index 81bc5e4..e9ea1b9 100644 --- a/gn3/api/rqtl.py +++ b/gn3/api/rqtl.py @@ -2,7 +2,6 @@ import os import uuid -import subprocess from pathlib import Path from flask import Blueprint @@ -16,13 +15,15 @@ from gn3.computations.rqtl import ( process_rqtl_pairscan, process_perm_output, ) +from gn3.computations.streaming import run_process, enable_streaming from gn3.fs_helpers import assert_path_exists, get_tmpdir rqtl = Blueprint("rqtl", __name__) @rqtl.route("/compute", methods=["POST"]) -def compute(): +@enable_streaming +def compute(stream_output_file): """Given at least a geno_file and pheno_file, generate and run the rqtl_wrapper script and return the results as JSON @@ -31,17 +32,6 @@ def compute(): phenofile = request.form["pheno_file"] assert_path_exists(genofile) assert_path_exists(phenofile) - - run_id = request.args.get("id") - with open( - os.path.join(current_app.config.get("TMPDIR"), f"{run_id}.txt"), - "w+", - encoding="utf-8", - ): - # TODO thos should be refactored - pass - # Split kwargs by those with values and boolean ones - # that just convert to True/False kwargs = ["covarstruct", "model", "method", "nperm", "scale", "control"] boolean_kwargs = ["addcovar", "interval", "pstrata", "pairscan"] all_kwargs = kwargs + boolean_kwargs @@ -75,7 +65,6 @@ def compute(): ) rqtl_output = {} - # get the stdout file run_id = request.args.get("id", str(uuid.uuid4())) if not os.path.isfile( os.path.join( @@ -83,9 +72,7 @@ def compute(): ) ): pass - stream_ouput_file = os.path.join(current_app.config.get("TMPDIR"), f"{run_id}.txt") - - run_process(rqtl_cmd.get("rqtl_cmd").split(), stream_ouput_file, run_id) + run_process(rqtl_cmd.get("rqtl_cmd").split(), stream_output_file, run_id) if "pairscan" in rqtl_bool_kwargs: rqtl_output["results"] = process_rqtl_pairscan( @@ -102,35 +89,3 @@ def compute(): rqtl_output["significant"], ) = process_perm_output(rqtl_cmd.get("output_file")) return jsonify(rqtl_output) - - -def run_process(cmd, output_file, run_id): - """Function to execute an external process and - capture the stdout in a file - input: - cmd: the command to execute as a list of args. - output_file: abs file path to write the stdout. - run_id: unique id to identify the process - - output: - Dict with the results o either success or failure. - """ - try: - # phase: execute the rscript cmd - with subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - ) as process: - for line in iter(process.stdout.readline, b""): - # phase: capture the stdout for eaching line allowing read and write - with open(output_file, "a+", encoding="utf-8") as file_handler: - file_handler.write(line.decode("utf-8")) - process.wait() - if process.returncode == 0: - return {"msg": "success", "code": 0, "run_id": run_id} - return {"msg": "error occurred", "error": "Process failed", - "code": process.returncode, "run_id": run_id} - except subprocess.CalledProcessError as error: - return {"msg": "error occurred", - "error": str(error), "run_id": run_id} diff --git a/gn3/api/rqtl2.py b/gn3/api/rqtl2.py index a6ac411..e26b9ed 100644 --- a/gn3/api/rqtl2.py +++ b/gn3/api/rqtl2.py @@ -45,19 +45,3 @@ def compute(): return jsonify({"msg": "fail", "error": "Process failed", "run_id": run_id}) - - -@rqtl2.route("/stream/<identifier>", methods=["GET"]) -def stream(identifier="output"): - """ This endpoints streams stdout from a file expects - the indetifier to be the file """ - output_file = os.path.join(current_app.config.get("TMPDIR"), - f"{identifier}.txt") - seek_position = int(request.args.get("peak", 0)) - with open(output_file, encoding="utf-8") as file_handler: - # read to the last position default to 0 - file_handler.seek(seek_position) - results = {"data": file_handler.readlines(), - "run_id": identifier, - "pointer": file_handler.tell()} - return jsonify(results) diff --git a/gn3/api/streaming.py b/gn3/api/streaming.py new file mode 100644 index 0000000..2b6b431 --- /dev/null +++ b/gn3/api/streaming.py @@ -0,0 +1,26 @@ +""" File contains endpoint for computational streaming""" +import os +from flask import current_app +from flask import jsonify +from flask import Blueprint +from flask import request + +streaming = Blueprint("stream", __name__) + + +@streaming.route("/<identifier>", methods=["GET"]) +def stream(identifier): + """ This endpoint streams stdout from a file. + It expects the identifier to be the filename + in the TMPDIR created at the main computation + endpoint see example api/rqtl.""" + output_file = os.path.join(current_app.config.get("TMPDIR"), + f"{identifier}.txt") + seek_position = int(request.args.get("peak", 0)) + with open(output_file, encoding="utf-8") as file_handler: + # read from the last read position default to 0 + file_handler.seek(seek_position) + results = {"data": file_handler.readlines(), + "run_id": identifier, + "pointer": file_handler.tell()} + return jsonify(results) diff --git a/gn3/app.py b/gn3/app.py index 0bec32a..3841396 100644 --- a/gn3/app.py +++ b/gn3/app.py @@ -28,6 +28,7 @@ from gn3.api.metadata import metadata from gn3.api.sampledata import sampledata from gn3.api.llm import gnqa from gn3.api.rqtl2 import rqtl2 +from gn3.api.streaming import streaming from gn3.case_attributes import caseattr @@ -109,6 +110,7 @@ def create_app(config: Union[Dict, str, None] = None) -> Flask: app.register_blueprint(caseattr, url_prefix="/api/case-attribute") app.register_blueprint(gnqa, url_prefix="/api/llm") app.register_blueprint(rqtl2, url_prefix="/api/rqtl2") + app.register_blueprint(streaming, url_prefix="/api/stream") register_error_handlers(app) return app diff --git a/gn3/computations/streaming.py b/gn3/computations/streaming.py new file mode 100644 index 0000000..b5afb5b --- /dev/null +++ b/gn3/computations/streaming.py @@ -0,0 +1,54 @@ +"""Module contains streaming procedures for genenetwork. """ +import os +import subprocess +from functools import wraps +from flask import current_app, request + + +def run_process(cmd, output_file, run_id): + """Function to execute an external process and + capture the stdout in a file + input: + cmd: the command to execute as a list of args. + output_file: abs file path to write the stdout. + run_id: unique id to identify the process + + output: + Dict with the results for either success or failure. + """ + try: + # phase: execute the rscript cmd + with subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) as process: + for line in iter(process.stdout.readline, b""): + # phase: capture the stdout for each line allowing read and write + with open(output_file, "a+", encoding="utf-8") as file_handler: + file_handler.write(line.decode("utf-8")) + process.wait() + if process.returncode == 0: + return {"msg": "success", "code": 0, "run_id": run_id} + return {"msg": "error occurred", "error": "Process failed", + "code": process.returncode, "run_id": run_id} + except subprocess.CalledProcessError as error: + return {"msg": "error occurred", + "error": str(error), "run_id": run_id} + + +def enable_streaming(func): + """Decorator function to enable streaming for an endpoint + Note: should only be used in an app context + """ + @wraps(func) + def decorated_function(*args, **kwargs): + run_id = request.args.get("id") + stream_output_file = os.path.join(current_app.config.get("TMPDIR"), + f"{run_id}.txt") + with open(stream_output_file, "w+", encoding="utf-8", + ) as file_handler: + file_handler.write("File created for streaming\n" + ) + return func(stream_output_file, *args, **kwargs) + return decorated_function |
