aboutsummaryrefslogtreecommitdiff
path: root/qc_app/upload
diff options
context:
space:
mode:
author	Frederick Muriuki Muriithi	2024-06-27 16:40:52 -0500
committer	Frederick Muriuki Muriithi	2024-06-27 16:40:52 -0500
commit	47c2ea64682064d7cb609e5459d7bd2e49efa17e (patch)
tree	b067922e2609188cdc00319aa9883210e656891f /qc_app/upload
parent	59b345294cda9cf25b20ae7bfd617f62655ad6da (diff)
download	gn-uploader-47c2ea64682064d7cb609e5459d7bd2e49efa17e.tar.gz
Handle chunks on backend and update UI with progress
Handle the uploaded chunks, enabling resumption of the upload if it is incomplete. Update the UI with the progress of the upload.
Diffstat (limited to 'qc_app/upload')
-rw-r--r--	qc_app/upload/rqtl2.py	160
1 file changed, 136 insertions(+), 24 deletions(-)
diff --git a/qc_app/upload/rqtl2.py b/qc_app/upload/rqtl2.py
index e79f1db..4ff7ba3 100644
--- a/qc_app/upload/rqtl2.py
+++ b/qc_app/upload/rqtl2.py
@@ -12,6 +12,8 @@ from typing import Union, Callable, Optional
import MySQLdb as mdb
from redis import Redis
from MySQLdb.cursors import DictCursor
+from werkzeug.utils import secure_filename
+from werkzeug.exceptions import NotFound, BadRequest
from flask import (
flash,
escape,
@@ -178,30 +180,140 @@ def upload_rqtl2_bundle(species_id: int, population_id: int):
app.logger.debug("The file is not a zip file.")
raise __RequestError__("Invalid file! Expected a zip file.")
- redisuri = app.config["REDIS_URL"]
- with Redis.from_url(redisuri, decode_responses=True) as rconn:
- jobid = str(uuid4())
- redis_ttl_seconds = app.config["JOBS_TTL_SECONDS"]
- jobs.launch_job(
- jobs.initialise_job(
- rconn,
- jobs.jobsnamespace(),
- jobid,
- [sys.executable, "-m", "scripts.qc_on_rqtl2_bundle",
- app.config["SQL_URI"], app.config["REDIS_URL"],
- jobs.jobsnamespace(), jobid, "--redisexpiry",
- str(redis_ttl_seconds)],
- "rqtl2-bundle-qc-job",
- redis_ttl_seconds,
- {"job-metadata": json.dumps({
- "speciesid": species_id,
- "populationid": population_id,
- "rqtl2-bundle-file": str(the_file.absolute()),
- "original-filename": request.files["rqtl2_bundle_file"].filename})}),
- redisuri,
- f"{app.config['UPLOAD_FOLDER']}/job_errors")
- return redirect(url_for(
- "upload.rqtl2.rqtl2_bundle_qc_status", jobid=jobid))
+ jobid = trigger_rqtl2_bundle_qc(
+ species_id,
+ population_id,
+ the_file,
+ request.files["rqtl2_bundle_file"].filename)#type: ignore[arg-type]
+ return redirect(url_for(
+ "upload.rqtl2.rqtl2_bundle_qc_status", jobid=jobid))
+
+
+def trigger_rqtl2_bundle_qc(
+ species_id: int,
+ population_id: int,
+ rqtl2bundle: Path,
+ originalfilename: str
+) -> UUID:
+ """Trigger QC on the R/qtl2 bundle."""
+ redisuri = app.config["REDIS_URL"]
+ with Redis.from_url(redisuri, decode_responses=True) as rconn:
+ jobid = uuid4()
+ redis_ttl_seconds = app.config["JOBS_TTL_SECONDS"]
+ jobs.launch_job(
+ jobs.initialise_job(
+ rconn,
+ jobs.jobsnamespace(),
+ str(jobid),
+ [sys.executable, "-m", "scripts.qc_on_rqtl2_bundle",
+ app.config["SQL_URI"], app.config["REDIS_URL"],
+ jobs.jobsnamespace(), str(jobid), "--redisexpiry",
+ str(redis_ttl_seconds)],
+ "rqtl2-bundle-qc-job",
+ redis_ttl_seconds,
+ {"job-metadata": json.dumps({
+ "speciesid": species_id,
+ "populationid": population_id,
+ "rqtl2-bundle-file": str(rqtl2bundle.absolute()),
+ "original-filename": originalfilename})}),
+ redisuri,
+ f"{app.config['UPLOAD_FOLDER']}/job_errors")
+ return jobid
+
+
+def chunk_name(uploadfilename: str, chunkno: int) -> str:
+ """Generate chunk name from original filename and chunk number"""
+ if uploadfilename == "":
+ raise ValueError("Name cannot be empty!")
+ if chunkno < 1:
+ raise ValueError("Chunk number must be greater than zero")
+ return f"{secure_filename(uploadfilename)}_part_{chunkno:05d}"
+
+
+def chunks_directory(uniqueidentifier: str) -> Path:
+ """Compute the directory where chunks are temporarily stored."""
+ if uniqueidentifier == "":
+ raise ValueError("Unique identifier cannot be empty!")
+ return Path(app.config["UPLOAD_FOLDER"], f"tempdir_{uniqueidentifier}")
+
+
+@rqtl2.route(("/upload/species/<int:species_id>/population/<int:population_id>"
+ "/rqtl2-bundle-chunked"),
+ methods=["GET"])
+def upload_rqtl2_bundle_chunked_get(# pylint: disable=["unused-argument"]
+ species_id: int,
+ population_id: int
+):
+ """
+ Extension to the `upload_rqtl2_bundle` endpoint above that provides a way
+ for testing whether all the chunks have been uploaded and to assist with
+ resuming a failed upload.
+ """
+ fileid = request.args.get("resumableIdentifier", type=str) or ""
+ filename = request.args.get("resumableFilename", type=str) or ""
+ chunk = request.args.get("resumableChunkNumber", type=int) or 0
+ if not(fileid or filename or chunk):
+ raise BadRequest("At least one required query parameter is missing.")
+
+ if Path(chunks_directory(fileid),
+ chunk_name(filename, chunk)).exists():
+ return "OK"
+
+ raise NotFound(description=f"Chunk {chunk} was not found.")
+
+
+def __merge_chunks__(targetfile: Path, chunkpaths: tuple[Path, ...]) -> Path:
+ """Merge the chunks into a single file."""
+ with open(targetfile, "ab") as _target:
+ for chunkfile in chunkpaths:
+ with open(chunkfile, "rb") as _chunkdata:
+ _target.write(_chunkdata.read())
+
+ chunkfile.unlink()
+ return targetfile
+
+
+@rqtl2.route(("/upload/species/<int:species_id>/population/<int:population_id>"
+ "/rqtl2-bundle-chunked"),
+ methods=["POST"])
+def upload_rqtl2_bundle_chunked_post(species_id: int, population_id: int):
+ """
+ Extension to the `upload_rqtl2_bundle` endpoint above that allows large
+ files to be uploaded in chunks.
+
+ This should hopefully speed up uploads, and if done right, even enable
+ resumable uploads
+ """
+ _totalchunks = request.form.get("resumableTotalChunks", type=int) or 0
+ _chunk = request.form.get("resumableChunkNumber", default=1, type=int)
+ _uploadfilename = request.form.get(
+ "resumableFilename", default="", type=str) or ""
+ _fileid = request.form.get(
+ "resumableIdentifier", default="", type=str) or ""
+ _targetfile = Path(app.config["UPLOAD_FOLDER"], _fileid)
+
+ if _targetfile.exists():
+ raise BadRequest("The file has already been uploaded.")
+
+ # save chunk data
+ chunks_directory(_fileid).mkdir(exist_ok=True)
+ request.files["file"].save(Path(chunks_directory(_fileid),
+ chunk_name(_uploadfilename, _chunk)))
+
+ # Check whether upload is complete
+ chunkpaths = tuple(
+ Path(chunks_directory(_fileid), chunk_name(_uploadfilename, _achunk))
+ for _achunk in range(1, _totalchunks+1))
+ if all(_file.exists() for _file in chunkpaths):
+ # merge_files and clean up chunks
+ __merge_chunks__(_targetfile, chunkpaths)
+ chunks_directory(_fileid).rmdir()
+ jobid = trigger_rqtl2_bundle_qc(
+ species_id, population_id, _targetfile, _uploadfilename)
+ return url_for(
+ "upload.rqtl2.rqtl2_bundle_qc_status", jobid=jobid)
+
+ return "OK"
@rqtl2.route("/upload/species/rqtl2-bundle/qc-status/<uuid:jobid>",