aboutsummaryrefslogtreecommitdiff
path: root/qc_app/upload/rqtl2.py
diff options
context:
space:
mode:
Diffstat (limited to 'qc_app/upload/rqtl2.py')
-rw-r--r--qc_app/upload/rqtl2.py160
1 files changed, 136 insertions, 24 deletions
diff --git a/qc_app/upload/rqtl2.py b/qc_app/upload/rqtl2.py
index e79f1db..4ff7ba3 100644
--- a/qc_app/upload/rqtl2.py
+++ b/qc_app/upload/rqtl2.py
@@ -12,6 +12,8 @@ from typing import Union, Callable, Optional
import MySQLdb as mdb
from redis import Redis
from MySQLdb.cursors import DictCursor
+from werkzeug.utils import secure_filename
+from werkzeug.exceptions import NotFound, BadRequest
from flask import (
flash,
escape,
@@ -178,30 +180,140 @@ def upload_rqtl2_bundle(species_id: int, population_id: int):
app.logger.debug("The file is not a zip file.")
raise __RequestError__("Invalid file! Expected a zip file.")
- redisuri = app.config["REDIS_URL"]
- with Redis.from_url(redisuri, decode_responses=True) as rconn:
- jobid = str(uuid4())
- redis_ttl_seconds = app.config["JOBS_TTL_SECONDS"]
- jobs.launch_job(
- jobs.initialise_job(
- rconn,
- jobs.jobsnamespace(),
- jobid,
- [sys.executable, "-m", "scripts.qc_on_rqtl2_bundle",
- app.config["SQL_URI"], app.config["REDIS_URL"],
- jobs.jobsnamespace(), jobid, "--redisexpiry",
- str(redis_ttl_seconds)],
- "rqtl2-bundle-qc-job",
- redis_ttl_seconds,
- {"job-metadata": json.dumps({
- "speciesid": species_id,
- "populationid": population_id,
- "rqtl2-bundle-file": str(the_file.absolute()),
- "original-filename": request.files["rqtl2_bundle_file"].filename})}),
- redisuri,
- f"{app.config['UPLOAD_FOLDER']}/job_errors")
- return redirect(url_for(
- "upload.rqtl2.rqtl2_bundle_qc_status", jobid=jobid))
+ jobid = trigger_rqtl2_bundle_qc(
+ species_id,
+ population_id,
+ the_file,
+ request.files["rqtl2_bundle_file"].filename)#type: ignore[arg-type]
+ return redirect(url_for(
+ "upload.rqtl2.rqtl2_bundle_qc_status", jobid=jobid))
+
+
+def trigger_rqtl2_bundle_qc(
+ species_id: int,
+ population_id: int,
+ rqtl2bundle: Path,
+ originalfilename: str
+) -> UUID:
+ """Trigger QC on the R/qtl2 bundle."""
+ redisuri = app.config["REDIS_URL"]
+ with Redis.from_url(redisuri, decode_responses=True) as rconn:
+ jobid = uuid4()
+ redis_ttl_seconds = app.config["JOBS_TTL_SECONDS"]
+ jobs.launch_job(
+ jobs.initialise_job(
+ rconn,
+ jobs.jobsnamespace(),
+ str(jobid),
+ [sys.executable, "-m", "scripts.qc_on_rqtl2_bundle",
+ app.config["SQL_URI"], app.config["REDIS_URL"],
+ jobs.jobsnamespace(), str(jobid), "--redisexpiry",
+ str(redis_ttl_seconds)],
+ "rqtl2-bundle-qc-job",
+ redis_ttl_seconds,
+ {"job-metadata": json.dumps({
+ "speciesid": species_id,
+ "populationid": population_id,
+ "rqtl2-bundle-file": str(rqtl2bundle.absolute()),
+ "original-filename": originalfilename})}),
+ redisuri,
+ f"{app.config['UPLOAD_FOLDER']}/job_errors")
+ return jobid
+
+
+def chunk_name(uploadfilename: str, chunkno: int) -> str:
+ """Generate chunk name from original filename and chunk number"""
+ if uploadfilename == "":
+ raise ValueError("Name cannot be empty!")
+ if chunkno < 1:
+ raise ValueError("Chunk number must be greater than zero")
+ return f"{secure_filename(uploadfilename)}_part_{chunkno:05d}"
+
+
+def chunks_directory(uniqueidentifier: str) -> Path:
+ """Compute the directory where chunks are temporarily stored."""
+ if uniqueidentifier == "":
+ raise ValueError("Unique identifier cannot be empty!")
+ return Path(app.config["UPLOAD_FOLDER"], f"tempdir_{uniqueidentifier}")
+
+
+@rqtl2.route(("/upload/species/<int:species_id>/population/<int:population_id>"
+ "/rqtl2-bundle-chunked"),
+ methods=["GET"])
+def upload_rqtl2_bundle_chunked_get(# pylint: disable=["unused-argument"]
+ species_id: int,
+ population_id: int
+):
+ """
+ Extension to the `upload_rqtl2_bundle` endpoint above that provides a way
+ for testing whether all the chunks have been uploaded and to assist with
+ resuming a failed upload.
+ """
+ fileid = request.args.get("resumableIdentifier", type=str) or ""
+ filename = request.args.get("resumableFilename", type=str) or ""
+ chunk = request.args.get("resumableChunkNumber", type=int) or 0
+ if not(fileid or filename or chunk):
+ raise BadRequest("At least one required query parameter is missing.")
+
+ if Path(chunks_directory(fileid),
+ chunk_name(filename, chunk)).exists():
+ return "OK"
+
+ raise NotFound(description=f"Chunk {chunk} was not found.")
+
+
+def __merge_chunks__(targetfile: Path, chunkpaths: tuple[Path, ...]) -> Path:
+ """Merge the chunks into a single file."""
+ with open(targetfile, "ab") as _target:
+ for chunkfile in chunkpaths:
+ with open(chunkfile, "rb") as _chunkdata:
+ _target.write(_chunkdata.read())
+
+ chunkfile.unlink()
+ return targetfile
+
+
+@rqtl2.route(("/upload/species/<int:species_id>/population/<int:population_id>"
+ "/rqtl2-bundle-chunked"),
+ methods=["POST"])
+def upload_rqtl2_bundle_chunked_post(species_id: int, population_id: int):
+ """
+ Extension to the `upload_rqtl2_bundle` endpoint above that allows large
+ files to be uploaded in chunks.
+
+ This should hopefully speed up uploads, and if done right, even enable
+ resumable uploads
+ """
+ _totalchunks = request.form.get("resumableTotalChunks", type=int) or 0
+ _chunk = request.form.get("resumableChunkNumber", default=1, type=int)
+ _uploadfilename = request.form.get(
+ "resumableFilename", default="", type=str) or ""
+ _fileid = request.form.get(
+ "resumableIdentifier", default="", type=str) or ""
+ _targetfile = Path(app.config["UPLOAD_FOLDER"], _fileid)
+
+ if _targetfile.exists():
+ raise BadRequest("The file has already been uploaded.")
+
+ # save chunk data
+ chunks_directory(_fileid).mkdir(exist_ok=True)
+ request.files["file"].save(Path(chunks_directory(_fileid),
+ chunk_name(_uploadfilename, _chunk)))
+
+ # Check whether upload is complete
+ chunkpaths = tuple(
+ Path(chunks_directory(_fileid), chunk_name(_uploadfilename, _achunk))
+ for _achunk in range(1, _totalchunks+1))
+ if all(_file.exists() for _file in chunkpaths):
+ # merge_files and clean up chunks
+ __merge_chunks__(_targetfile, chunkpaths)
+ chunks_directory(_fileid).rmdir()
+ jobid = trigger_rqtl2_bundle_qc(
+ species_id, population_id, _targetfile, _uploadfilename)
+ return url_for(
+ "upload.rqtl2.rqtl2_bundle_qc_status", jobid=jobid)
+
+ return "OK"
@rqtl2.route("/upload/species/rqtl2-bundle/qc-status/<uuid:jobid>",