From 90a39a513da56612da1b2d5e3268b04922568db8 Mon Sep 17 00:00:00 2001 From: Alexander_Kabui Date: Tue, 17 Dec 2024 22:23:04 +0300 Subject: feat: Init integrate streaming functionality to rqtl1. --- gn3/api/rqtl.py | 49 +++++++++++++++++++++++++++++++++++++++++++------ gn3/api/rqtl2.py | 7 ++++--- 2 files changed, 47 insertions(+), 9 deletions(-) (limited to 'gn3/api') diff --git a/gn3/api/rqtl.py b/gn3/api/rqtl.py index 30173f9..7246ba8 100644 --- a/gn3/api/rqtl.py +++ b/gn3/api/rqtl.py @@ -1,5 +1,7 @@ """Endpoints for running the rqtl cmd""" import os +import uuid +import subprocess from pathlib import Path from flask import Blueprint @@ -13,6 +15,7 @@ from gn3.fs_helpers import assert_path_exists, get_tmpdir rqtl = Blueprint("rqtl", __name__) + @rqtl.route("/compute", methods=["POST"]) def compute(): """Given at least a geno_file and pheno_file, generate and @@ -21,17 +24,22 @@ run the rqtl_wrapper script and return the results as JSON """ genofile = request.form['geno_file'] phenofile = request.form['pheno_file'] - assert_path_exists(genofile) assert_path_exists(phenofile) - # Split kwargs by those with values and boolean ones that just convert to True/False + run_id = request.args.get("id") + with open(os.path.join(current_app.config.get("TMPDIR"), + f"{run_id}.txt"), "w+"): + pass + # Split kwargs by those with values and boolean ones + # that just convert to True/False kwargs = ["covarstruct", "model", "method", "nperm", "scale", "control"] boolean_kwargs = ["addcovar", "interval", "pstrata", "pairscan"] all_kwargs = kwargs + boolean_kwargs rqtl_kwargs = {"geno": genofile, "pheno": phenofile, "outdir": current_app.config.get("TMPDIR")} rqtl_bool_kwargs = [] + for kwarg in all_kwargs: if kwarg in request.form: if kwarg in kwargs: @@ -39,7 +47,7 @@ run the rqtl_wrapper script and return the results as JSON if kwarg in boolean_kwargs: rqtl_bool_kwargs.append(kwarg) - outdir = os.path.join(get_tmpdir(),"gn3") + outdir = os.path.join(get_tmpdir(), "gn3") if not os.path.isdir(outdir): os.mkdir(outdir) @@ -52,9 +60,13 @@ run the rqtl_wrapper script and return the results as JSON ) rqtl_output = {} + # get the stdout file + run_id = request.args.get("id", str(uuid.uuid4())) if not os.path.isfile(os.path.join(current_app.config.get("TMPDIR"), "gn3", rqtl_cmd.get('output_file'))): - os.system(rqtl_cmd.get('rqtl_cmd')) + stream_ouput_file = os.path.join(current_app.config.get("TMPDIR"), + f"{run_id}.txt") + run_process(rqtl_cmd.get("rqtl_cmd"), stream_ouput_file, run_id) if "pairscan" in rqtl_bool_kwargs: rqtl_output['results'] = process_rqtl_pairscan(rqtl_cmd.get('output_file'), genofile) @@ -62,7 +74,32 @@ run the rqtl_wrapper script and return the results as JSON rqtl_output['results'] = process_rqtl_mapping(rqtl_cmd.get('output_file')) if int(rqtl_kwargs['nperm']) > 0: - rqtl_output['perm_results'], rqtl_output['suggestive'], rqtl_output['significant'] = \ - process_perm_output(rqtl_cmd.get('output_file')) + rqtl_output['perm_results'], rqtl_output['suggestive'], + rqtl_output['significant'] = process_perm_output(rqtl_cmd.get('output_file')) return jsonify(rqtl_output) + + +def run_process(rscript_cmd, output_file, run_id): + """Main function to do the streaming""" + # TODO: move this function to own file + # pylint: disable=consider-using-with + process = subprocess.Popen( + rscript_cmd, shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT + ) + for line in iter(process.stdout.readline, b""): + # these allow endpoint stream to read the file since + # no read and write file same tiem + with open(output_file, "a+") as file_handler: + file_handler.write(line.decode("utf-8")) + process.stdout.close() + process.wait() + if process.returncode == 0: + return jsonify({"msg": "success", + "results": "file_here", + "run_id": run_id}) + return jsonify({"msg": "fail", + "error": "Process failed", + "run_id": run_id}) diff --git a/gn3/api/rqtl2.py b/gn3/api/rqtl2.py index 55d847e..a6ac411 100644 --- a/gn3/api/rqtl2.py +++ b/gn3/api/rqtl2.py @@ -57,6 +57,7 @@ def stream(identifier="output"): with open(output_file, encoding="utf-8") as file_handler: # read to the last position default to 0 file_handler.seek(seek_position) - return jsonify({"data": file_handler.readlines(), - "run_id": identifier, - "pointer": file_handler.tell()}) + results = {"data": file_handler.readlines(), + "run_id": identifier, + "pointer": file_handler.tell()} + return jsonify(results) -- cgit 1.4.1 From bba6befbad749533779c0b6057e22885d351595b Mon Sep 17 00:00:00 2001 From: Alexander_Kabui Date: Fri, 20 Dec 2024 17:53:07 +0300 Subject: feat: Enable streaming for output. --- gn3/api/rqtl.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'gn3/api') diff --git a/gn3/api/rqtl.py b/gn3/api/rqtl.py index 7246ba8..39f45a3 100644 --- a/gn3/api/rqtl.py +++ b/gn3/api/rqtl.py @@ -64,9 +64,11 @@ run the rqtl_wrapper script and return the results as JSON run_id = request.args.get("id", str(uuid.uuid4())) if not os.path.isfile(os.path.join(current_app.config.get("TMPDIR"), "gn3", rqtl_cmd.get('output_file'))): - stream_ouput_file = os.path.join(current_app.config.get("TMPDIR"), + pass + stream_ouput_file = os.path.join(current_app.config.get("TMPDIR"), f"{run_id}.txt") - run_process(rqtl_cmd.get("rqtl_cmd"), stream_ouput_file, run_id) + + results = run_process(rqtl_cmd.get("rqtl_cmd"), stream_ouput_file, run_id) if "pairscan" in rqtl_bool_kwargs: rqtl_output['results'] = process_rqtl_pairscan(rqtl_cmd.get('output_file'), genofile) -- cgit 1.4.1 From 6ea74320bc92939d1c99e406623568a183cc75b9 Mon Sep 17 00:00:00 2001 From: Alexander_Kabui Date: Fri, 20 Dec 2024 18:25:03 +0300 Subject: fix: Pylint fixes. --- gn3/api/rqtl.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'gn3/api') diff --git a/gn3/api/rqtl.py b/gn3/api/rqtl.py index 39f45a3..8b3c536 100644 --- a/gn3/api/rqtl.py +++ b/gn3/api/rqtl.py @@ -29,7 +29,7 @@ run the rqtl_wrapper script and return the results as JSON run_id = request.args.get("id") with open(os.path.join(current_app.config.get("TMPDIR"), - f"{run_id}.txt"), "w+"): + f"{run_id}.txt"), "w+", encoding="utf-8"): pass # Split kwargs by those with values and boolean ones # that just convert to True/False @@ -68,7 +68,7 @@ run the rqtl_wrapper script and return the results as JSON stream_ouput_file = os.path.join(current_app.config.get("TMPDIR"), f"{run_id}.txt") - results = run_process(rqtl_cmd.get("rqtl_cmd"), stream_ouput_file, run_id) + run_process(rqtl_cmd.get("rqtl_cmd"), stream_ouput_file, run_id) if "pairscan" in rqtl_bool_kwargs: rqtl_output['results'] = process_rqtl_pairscan(rqtl_cmd.get('output_file'), genofile) @@ -76,8 +76,8 @@ run the rqtl_wrapper script and return the results as JSON rqtl_output['results'] = process_rqtl_mapping(rqtl_cmd.get('output_file')) if int(rqtl_kwargs['nperm']) > 0: - rqtl_output['perm_results'], rqtl_output['suggestive'], - rqtl_output['significant'] = process_perm_output(rqtl_cmd.get('output_file')) + # pylint: disable=C0301 + rqtl_output['perm_results'], rqtl_output['suggestive'], rqtl_output['significant'] = process_perm_output(rqtl_cmd.get('output_file')) return jsonify(rqtl_output) @@ -94,7 +94,7 @@ def run_process(rscript_cmd, output_file, run_id): for line in iter(process.stdout.readline, b""): # these allow endpoint stream to read the file since # no read and write file same tiem - with open(output_file, "a+") as file_handler: + with open(output_file, "a+", encoding="utf-8") as file_handler: file_handler.write(line.decode("utf-8")) process.stdout.close() process.wait() -- cgit 1.4.1