From 47c2ea64682064d7cb609e5459d7bd2e49efa17e Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Thu, 27 Jun 2024 16:40:52 -0500 Subject: Handle chunks on backend and update UI with progress Handle the uploaded chunks, enabling resumption of the upload if incomplete. Update the UI with the progress of the upload. --- qc_app/upload/rqtl2.py | 160 +++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 136 insertions(+), 24 deletions(-) (limited to 'qc_app/upload/rqtl2.py') diff --git a/qc_app/upload/rqtl2.py b/qc_app/upload/rqtl2.py index e79f1db..4ff7ba3 100644 --- a/qc_app/upload/rqtl2.py +++ b/qc_app/upload/rqtl2.py @@ -12,6 +12,8 @@ from typing import Union, Callable, Optional import MySQLdb as mdb from redis import Redis from MySQLdb.cursors import DictCursor +from werkzeug.utils import secure_filename +from werkzeug.exceptions import NotFound, BadRequest from flask import ( flash, escape, @@ -178,30 +180,140 @@ def upload_rqtl2_bundle(species_id: int, population_id: int): app.logger.debug("The file is not a zip file.") raise __RequestError__("Invalid file! 
Expected a zip file.") - redisuri = app.config["REDIS_URL"] - with Redis.from_url(redisuri, decode_responses=True) as rconn: - jobid = str(uuid4()) - redis_ttl_seconds = app.config["JOBS_TTL_SECONDS"] - jobs.launch_job( - jobs.initialise_job( - rconn, - jobs.jobsnamespace(), - jobid, - [sys.executable, "-m", "scripts.qc_on_rqtl2_bundle", - app.config["SQL_URI"], app.config["REDIS_URL"], - jobs.jobsnamespace(), jobid, "--redisexpiry", - str(redis_ttl_seconds)], - "rqtl2-bundle-qc-job", - redis_ttl_seconds, - {"job-metadata": json.dumps({ - "speciesid": species_id, - "populationid": population_id, - "rqtl2-bundle-file": str(the_file.absolute()), - "original-filename": request.files["rqtl2_bundle_file"].filename})}), - redisuri, - f"{app.config['UPLOAD_FOLDER']}/job_errors") - return redirect(url_for( - "upload.rqtl2.rqtl2_bundle_qc_status", jobid=jobid)) + jobid = trigger_rqtl2_bundle_qc( + species_id, + population_id, + the_file, + request.files["rqtl2_bundle_file"].filename)#type: ignore[arg-type] + return redirect(url_for( + "upload.rqtl2.rqtl2_bundle_qc_status", jobid=jobid)) + + +def trigger_rqtl2_bundle_qc( + species_id: int, + population_id: int, + rqtl2bundle: Path, + originalfilename: str +) -> UUID: + """Trigger QC on the R/qtl2 bundle.""" + redisuri = app.config["REDIS_URL"] + with Redis.from_url(redisuri, decode_responses=True) as rconn: + jobid = uuid4() + redis_ttl_seconds = app.config["JOBS_TTL_SECONDS"] + jobs.launch_job( + jobs.initialise_job( + rconn, + jobs.jobsnamespace(), + str(jobid), + [sys.executable, "-m", "scripts.qc_on_rqtl2_bundle", + app.config["SQL_URI"], app.config["REDIS_URL"], + jobs.jobsnamespace(), str(jobid), "--redisexpiry", + str(redis_ttl_seconds)], + "rqtl2-bundle-qc-job", + redis_ttl_seconds, + {"job-metadata": json.dumps({ + "speciesid": species_id, + "populationid": population_id, + "rqtl2-bundle-file": str(rqtl2bundle.absolute()), + "original-filename": originalfilename})}), + redisuri, + 
f"{app.config['UPLOAD_FOLDER']}/job_errors") + return jobid + + +def chunk_name(uploadfilename: str, chunkno: int) -> str: + """Generate chunk name from original filename and chunk number""" + if uploadfilename == "": + raise ValueError("Name cannot be empty!") + if chunkno < 1: + raise ValueError("Chunk number must be greater than zero") + return f"{secure_filename(uploadfilename)}_part_{chunkno:05d}" + + +def chunks_directory(uniqueidentifier: str) -> Path: + """Compute the directory where chunks are temporarily stored.""" + if uniqueidentifier == "": + raise ValueError("Unique identifier cannot be empty!") + return Path(app.config["UPLOAD_FOLDER"], f"tempdir_{uniqueidentifier}") + + +@rqtl2.route(("/upload/species/<int:species_id>/population/" + "<int:population_id>/rqtl2-bundle-chunked"), + methods=["GET"]) +def upload_rqtl2_bundle_chunked_get(# pylint: disable=["unused-argument"] + species_id: int, + population_id: int +): + """ + Extension to the `upload_rqtl2_bundle` endpoint above that provides a way + for testing whether all the chunks have been uploaded and to assist with + resuming a failed upload. 
+ """ + fileid = request.args.get("resumableIdentifier", type=str) or "" + filename = request.args.get("resumableFilename", type=str) or "" + chunk = request.args.get("resumableChunkNumber", type=int) or 0 + if not(fileid or filename or chunk): + raise BadRequest("At least one required query parameter is missing.") + + if Path(chunks_directory(fileid), + chunk_name(filename, chunk)).exists(): + return "OK" + + raise NotFound(description=f"Chunk {chunk} was not found.") + + +def __merge_chunks__(targetfile: Path, chunkpaths: tuple[Path, ...]) -> Path: + """Merge the chunks into a single file.""" + with open(targetfile, "ab") as _target: + for chunkfile in chunkpaths: + with open(chunkfile, "rb") as _chunkdata: + _target.write(_chunkdata.read()) + + chunkfile.unlink() + return targetfile + + +@rqtl2.route(("/upload/species/<int:species_id>/population/" + "<int:population_id>/rqtl2-bundle-chunked"), + methods=["POST"]) +def upload_rqtl2_bundle_chunked_post(species_id: int, population_id: int): + """ + Extension to the `upload_rqtl2_bundle` endpoint above that allows large + files to be uploaded in chunks. 
+ + This should hopefully speed up uploads, and if done right, even enable + resumable uploads + """ + _totalchunks = request.form.get("resumableTotalChunks", type=int) or 0 + _chunk = request.form.get("resumableChunkNumber", default=1, type=int) + _uploadfilename = request.form.get( + "resumableFilename", default="", type=str) or "" + _fileid = request.form.get( + "resumableIdentifier", default="", type=str) or "" + _targetfile = Path(app.config["UPLOAD_FOLDER"], _fileid) + + if _targetfile.exists(): + raise BadRequest("The file has already been uploaded.") + + # save chunk data + chunks_directory(_fileid).mkdir(exist_ok=True) + request.files["file"].save(Path(chunks_directory(_fileid), + chunk_name(_uploadfilename, _chunk))) + + # Check whether upload is complete + chunkpaths = tuple( + Path(chunks_directory(_fileid), chunk_name(_uploadfilename, _achunk)) + for _achunk in range(1, _totalchunks+1)) + if all(_file.exists() for _file in chunkpaths): + # merge_files and clean up chunks + __merge_chunks__(_targetfile, chunkpaths) + chunks_directory(_fileid).rmdir() + jobid = trigger_rqtl2_bundle_qc( + species_id, population_id, _targetfile, _uploadfilename) + return url_for( + "upload.rqtl2.rqtl2_bundle_qc_status", jobid=jobid) + + return "OK" @rqtl2.route("/upload/species/rqtl2-bundle/qc-status/<uuid:jobid>", -- cgit v1.2.3