about summary refs log tree commit diff
path: root/qc_app/upload
diff options
context:
space:
mode:
Diffstat (limited to 'qc_app/upload')
-rw-r--r--qc_app/upload/rqtl2.py160
1 file changed, 136 insertions, 24 deletions
diff --git a/qc_app/upload/rqtl2.py b/qc_app/upload/rqtl2.py
index e79f1db..4ff7ba3 100644
--- a/qc_app/upload/rqtl2.py
+++ b/qc_app/upload/rqtl2.py
@@ -12,6 +12,8 @@ from typing import Union, Callable, Optional
 import MySQLdb as mdb
 from redis import Redis
 from MySQLdb.cursors import DictCursor
+from werkzeug.utils import secure_filename
+from werkzeug.exceptions import NotFound, BadRequest
 from flask import (
     flash,
     escape,
@@ -178,30 +180,140 @@ def upload_rqtl2_bundle(species_id: int, population_id: int):
             app.logger.debug("The file is not a zip file.")
             raise __RequestError__("Invalid file! Expected a zip file.")
 
-        redisuri = app.config["REDIS_URL"]
-        with Redis.from_url(redisuri, decode_responses=True) as rconn:
-            jobid = str(uuid4())
-            redis_ttl_seconds = app.config["JOBS_TTL_SECONDS"]
-            jobs.launch_job(
-                jobs.initialise_job(
-                    rconn,
-                    jobs.jobsnamespace(),
-                    jobid,
-                    [sys.executable, "-m", "scripts.qc_on_rqtl2_bundle",
-                     app.config["SQL_URI"], app.config["REDIS_URL"],
-                     jobs.jobsnamespace(), jobid, "--redisexpiry",
-                     str(redis_ttl_seconds)],
-                    "rqtl2-bundle-qc-job",
-                    redis_ttl_seconds,
-                    {"job-metadata": json.dumps({
-                        "speciesid": species_id,
-                        "populationid": population_id,
-                        "rqtl2-bundle-file": str(the_file.absolute()),
-                        "original-filename": request.files["rqtl2_bundle_file"].filename})}),
-                redisuri,
-                f"{app.config['UPLOAD_FOLDER']}/job_errors")
-            return redirect(url_for(
-                "upload.rqtl2.rqtl2_bundle_qc_status", jobid=jobid))
+        jobid = trigger_rqtl2_bundle_qc(
+            species_id,
+            population_id,
+            the_file,
+            request.files["rqtl2_bundle_file"].filename)#type: ignore[arg-type]
+        return redirect(url_for(
+            "upload.rqtl2.rqtl2_bundle_qc_status", jobid=jobid))
+
+
def trigger_rqtl2_bundle_qc(
        species_id: int,
        population_id: int,
        rqtl2bundle: Path,
        originalfilename: str
) -> UUID:
    """Queue a background QC job for an uploaded R/qtl2 bundle.

    Initialises the job record in Redis and launches the external
    `scripts.qc_on_rqtl2_bundle` process, returning the new job's UUID so
    callers can poll its status.
    """
    redis_url = app.config["REDIS_URL"]
    ttl_seconds = app.config["JOBS_TTL_SECONDS"]
    job_uuid = uuid4()
    # Command line for the out-of-process QC script.
    qc_command = [
        sys.executable, "-m", "scripts.qc_on_rqtl2_bundle",
        app.config["SQL_URI"], app.config["REDIS_URL"],
        jobs.jobsnamespace(), str(job_uuid),
        "--redisexpiry", str(ttl_seconds)]
    # Metadata stored with the job so the status page can report what was
    # uploaded, and for which species/population.
    job_metadata = {"job-metadata": json.dumps({
        "speciesid": species_id,
        "populationid": population_id,
        "rqtl2-bundle-file": str(rqtl2bundle.absolute()),
        "original-filename": originalfilename})}
    with Redis.from_url(redis_url, decode_responses=True) as rconn:
        jobs.launch_job(
            jobs.initialise_job(
                rconn,
                jobs.jobsnamespace(),
                str(job_uuid),
                qc_command,
                "rqtl2-bundle-qc-job",
                ttl_seconds,
                job_metadata),
            redis_url,
            f"{app.config['UPLOAD_FOLDER']}/job_errors")
        return job_uuid
+
+
def chunk_name(uploadfilename: str, chunkno: int) -> str:
    """Return the on-disk filename for chunk number `chunkno` of the
    upload named `uploadfilename`.

    Raises:
        ValueError: if the filename is empty, or the chunk number is not
            a positive integer.
    """
    if uploadfilename == "":
        raise ValueError("Name cannot be empty!")
    if chunkno < 1:
        raise ValueError("Chunk number must be greater than zero")
    # Sanitise the client-supplied name; zero-pad the part number so the
    # chunk files sort lexicographically in upload order.
    sanitised = secure_filename(uploadfilename)
    return f"{sanitised}_part_{chunkno:05d}"
+
+
def chunks_directory(uniqueidentifier: str) -> Path:
    """Return the temporary directory holding the chunks for the upload
    identified by `uniqueidentifier`.

    Raises:
        ValueError: if `uniqueidentifier` is the empty string.
    """
    if uniqueidentifier == "":
        raise ValueError("Unique identifier cannot be empty!")
    # One directory per in-flight upload, under the app's upload folder.
    return Path(app.config["UPLOAD_FOLDER"]) / f"tempdir_{uniqueidentifier}"
+
+
@rqtl2.route(("/upload/species/<int:species_id>/population/<int:population_id>"
              "/rqtl2-bundle-chunked"),
             methods=["GET"])
def upload_rqtl2_bundle_chunked_get(# pylint: disable=["unused-argument"]
        species_id: int,
        population_id: int
):
    """
    Extension to the `upload_rqtl2_bundle` endpoint above that provides a way
    for testing whether all the chunks have been uploaded and to assist with
    resuming a failed upload.

    Returns "OK" (HTTP 200) when the queried chunk already exists on disk;
    raises NotFound (404) otherwise so the client re-uploads that chunk.
    """
    fileid = request.args.get("resumableIdentifier", type=str) or ""
    filename = request.args.get("resumableFilename", type=str) or ""
    chunk = request.args.get("resumableChunkNumber", type=int) or 0
    # All three parameters are required to locate a chunk on disk. The
    # previous check, `not(fileid or filename or chunk)`, only rejected the
    # request when *every* parameter was missing, letting partially-formed
    # requests through to crash in `chunk_name`/`chunks_directory` below.
    if not (fileid and filename and chunk):
        raise BadRequest("At least one required query parameter is missing.")

    if Path(chunks_directory(fileid),
            chunk_name(filename, chunk)).exists():
        return "OK"

    raise NotFound(description=f"Chunk {chunk} was not found.")
+
+
def __merge_chunks__(targetfile: Path, chunkpaths: tuple[Path, ...]) -> Path:
    """Concatenate the chunk files onto `targetfile`, in the order given,
    deleting each chunk once its contents have been copied.

    The target is opened in append mode, so an existing partial file is
    extended rather than truncated.

    Returns `targetfile` for the caller's convenience.
    """
    with open(targetfile, "ab") as _target:
        for chunkfile in chunkpaths:
            with open(chunkfile, "rb") as _chunkdata:
                # Stream in bounded pieces instead of `.read()`-ing the
                # whole chunk: individual chunks can be large and need not
                # fit in memory.
                while piece := _chunkdata.read(1024 * 1024):
                    _target.write(piece)
            chunkfile.unlink()
    return targetfile
+
+
@rqtl2.route(("/upload/species/<int:species_id>/population/<int:population_id>"
              "/rqtl2-bundle-chunked"),
             methods=["POST"])
def upload_rqtl2_bundle_chunked_post(species_id: int, population_id: int):
    """
    Extension to the `upload_rqtl2_bundle` endpoint above that allows large
    files to be uploaded in chunks.

    This should hopefully speed up uploads, and if done right, even enable
    resumable uploads
    """
    # Form fields follow the resumable.js naming convention; missing/invalid
    # values fall back to 0/1/"" rather than erroring here.
    _totalchunks = request.form.get("resumableTotalChunks", type=int) or 0
    _chunk = request.form.get("resumableChunkNumber", default=1, type=int)
    _uploadfilename = request.form.get(
        "resumableFilename", default="", type=str) or ""
    _fileid = request.form.get(
        "resumableIdentifier", default="", type=str) or ""
    # The merged bundle is keyed by the client-supplied identifier, not by
    # the original filename.
    _targetfile = Path(app.config["UPLOAD_FOLDER"], _fileid)

    if _targetfile.exists():
        raise BadRequest("The file has already been uploaded.")

    # save chunk data
    # NOTE(review): an empty filename/identifier makes chunk_name or
    # chunks_directory raise ValueError (an HTTP 500) — confirm whether a
    # 400 BadRequest guard is wanted here, as in the GET endpoint.
    chunks_directory(_fileid).mkdir(exist_ok=True)
    request.files["file"].save(Path(chunks_directory(_fileid),
                                    chunk_name(_uploadfilename, _chunk)))

    # Check whether upload is complete
    chunkpaths = tuple(
        Path(chunks_directory(_fileid), chunk_name(_uploadfilename, _achunk))
        for _achunk in range(1, _totalchunks+1))
    if all(_file.exists() for _file in chunkpaths):
        # merge_files and clean up chunks
        # NOTE(review): two concurrent requests could both observe a
        # complete chunk set and attempt this merge/QC-trigger — confirm
        # the client serialises chunk uploads, or guard with a lock.
        __merge_chunks__(_targetfile, chunkpaths)
        chunks_directory(_fileid).rmdir()
        jobid = trigger_rqtl2_bundle_qc(
            species_id, population_id, _targetfile, _uploadfilename)
        # Returns the status URL as the response body for the client to
        # follow; incomplete uploads fall through to the plain "OK" below.
        return url_for(
            "upload.rqtl2.rqtl2_bundle_qc_status", jobid=jobid)

    return "OK"
 
 
 @rqtl2.route("/upload/species/rqtl2-bundle/qc-status/<uuid:jobid>",