Diffstat (limited to 'uploader/files')
 uploader/files/__init__.py  |   5 +
 uploader/files/chunks.py    |  32 +++
 uploader/files/functions.py |  42 ++++
 uploader/files/views.py     | 157 ++++++++++++++
 4 files changed, 236 insertions(+), 0 deletions(-)
diff --git a/uploader/files/__init__.py b/uploader/files/__init__.py
new file mode 100644
index 0000000..53c3176
--- /dev/null
+++ b/uploader/files/__init__.py
@@ -0,0 +1,5 @@
+"""General files and chunks utilities."""
+from .chunks import chunked_binary_read
+from .functions import (fullpath,
+                        save_file,
+                        sha256_digest_over_file)
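
The re-exports above make the helpers importable from the package root; a hypothetical interactive session (chunk_name is not re-exported, so it comes from its submodule):

    >>> from uploader.files import save_file, sha256_digest_over_file
    >>> from uploader.files.chunks import chunk_name
    >>> chunk_name("example.txt", 3)
    'example.txt_part_00003'
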
diff --git a/uploader/files/chunks.py b/uploader/files/chunks.py
new file mode 100644
index 0000000..c4360b5
--- /dev/null
+++ b/uploader/files/chunks.py
@@ -0,0 +1,32 @@
+"""Functions dealing with chunking of files."""
+from pathlib import Path
+from typing import Iterator
+
+from flask import current_app as app
+from werkzeug.utils import secure_filename
+
+
+def chunked_binary_read(filepath: Path, chunksize: int = 2048) -> Iterator[bytes]:
+    """Read a file in binary mode in chunks."""
+    with open(filepath, "rb") as inputfile:
+        while True:
+            data = inputfile.read(chunksize)
+            if not data:
+                break
+            yield data
+
+
+def chunk_name(uploadfilename: str, chunkno: int) -> str:
+    """Generate chunk name from original filename and chunk number."""
+    if uploadfilename == "":
+        raise ValueError("Name cannot be empty!")
+    if chunkno < 1:
+        raise ValueError("Chunk number must be greater than zero")
+    return f"{secure_filename(uploadfilename)}_part_{chunkno:05d}"
+
+
+def chunks_directory(uniqueidentifier: str) -> Path:
+    """Compute the directory where chunks are temporarily stored."""
+    if uniqueidentifier == "":
+        raise ValueError("Unique identifier cannot be empty!")
+    return Path(app.config["UPLOAD_FOLDER"], f"tempdir_{uniqueidentifier}")
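
A minimal sketch of how the chunk helpers above compose, splitting a local file into part-files named the way the upload endpoints expect. The source and destination paths are hypothetical, and chunks_directory() is left out because it needs an application context to read app.config:

    from pathlib import Path
    from uploader.files.chunks import chunked_binary_read, chunk_name

    source = Path("data/upload.bin")  # hypothetical sample file
    for number, data in enumerate(chunked_binary_read(source), start=1):
        # Writes /tmp/upload.bin_part_00001, /tmp/upload.bin_part_00002, ...
        Path("/tmp", chunk_name(source.name, number)).write_bytes(data)
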
diff --git a/uploader/files/functions.py b/uploader/files/functions.py
new file mode 100644
index 0000000..7b9f06b
--- /dev/null
+++ b/uploader/files/functions.py
@@ -0,0 +1,42 @@
+"""Utilities to deal with uploaded files."""
+import hashlib
+from pathlib import Path
+from datetime import datetime
+
+from flask import current_app
+
+from werkzeug.utils import secure_filename
+from werkzeug.datastructures import FileStorage
+
+from .chunks import chunked_binary_read
+
+def save_file(fileobj: FileStorage, upload_dir: Path, hashed: bool = True) -> Path:
+    """Save the uploaded file and return the path."""
+    if not fileobj:
+        raise ValueError("Invalid file object!")
+    hashed_name = (
+        hashlib.sha512(
+            f"{fileobj.filename}::{datetime.now().isoformat()}".encode("utf8")
+        ).hexdigest()
+        if hashed else
+        fileobj.filename)
+    filename = Path(secure_filename(hashed_name))  # type: ignore[arg-type]
+    upload_dir.mkdir(parents=True, exist_ok=True)
+
+    filepath = Path(upload_dir, filename)
+    fileobj.save(filepath)
+    return filepath
+
+
+def fullpath(filename: str) -> Path:
+    """Get a file's full path. This makes use of `flask.current_app`."""
+    return Path(current_app.config["UPLOAD_FOLDER"], filename).absolute()
+
+
+def sha256_digest_over_file(filepath: Path) -> str:
+    """Compute the sha256 digest over a file's contents."""
+    filehash = hashlib.sha256()
+    for chunk in chunked_binary_read(filepath):
+        filehash.update(chunk)
+
+    return filehash.hexdigest()
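
For illustration, a quick self-check that the streamed digest equals hashlib's digest over the whole file read at once; the test path is hypothetical and the file is assumed small enough to fit in memory:

    import hashlib
    from pathlib import Path
    from uploader.files import sha256_digest_over_file

    testfile = Path("/tmp/digest-check.bin")  # hypothetical test file
    testfile.write_bytes(b"hello, chunked world\n" * 1000)
    assert (sha256_digest_over_file(testfile)
            == hashlib.sha256(testfile.read_bytes()).hexdigest())
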
diff --git a/uploader/files/views.py b/uploader/files/views.py
new file mode 100644
index 0000000..29059c7
--- /dev/null
+++ b/uploader/files/views.py
@@ -0,0 +1,157 @@
+"""Module for generic files endpoints."""
+import time
+import random
+import traceback
+from pathlib import Path
+
+from flask import request, jsonify, Blueprint, current_app as app
+
+from .chunks import chunk_name, chunks_directory
+
+files = Blueprint("files", __name__)
+
+def target_file(fileid: str) -> Path:
+    """Compute the full path for the target file."""
+    return Path(app.config["UPLOAD_FOLDER"], fileid)
+
+
+@files.route("/upload/resumable", methods=["GET"])
+def resumable_upload_get():
+    """Check whether a given chunk (or the complete file) was already uploaded."""
+    fileid = request.args.get("resumableIdentifier", type=str) or ""
+    filename = request.args.get("resumableFilename", type=str) or ""
+    chunk = request.args.get("resumableChunkNumber", type=int) or 0
+    if not (fileid and filename and chunk):
+        return jsonify({
+            "message": "At least one required query parameter is missing.",
+            "error": "BadRequest",
+            "statuscode": 400
+        }), 400
+
+    # If the complete target file exists, return 200 for all chunks.
+    _targetfile = target_file(fileid)
+    if _targetfile.exists():
+        return jsonify({
+            "uploaded-file": _targetfile.name,
+            "original-name": filename,
+            "chunk": chunk,
+            "message": "The complete file already exists.",
+            "statuscode": 200
+        }), 200
+
+    if Path(chunks_directory(fileid),
+            chunk_name(filename, chunk)).exists():
+        return jsonify({
+            "chunk": chunk,
+            "message": f"Chunk {chunk} exists.",
+            "statuscode": 200
+        }), 200
+
+    return jsonify({
+        "message": f"Chunk {chunk} was not found.",
+        "error": "NotFound",
+        "statuscode": 404
+    }), 404
+
+
+def __merge_chunks__(targetfile: Path, chunkpaths: tuple[Path, ...]) -> Path:
+    """Merge the chunks into a single file."""
+    with open(targetfile, "ab") as _target:
+        for chunkfile in chunkpaths:
+            app.logger.debug("Merging chunk: %s", chunkfile)
+            with open(chunkfile, "rb") as _chunkdata:
+                _target.write(_chunkdata.read())
+
+            chunkfile.unlink()  # No `missing_ok=True`: the chunk MUST exist.
+            # A missing chunk here would indicate a race condition; handle
+            # that explicitly instead of silencing it.
+    return targetfile
+
+
+@files.route("/upload/resumable", methods=["POST"])
+def resumable_upload_post():
+    """Do the actual chunks upload here."""
+    _totalchunks = request.form.get("resumableTotalChunks", type=int) or 0
+    _chunk = request.form.get("resumableChunkNumber", default=1, type=int)
+    _uploadfilename = request.form.get(
+        "resumableFilename", default="", type=str) or ""
+    _fileid = request.form.get(
+        "resumableIdentifier", default="", type=str) or ""
+    _targetfile = target_file(_fileid)
+
+    if _targetfile.exists():
+        return jsonify({
+            "uploaded-file": _targetfile.name,
+            "original-name": _uploadfilename,
+            "message": "File was uploaded successfully!",
+            "statuscode": 200
+        }), 200
+
+    try:
+        chunks_directory(_fileid).mkdir(exist_ok=True, parents=True)
+        request.files["file"].save(Path(chunks_directory(_fileid),
+                                        chunk_name(_uploadfilename, _chunk)))
+
+        # Check whether upload is complete
+        chunkpaths = tuple(
+            Path(chunks_directory(_fileid), chunk_name(_uploadfilename, _achunk))
+            for _achunk in range(1, _totalchunks+1))
+        if all(_file.exists() for _file in chunkpaths):
+            ### HACK: Break possible race condition ###
+            # Multiple threads/requests sometimes try to merge the same file,
+            # leading to race conditions and, in rare instances, actual data
+            # corruption. This hack staggers the contenders with randomized
+            # delays to break that race condition.
+            _delays = (
+                101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163,
+                167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233,
+                239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293)
+            _lockfile = Path(chunks_directory(_fileid), "merge.lock")
+            while True:
+                time.sleep(random.choice(_delays) / 1000)
+                if (chunks_directory(_fileid).exists()
+                        and not _lockfile.exists()
+                        and not _targetfile.exists()):
+                    # Merge the chunks and clean up the chunks directory.
+                    _lockfile.touch()
+                    __merge_chunks__(_targetfile, chunkpaths)
+                    _lockfile.unlink()
+                    chunks_directory(_fileid).rmdir()
+                    break
+
+                if (_targetfile.exists()
+                        and not chunks_directory(_fileid).exists()):
+                    # Another thread completed the merge and cleaned up the
+                    # chunks directory.
+                    break
+
+                # There is still a thread that's merging this file
+                continue
+            ### END: HACK: Break possible race condition ###
+
+            if _targetfile.exists():
+                return jsonify({
+                    "uploaded-file": _targetfile.name,
+                    "original-name": _uploadfilename,
+                    "message": "File was uploaded successfully!",
+                    "statuscode": 200
+                }), 200
+            return jsonify({
+                "uploaded-file": _targetfile.name,
+                "original-name": _uploadfilename,
+                "message": "Uploaded file is missing!",
+                "statuscode": 404
+            }), 404
+        return jsonify({
+            "message": f"Chunk {_chunk} uploaded successfully.",
+            "statuscode": 201
+        }), 201
+    except Exception as exc:  # pylint: disable=[broad-except]
+        msg = "Error processing uploaded file chunks."
+        app.logger.error(msg, exc_info=True, stack_info=True)
+        return jsonify({
+            "message": msg,
+            "error": type(exc).__name__,
+            "error-description": " ".join(str(arg) for arg in exc.args),
+            "error-trace": traceback.format_exception(exc)
+        }), 500
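
The two endpoints above implement the server side of a resumable.js-style flow: the client probes each chunk with a GET and POSTs only the ones the server does not have. A rough client sketch with the requests library; the base URL, identifier scheme and chunk size are assumptions for illustration, not part of this change:

    from pathlib import Path
    import requests

    BASE = "http://localhost:8080/upload/resumable"  # hypothetical mount point
    CHUNKSIZE = 1024 * 1024  # 1MiB; any consistent size works
    source = Path("data/upload.bin")  # hypothetical file to upload
    data = source.read_bytes()
    chunks = [data[i:i + CHUNKSIZE] for i in range(0, len(data), CHUNKSIZE)]
    params = {"resumableIdentifier": f"{len(data)}-{source.name}",
              "resumableFilename": source.name,
              "resumableTotalChunks": len(chunks)}
    for number, chunk in enumerate(chunks, start=1):
        # Skip chunks the server already has (the GET answers 200 for those).
        if requests.get(BASE, params={**params,
                                      "resumableChunkNumber": number}).ok:
            continue
        requests.post(BASE,
                      data={**params, "resumableChunkNumber": number},
                      files={"file": (source.name, chunk)})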