From 3ecebbc7f73bffde220735a43784bd9f50b7ca1d Mon Sep 17 00:00:00 2001 From: Alexander_Kabui Date: Thu, 9 Jan 2025 10:08:13 +0300 Subject: feat: Init add streaming module for genenetwork3. --- gn3/computations/streaming.py | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 gn3/computations/streaming.py (limited to 'gn3/computations/streaming.py') diff --git a/gn3/computations/streaming.py b/gn3/computations/streaming.py new file mode 100644 index 0000000..7155adb --- /dev/null +++ b/gn3/computations/streaming.py @@ -0,0 +1,2 @@ +""" Module contains streaming functionality for genenetwork +""" -- cgit 1.4.1 From 533bbad4cfc1101d931964321dffba4ee62d096e Mon Sep 17 00:00:00 2001 From: Alexander_Kabui Date: Thu, 9 Jan 2025 10:12:02 +0300 Subject: feat: Add function to run an external process and capture result in a file. --- gn3/computations/streaming.py | 36 ++++++++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) (limited to 'gn3/computations/streaming.py') diff --git a/gn3/computations/streaming.py b/gn3/computations/streaming.py index 7155adb..d39aa7f 100644 --- a/gn3/computations/streaming.py +++ b/gn3/computations/streaming.py @@ -1,2 +1,34 @@ -""" Module contains streaming functionality for genenetwork -""" +"""Module contains streaming procedures for genenetwork. """ +import subprocess + + +def run_process(cmd, output_file, run_id): + """Function to execute an external process and + capture the stdout in a file + input: + cmd: the command to execute as a list of args. + output_file: abs file path to write the stdout. + run_id: unique id to identify the process + + output: + Dict with the results o either success or failure. + """ + try: + # phase: execute the rscript cmd + with subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) as process: + for line in iter(process.stdout.readline, b""): + # phase: capture the stdout for eaching line allowing read and write + with open(output_file, "a+", encoding="utf-8") as file_handler: + file_handler.write(line.decode("utf-8")) + process.wait() + if process.returncode == 0: + return {"msg": "success", "code": 0, "run_id": run_id} + return {"msg": "error occurred", "error": "Process failed", + "code": process.returncode, "run_id": run_id} + except subprocess.CalledProcessError as error: + return {"msg": "error occurred", + "error": str(error), "run_id": run_id} -- cgit 1.4.1 From 396b345ab3c3bf15409b4cb9c37f5496a55c82f9 Mon Sep 17 00:00:00 2001 From: Alexander_Kabui Date: Thu, 9 Jan 2025 11:33:31 +0300 Subject: feat: Add a decorator function to enable streaming functinality. --- gn3/api/rqtl.py | 19 +++---------------- gn3/computations/streaming.py | 22 ++++++++++++++++++++++ 2 files changed, 25 insertions(+), 16 deletions(-) (limited to 'gn3/computations/streaming.py') diff --git a/gn3/api/rqtl.py b/gn3/api/rqtl.py index 9fd7017..7b29cef 100644 --- a/gn3/api/rqtl.py +++ b/gn3/api/rqtl.py @@ -15,14 +15,15 @@ from gn3.computations.rqtl import ( process_rqtl_pairscan, process_perm_output, ) -from gn3.computations.streaming import run_process +from gn3.computations.streaming import run_process, enable_streaming from gn3.fs_helpers import assert_path_exists, get_tmpdir rqtl = Blueprint("rqtl", __name__) @rqtl.route("/compute", methods=["POST"]) -def compute(): +@enable_streaming +def compute(stream_ouput_file): """Given at least a geno_file and pheno_file, generate and run the rqtl_wrapper script and return the results as JSON @@ -31,17 +32,6 @@ def compute(): phenofile = request.form["pheno_file"] assert_path_exists(genofile) assert_path_exists(phenofile) - - run_id = request.args.get("id") - with open( - os.path.join(current_app.config.get("TMPDIR"), f"{run_id}.txt"), - "w+", - encoding="utf-8", - ): - # TODO thos should be refactored - pass - # Split kwargs by those with values and boolean ones - # that just convert to True/False kwargs = ["covarstruct", "model", "method", "nperm", "scale", "control"] boolean_kwargs = ["addcovar", "interval", "pstrata", "pairscan"] all_kwargs = kwargs + boolean_kwargs @@ -75,7 +65,6 @@ def compute(): ) rqtl_output = {} - # get the stdout file run_id = request.args.get("id", str(uuid.uuid4())) if not os.path.isfile( os.path.join( @@ -83,8 +72,6 @@ def compute(): ) ): pass - stream_ouput_file = os.path.join(current_app.config.get("TMPDIR"), f"{run_id}.txt") - run_process(rqtl_cmd.get("rqtl_cmd").split(), stream_ouput_file, run_id) if "pairscan" in rqtl_bool_kwargs: diff --git a/gn3/computations/streaming.py b/gn3/computations/streaming.py index d39aa7f..771076c 100644 --- a/gn3/computations/streaming.py +++ b/gn3/computations/streaming.py @@ -1,5 +1,8 @@ """Module contains streaming procedures for genenetwork. """ +import os import subprocess +from functools import wraps +from flask import current_app, has_app_context, request def run_process(cmd, output_file, run_id): @@ -32,3 +35,22 @@ def run_process(cmd, output_file, run_id): except subprocess.CalledProcessError as error: return {"msg": "error occurred", "error": str(error), "run_id": run_id} + + +def enable_streaming(func): + """Decorator function to enable streaming for an endpoint + Note: should be used only in an app context + """ + @wraps(func) + def decorated_function(*args, **kwargs): + if not has_app_context: + raise RuntimeError("This decorator must be used within an app context.") + run_id = request.args.get("id") + stream_ouput_file = os.path.join(current_app.config.get("TMPDIR"), + f"{run_id}.txt") + with open(stream_ouput_file, "w+", encoding="utf-8", + ) as file_handler: + file_handler.write("File created for streaming\n" + ) + return func(stream_ouput_file, *args, **kwargs) + return decorated_function -- cgit 1.4.1 From 22c6290db6bcdd93edc9b4fd40ac204d88afcc7a Mon Sep 17 00:00:00 2001 From: Alexander_Kabui Date: Thu, 9 Jan 2025 11:49:48 +0300 Subject: refactor: raise and Handle error a app level. --- gn3/computations/streaming.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'gn3/computations/streaming.py') diff --git a/gn3/computations/streaming.py b/gn3/computations/streaming.py index 771076c..1cebde3 100644 --- a/gn3/computations/streaming.py +++ b/gn3/computations/streaming.py @@ -2,7 +2,7 @@ import os import subprocess from functools import wraps -from flask import current_app, has_app_context, request +from flask import current_app, request def run_process(cmd, output_file, run_id): @@ -43,8 +43,6 @@ def enable_streaming(func): """ @wraps(func) def decorated_function(*args, **kwargs): - if not has_app_context: - raise RuntimeError("This decorator must be used within an app context.") run_id = request.args.get("id") stream_ouput_file = os.path.join(current_app.config.get("TMPDIR"), f"{run_id}.txt") -- cgit 1.4.1 From 200deff652bfae364d6e15ff2bceefdc1686f158 Mon Sep 17 00:00:00 2001 From: Alexander_Kabui Date: Thu, 9 Jan 2025 12:06:46 +0300 Subject: Typo fixes. --- gn3/computations/streaming.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'gn3/computations/streaming.py') diff --git a/gn3/computations/streaming.py b/gn3/computations/streaming.py index 1cebde3..b5afb5b 100644 --- a/gn3/computations/streaming.py +++ b/gn3/computations/streaming.py @@ -14,7 +14,7 @@ def run_process(cmd, output_file, run_id): run_id: unique id to identify the process output: - Dict with the results o either success or failure. + Dict with the results for either success or failure. """ try: # phase: execute the rscript cmd @@ -24,7 +24,7 @@ def run_process(cmd, output_file, run_id): stderr=subprocess.STDOUT, ) as process: for line in iter(process.stdout.readline, b""): - # phase: capture the stdout for eaching line allowing read and write + # phase: capture the stdout for each line allowing read and write with open(output_file, "a+", encoding="utf-8") as file_handler: file_handler.write(line.decode("utf-8")) process.wait() @@ -39,16 +39,16 @@ def run_process(cmd, output_file, run_id): def enable_streaming(func): """Decorator function to enable streaming for an endpoint - Note: should be used only in an app context + Note: should only be used in an app context """ @wraps(func) def decorated_function(*args, **kwargs): run_id = request.args.get("id") - stream_ouput_file = os.path.join(current_app.config.get("TMPDIR"), - f"{run_id}.txt") - with open(stream_ouput_file, "w+", encoding="utf-8", + stream_output_file = os.path.join(current_app.config.get("TMPDIR"), + f"{run_id}.txt") + with open(stream_output_file, "w+", encoding="utf-8", ) as file_handler: file_handler.write("File created for streaming\n" ) - return func(stream_ouput_file, *args, **kwargs) + return func(stream_output_file, *args, **kwargs) return decorated_function -- cgit 1.4.1