From 3ecebbc7f73bffde220735a43784bd9f50b7ca1d Mon Sep 17 00:00:00 2001 From: Alexander_Kabui Date: Thu, 9 Jan 2025 10:08:13 +0300 Subject: feat: Init add streaming module for genenetwork3. --- gn3/computations/streaming.py | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 gn3/computations/streaming.py diff --git a/gn3/computations/streaming.py b/gn3/computations/streaming.py new file mode 100644 index 0000000..7155adb --- /dev/null +++ b/gn3/computations/streaming.py @@ -0,0 +1,2 @@ +""" Module contains streaming functionality for genenetwork +""" -- cgit 1.4.1 From 533bbad4cfc1101d931964321dffba4ee62d096e Mon Sep 17 00:00:00 2001 From: Alexander_Kabui Date: Thu, 9 Jan 2025 10:12:02 +0300 Subject: feat: Add function to run an external process and capture result in a file. --- gn3/computations/streaming.py | 36 ++++++++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/gn3/computations/streaming.py b/gn3/computations/streaming.py index 7155adb..d39aa7f 100644 --- a/gn3/computations/streaming.py +++ b/gn3/computations/streaming.py @@ -1,2 +1,34 @@ -""" Module contains streaming functionality for genenetwork -""" +"""Module contains streaming procedures for genenetwork. """ +import subprocess + + +def run_process(cmd, output_file, run_id): + """Function to execute an external process and + capture the stdout in a file + input: + cmd: the command to execute as a list of args. + output_file: abs file path to write the stdout. + run_id: unique id to identify the process + + output: + Dict with the results o either success or failure. + """ + try: + # phase: execute the rscript cmd + with subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) as process: + for line in iter(process.stdout.readline, b""): + # phase: capture the stdout for eaching line allowing read and write + with open(output_file, "a+", encoding="utf-8") as file_handler: + file_handler.write(line.decode("utf-8")) + process.wait() + if process.returncode == 0: + return {"msg": "success", "code": 0, "run_id": run_id} + return {"msg": "error occurred", "error": "Process failed", + "code": process.returncode, "run_id": run_id} + except subprocess.CalledProcessError as error: + return {"msg": "error occurred", + "error": str(error), "run_id": run_id} -- cgit 1.4.1 From 80dd33edf518e9b7bf349edc58b3aa976713c62c Mon Sep 17 00:00:00 2001 From: Alexander_Kabui Date: Thu, 9 Jan 2025 10:14:35 +0300 Subject: refactor: import streaming functionality from module. --- gn3/api/rqtl.py | 33 +-------------------------------- 1 file changed, 1 insertion(+), 32 deletions(-) diff --git a/gn3/api/rqtl.py b/gn3/api/rqtl.py index 81bc5e4..f700afd 100644 --- a/gn3/api/rqtl.py +++ b/gn3/api/rqtl.py @@ -16,6 +16,7 @@ from gn3.computations.rqtl import ( process_rqtl_pairscan, process_perm_output, ) +from gn3.computations.streaming import run_process from gn3.fs_helpers import assert_path_exists, get_tmpdir rqtl = Blueprint("rqtl", __name__) @@ -102,35 +103,3 @@ def compute(): rqtl_output["significant"], ) = process_perm_output(rqtl_cmd.get("output_file")) return jsonify(rqtl_output) - - -def run_process(cmd, output_file, run_id): - """Function to execute an external process and - capture the stdout in a file - input: - cmd: the command to execute as a list of args. - output_file: abs file path to write the stdout. - run_id: unique id to identify the process - - output: - Dict with the results o either success or failure. - """ - try: - # phase: execute the rscript cmd - with subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - ) as process: - for line in iter(process.stdout.readline, b""): - # phase: capture the stdout for eaching line allowing read and write - with open(output_file, "a+", encoding="utf-8") as file_handler: - file_handler.write(line.decode("utf-8")) - process.wait() - if process.returncode == 0: - return {"msg": "success", "code": 0, "run_id": run_id} - return {"msg": "error occurred", "error": "Process failed", - "code": process.returncode, "run_id": run_id} - except subprocess.CalledProcessError as error: - return {"msg": "error occurred", - "error": str(error), "run_id": run_id} -- cgit 1.4.1 From 5515c4de5f0398ba7d4c60d1386963d018a91aa2 Mon Sep 17 00:00:00 2001 From: Alexander_Kabui Date: Thu, 9 Jan 2025 10:19:54 +0300 Subject: feat: Init add module for streaming api endpoins. --- gn3/api/streaming.py | 1 + 1 file changed, 1 insertion(+) create mode 100644 gn3/api/streaming.py diff --git a/gn3/api/streaming.py b/gn3/api/streaming.py new file mode 100644 index 0000000..7c9254a --- /dev/null +++ b/gn3/api/streaming.py @@ -0,0 +1 @@ +""" File contains endpoint for computational streaming""" -- cgit 1.4.1 From 8bfe9616012deab8f910e89b233ea819ec22c1c3 Mon Sep 17 00:00:00 2001 From: Alexander_Kabui Date: Thu, 9 Jan 2025 10:22:50 +0300 Subject: feat: Add streaming main endpoint. --- gn3/api/streaming.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/gn3/api/streaming.py b/gn3/api/streaming.py index 7c9254a..6569ceb 100644 --- a/gn3/api/streaming.py +++ b/gn3/api/streaming.py @@ -1 +1,24 @@ """ File contains endpoint for computational streaming""" +import os +from flask import current_app +from flask import jsonify +from flask import Blueprint +from flask import request + +streaming = Blueprint("streaming", __name__) + + +@streaming.route("/stream/", methods=["GET"]) +def stream(identifier="output"): + """ This endpoints streams stdout from a file expects + the indetifier to be the file """ + output_file = os.path.join(current_app.config.get("TMPDIR"), + f"{identifier}.txt") + seek_position = int(request.args.get("peak", 0)) + with open(output_file, encoding="utf-8") as file_handler: + # read to the last position default to 0 + file_handler.seek(seek_position) + results = {"data": file_handler.readlines(), + "run_id": identifier, + "pointer": file_handler.tell()} + return jsonify(results) -- cgit 1.4.1 From 4e63aa2246c2f3ca6c09c284f7f78b896fe0e33b Mon Sep 17 00:00:00 2001 From: Alexander_Kabui Date: Thu, 9 Jan 2025 10:28:26 +0300 Subject: feat: Add and register streaming blueprint. --- gn3/api/streaming.py | 6 +++--- gn3/app.py | 2 ++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/gn3/api/streaming.py b/gn3/api/streaming.py index 6569ceb..213d97f 100644 --- a/gn3/api/streaming.py +++ b/gn3/api/streaming.py @@ -5,11 +5,11 @@ from flask import jsonify from flask import Blueprint from flask import request -streaming = Blueprint("streaming", __name__) +streaming = Blueprint("stream", __name__) -@streaming.route("/stream/", methods=["GET"]) -def stream(identifier="output"): +@streaming.route("/", methods=["GET"]) +def stream(identifier): """ This endpoints streams stdout from a file expects the indetifier to be the file """ output_file = os.path.join(current_app.config.get("TMPDIR"), diff --git a/gn3/app.py b/gn3/app.py index 0bec32a..3841396 100644 --- a/gn3/app.py +++ b/gn3/app.py @@ -28,6 +28,7 @@ from gn3.api.metadata import metadata from gn3.api.sampledata import sampledata from gn3.api.llm import gnqa from gn3.api.rqtl2 import rqtl2 +from gn3.api.streaming import streaming from gn3.case_attributes import caseattr @@ -109,6 +110,7 @@ def create_app(config: Union[Dict, str, None] = None) -> Flask: app.register_blueprint(caseattr, url_prefix="/api/case-attribute") app.register_blueprint(gnqa, url_prefix="/api/llm") app.register_blueprint(rqtl2, url_prefix="/api/rqtl2") + app.register_blueprint(streaming, url_prefix="/api/stream") register_error_handlers(app) return app -- cgit 1.4.1 From f73605a67d3521e9a5cea86bdd5a947cf574f061 Mon Sep 17 00:00:00 2001 From: Alexander_Kabui Date: Thu, 9 Jan 2025 10:43:21 +0300 Subject: refactor: enhance docstring for streaming endpoint. --- gn3/api/streaming.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/gn3/api/streaming.py b/gn3/api/streaming.py index 213d97f..355d903 100644 --- a/gn3/api/streaming.py +++ b/gn3/api/streaming.py @@ -10,13 +10,15 @@ streaming = Blueprint("stream", __name__) @streaming.route("/", methods=["GET"]) def stream(identifier): - """ This endpoints streams stdout from a file expects - the indetifier to be the file """ + """ This endpoint streams stdout from a file. + It expects the indetifier to be the filename + in the TMPDIR created at the main computation + endpoint see example api/rqtl.""" output_file = os.path.join(current_app.config.get("TMPDIR"), f"{identifier}.txt") seek_position = int(request.args.get("peak", 0)) with open(output_file, encoding="utf-8") as file_handler: - # read to the last position default to 0 + # read from the last read position default to 0 file_handler.seek(seek_position) results = {"data": file_handler.readlines(), "run_id": identifier, -- cgit 1.4.1 From ba21ca6f6d237f3e2d30a65d44300688e521d10f Mon Sep 17 00:00:00 2001 From: Alexander_Kabui Date: Thu, 9 Jan 2025 10:44:32 +0300 Subject: refactor: remove obsolete streaming endpoint. --- gn3/api/rqtl2.py | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/gn3/api/rqtl2.py b/gn3/api/rqtl2.py index a6ac411..e26b9ed 100644 --- a/gn3/api/rqtl2.py +++ b/gn3/api/rqtl2.py @@ -45,19 +45,3 @@ def compute(): return jsonify({"msg": "fail", "error": "Process failed", "run_id": run_id}) - - -@rqtl2.route("/stream/", methods=["GET"]) -def stream(identifier="output"): - """ This endpoints streams stdout from a file expects - the indetifier to be the file """ - output_file = os.path.join(current_app.config.get("TMPDIR"), - f"{identifier}.txt") - seek_position = int(request.args.get("peak", 0)) - with open(output_file, encoding="utf-8") as file_handler: - # read to the last position default to 0 - file_handler.seek(seek_position) - results = {"data": file_handler.readlines(), - "run_id": identifier, - "pointer": file_handler.tell()} - return jsonify(results) -- cgit 1.4.1 From b5309d1a47eacf8d44d7a42c85bd64282bfaa27b Mon Sep 17 00:00:00 2001 From: Alexander_Kabui Date: Thu, 9 Jan 2025 10:45:46 +0300 Subject: fix: pylint fix. --- gn3/api/rqtl.py | 1 - 1 file changed, 1 deletion(-) diff --git a/gn3/api/rqtl.py b/gn3/api/rqtl.py index f700afd..9fd7017 100644 --- a/gn3/api/rqtl.py +++ b/gn3/api/rqtl.py @@ -2,7 +2,6 @@ import os import uuid -import subprocess from pathlib import Path from flask import Blueprint -- cgit 1.4.1 From 396b345ab3c3bf15409b4cb9c37f5496a55c82f9 Mon Sep 17 00:00:00 2001 From: Alexander_Kabui Date: Thu, 9 Jan 2025 11:33:31 +0300 Subject: feat: Add a decorator function to enable streaming functinality. --- gn3/api/rqtl.py | 19 +++---------------- gn3/computations/streaming.py | 22 ++++++++++++++++++++++ 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/gn3/api/rqtl.py b/gn3/api/rqtl.py index 9fd7017..7b29cef 100644 --- a/gn3/api/rqtl.py +++ b/gn3/api/rqtl.py @@ -15,14 +15,15 @@ from gn3.computations.rqtl import ( process_rqtl_pairscan, process_perm_output, ) -from gn3.computations.streaming import run_process +from gn3.computations.streaming import run_process, enable_streaming from gn3.fs_helpers import assert_path_exists, get_tmpdir rqtl = Blueprint("rqtl", __name__) @rqtl.route("/compute", methods=["POST"]) -def compute(): +@enable_streaming +def compute(stream_ouput_file): """Given at least a geno_file and pheno_file, generate and run the rqtl_wrapper script and return the results as JSON @@ -31,17 +32,6 @@ def compute(): phenofile = request.form["pheno_file"] assert_path_exists(genofile) assert_path_exists(phenofile) - - run_id = request.args.get("id") - with open( - os.path.join(current_app.config.get("TMPDIR"), f"{run_id}.txt"), - "w+", - encoding="utf-8", - ): - # TODO thos should be refactored - pass - # Split kwargs by those with values and boolean ones - # that just convert to True/False kwargs = ["covarstruct", "model", "method", "nperm", "scale", "control"] boolean_kwargs = ["addcovar", "interval", "pstrata", "pairscan"] all_kwargs = kwargs + boolean_kwargs @@ -75,7 +65,6 @@ def compute(): ) rqtl_output = {} - # get the stdout file run_id = request.args.get("id", str(uuid.uuid4())) if not os.path.isfile( os.path.join( @@ -83,8 +72,6 @@ def compute(): ) ): pass - stream_ouput_file = os.path.join(current_app.config.get("TMPDIR"), f"{run_id}.txt") - run_process(rqtl_cmd.get("rqtl_cmd").split(), stream_ouput_file, run_id) if "pairscan" in rqtl_bool_kwargs: diff --git a/gn3/computations/streaming.py b/gn3/computations/streaming.py index d39aa7f..771076c 100644 --- a/gn3/computations/streaming.py +++ b/gn3/computations/streaming.py @@ -1,5 +1,8 @@ """Module contains streaming procedures for genenetwork. """ +import os import subprocess +from functools import wraps +from flask import current_app, has_app_context, request def run_process(cmd, output_file, run_id): @@ -32,3 +35,22 @@ def run_process(cmd, output_file, run_id): except subprocess.CalledProcessError as error: return {"msg": "error occurred", "error": str(error), "run_id": run_id} + + +def enable_streaming(func): + """Decorator function to enable streaming for an endpoint + Note: should be used only in an app context + """ + @wraps(func) + def decorated_function(*args, **kwargs): + if not has_app_context: + raise RuntimeError("This decorator must be used within an app context.") + run_id = request.args.get("id") + stream_ouput_file = os.path.join(current_app.config.get("TMPDIR"), + f"{run_id}.txt") + with open(stream_ouput_file, "w+", encoding="utf-8", + ) as file_handler: + file_handler.write("File created for streaming\n" + ) + return func(stream_ouput_file, *args, **kwargs) + return decorated_function -- cgit 1.4.1 From 22c6290db6bcdd93edc9b4fd40ac204d88afcc7a Mon Sep 17 00:00:00 2001 From: Alexander_Kabui Date: Thu, 9 Jan 2025 11:49:48 +0300 Subject: refactor: raise and Handle error a app level. --- gn3/computations/streaming.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/gn3/computations/streaming.py b/gn3/computations/streaming.py index 771076c..1cebde3 100644 --- a/gn3/computations/streaming.py +++ b/gn3/computations/streaming.py @@ -2,7 +2,7 @@ import os import subprocess from functools import wraps -from flask import current_app, has_app_context, request +from flask import current_app, request def run_process(cmd, output_file, run_id): @@ -43,8 +43,6 @@ def enable_streaming(func): """ @wraps(func) def decorated_function(*args, **kwargs): - if not has_app_context: - raise RuntimeError("This decorator must be used within an app context.") run_id = request.args.get("id") stream_ouput_file = os.path.join(current_app.config.get("TMPDIR"), f"{run_id}.txt") -- cgit 1.4.1 From 7be70a40f885f23bec6a83b5c8abb17c5dd6e650 Mon Sep 17 00:00:00 2001 From: Alexander_Kabui Date: Thu, 9 Jan 2025 11:52:43 +0300 Subject: Minor fix. --- gn3/api/rqtl.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gn3/api/rqtl.py b/gn3/api/rqtl.py index 7b29cef..e9ea1b9 100644 --- a/gn3/api/rqtl.py +++ b/gn3/api/rqtl.py @@ -23,7 +23,7 @@ rqtl = Blueprint("rqtl", __name__) @rqtl.route("/compute", methods=["POST"]) @enable_streaming -def compute(stream_ouput_file): +def compute(stream_output_file): """Given at least a geno_file and pheno_file, generate and run the rqtl_wrapper script and return the results as JSON @@ -72,7 +72,7 @@ def compute(stream_ouput_file): ) ): pass - run_process(rqtl_cmd.get("rqtl_cmd").split(), stream_ouput_file, run_id) + run_process(rqtl_cmd.get("rqtl_cmd").split(), stream_output_file, run_id) if "pairscan" in rqtl_bool_kwargs: rqtl_output["results"] = process_rqtl_pairscan( -- cgit 1.4.1 From ec0b1563cce151ef8de5f92f61342924d87d63b0 Mon Sep 17 00:00:00 2001 From: Alexander_Kabui Date: Thu, 9 Jan 2025 12:04:54 +0300 Subject: Minor typo fix. --- gn3/api/streaming.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gn3/api/streaming.py b/gn3/api/streaming.py index 355d903..2b6b431 100644 --- a/gn3/api/streaming.py +++ b/gn3/api/streaming.py @@ -11,7 +11,7 @@ streaming = Blueprint("stream", __name__) @streaming.route("/", methods=["GET"]) def stream(identifier): """ This endpoint streams stdout from a file. - It expects the indetifier to be the filename + It expects the identifier to be the filename in the TMPDIR created at the main computation endpoint see example api/rqtl.""" output_file = os.path.join(current_app.config.get("TMPDIR"), -- cgit 1.4.1 From 200deff652bfae364d6e15ff2bceefdc1686f158 Mon Sep 17 00:00:00 2001 From: Alexander_Kabui Date: Thu, 9 Jan 2025 12:06:46 +0300 Subject: Typo fixes. --- gn3/computations/streaming.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/gn3/computations/streaming.py b/gn3/computations/streaming.py index 1cebde3..b5afb5b 100644 --- a/gn3/computations/streaming.py +++ b/gn3/computations/streaming.py @@ -14,7 +14,7 @@ def run_process(cmd, output_file, run_id): run_id: unique id to identify the process output: - Dict with the results o either success or failure. + Dict with the results for either success or failure. """ try: # phase: execute the rscript cmd @@ -24,7 +24,7 @@ def run_process(cmd, output_file, run_id): stderr=subprocess.STDOUT, ) as process: for line in iter(process.stdout.readline, b""): - # phase: capture the stdout for eaching line allowing read and write + # phase: capture the stdout for each line allowing read and write with open(output_file, "a+", encoding="utf-8") as file_handler: file_handler.write(line.decode("utf-8")) process.wait() @@ -39,16 +39,16 @@ def run_process(cmd, output_file, run_id): def enable_streaming(func): """Decorator function to enable streaming for an endpoint - Note: should be used only in an app context + Note: should only be used in an app context """ @wraps(func) def decorated_function(*args, **kwargs): run_id = request.args.get("id") - stream_ouput_file = os.path.join(current_app.config.get("TMPDIR"), - f"{run_id}.txt") - with open(stream_ouput_file, "w+", encoding="utf-8", + stream_output_file = os.path.join(current_app.config.get("TMPDIR"), + f"{run_id}.txt") + with open(stream_output_file, "w+", encoding="utf-8", ) as file_handler: file_handler.write("File created for streaming\n" ) - return func(stream_ouput_file, *args, **kwargs) + return func(stream_output_file, *args, **kwargs) return decorated_function -- cgit 1.4.1