# NOTE: removed stray cgit navigation residue ("aboutsummaryrefslogtreecommitdiff")
"""Module for generic files endpoints."""
import time
import random
import traceback
from pathlib import Path

from flask import request, jsonify, Blueprint, current_app as app

from .chunks import chunk_name, chunks_directory

files = Blueprint("files", __name__)

def target_file(fileid: str) -> Path:
    """Build the destination path for `fileid` inside the configured upload folder."""
    upload_dir = app.config["UPLOAD_FOLDER"]
    return Path(upload_dir) / fileid


@files.route("/upload/resumable", methods=["GET"])
def resumable_upload_get():
    """Check whether a given chunk (or the fully merged file) already exists.

    This is the resumable.js "testChunks" probe.  Responses:
      * 400 — any required query parameter is missing,
      * 200 — the complete target file exists, or the specific chunk exists,
      * 404 — the chunk has not been uploaded yet (client should send it).
    """
    fileid = request.args.get("resumableIdentifier", type=str) or ""
    filename = request.args.get("resumableFilename", type=str) or ""
    chunk = request.args.get("resumableChunkNumber", type=int) or 0
    # BUG FIX: the original condition used `or`, which only rejected the
    # request when *all three* parameters were missing.  The error message
    # ("At least one ... is missing") makes the intent clear: any single
    # missing parameter must yield a 400.
    if not (fileid and filename and chunk):
        return jsonify({
            "message": "At least one required query parameter is missing.",
            "error": "BadRequest",
            "statuscode": 400
        }), 400

    # If the complete target file exists, report success for every chunk so
    # the client skips re-uploading them.
    _targetfile = target_file(fileid)
    if _targetfile.exists():
        return jsonify({
            "uploaded-file": _targetfile.name,
            "original-name": filename,
            "chunk": chunk,
            "message": "The complete file already exists.",
            "statuscode": 200
        }), 200

    # Otherwise check for the individual chunk file on disk.
    if Path(chunks_directory(fileid),
            chunk_name(filename, chunk)).exists():
        return jsonify({
            "chunk": chunk,
            "message": f"Chunk {chunk} exists.",
            "statuscode": 200
        }), 200

    return jsonify({
            "message": f"Chunk {chunk} was not found.",
            "error": "NotFound",
            "statuscode": 404
        }), 404


def __merge_chunks__(targetfile: Path, chunkpaths: tuple[Path, ...]) -> Path:
    """Append every chunk in `chunkpaths` (in order) onto `targetfile`.

    Each chunk file is deleted immediately after its bytes are appended.
    Returns `targetfile` for convenience.

    Note: `targetfile` is opened in append mode, so the caller is
    responsible for ensuring it does not already contain data.
    """
    with open(targetfile, "ab") as _target:
        for chunkfile in chunkpaths:
            # BUG FIX: this is routine progress information, not an error —
            # the original logged it at ERROR level, flooding error logs on
            # every successful upload.
            app.logger.debug("Merging chunk: %s", chunkfile)
            with open(chunkfile, "rb") as _chunkdata:
                _target.write(_chunkdata.read())

            chunkfile.unlink() # Don't use `missing_ok=True` — chunk MUST exist
            # If chunk doesn't exist, it might indicate a race condition.
            # Handle that instead of silently ignoring it.
    return targetfile


@files.route("/upload/resumable", methods=["POST"])
def resumable_upload_post():
    """Do the actual chunks upload here.

    Saves the posted chunk into the per-file chunks directory.  When all
    chunks for the file are present on disk, merges them into the final
    target file and cleans up the chunks directory.

    Responses:
      * 200 — the complete file exists (already uploaded, or merged now),
      * 201 — this chunk was stored, but more chunks are still missing,
      * 404 — merge loop exited but the target file is missing,
      * 500 — any exception while saving/merging (full trace in the body).
    """
    # resumable.js form fields; all default to falsy values when absent.
    _totalchunks = request.form.get("resumableTotalChunks", type=int) or 0
    _chunk = request.form.get("resumableChunkNumber", default=1, type=int)
    _uploadfilename = request.form.get(
        "resumableFilename", default="", type=str) or ""
    _fileid = request.form.get(
        "resumableIdentifier", default="", type=str) or ""
    _targetfile = target_file(_fileid)

    # Fast path: the merged file already exists — nothing to do.
    if _targetfile.exists():
        return jsonify({
            "uploaded-file": _targetfile.name,
            "original-name": _uploadfilename,
            "message": "File was uploaded successfully!",
            "statuscode": 200
        }), 200

    try:
        # Persist this chunk under <chunks_dir>/<chunk_name>.
        chunks_directory(_fileid).mkdir(exist_ok=True, parents=True)
        request.files["file"].save(Path(chunks_directory(_fileid),
                                        chunk_name(_uploadfilename, _chunk)))

        # Check whether upload is complete: build the expected path for
        # every chunk (1.._totalchunks) and verify each exists on disk.
        chunkpaths = tuple(
            Path(chunks_directory(_fileid), chunk_name(_uploadfilename, _achunk))
            for _achunk in range(1, _totalchunks+1))
        if all(_file.exists() for _file in chunkpaths):
            ### HACK: Break possible race condition ###
            # Looks like sometimes, there are multiple threads/requests trying
            # to merge one file, leading to race conditions and in some rare
            # instances, actual data corruption. This hack is meant to break
            # that race condition.
            #
            # NOTE(review): the delays below are prime numbers (in ms) so
            # that competing requests are very unlikely to wake on the same
            # schedule — presumably the point of choosing primes; confirm.
            _delays = (
                101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163,
                167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233,
                239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293)
            # NOTE(review): `touch()` + `exists()` is not an atomic
            # test-and-set, so two requests can still both pass the check
            # before either creates the lock file — this narrows the race
            # window rather than eliminating it.  A real fix would use
            # `os.open(..., O_CREAT | O_EXCL)` or an flock-style lock.
            _lockfile = Path(chunks_directory(_fileid), "merge.lock")
            while True:
                # Sleep a random prime-valued delay (0.101s–0.293s) to
                # de-synchronise competing requests.
                time.sleep(random.choice(_delays) / 1000)
                # Merge only while the chunks directory still exists and
                # it is NOT the case that both the lock and the target are
                # present (i.e. no other request is mid-merge).
                if (chunks_directory(_fileid).exists()
                    and not (_lockfile.exists() and _targetfile.exists())):
                    # merge_files and clean up chunks
                    _lockfile.touch()
                    __merge_chunks__(_targetfile, chunkpaths)
                    _lockfile.unlink()
                    chunks_directory(_fileid).rmdir()
                    continue

                # Merge is complete once the target exists and the chunks
                # directory (with its lock file) is gone.
                if (_targetfile.exists()
                    and not (
                        chunks_directory(_fileid).exists()
                        and _lockfile.exists())):
                    # merge complete
                    break

                # There is still a thread that's merging this file
                continue
            ### END: HACK: Break possible race condition ###

            if _targetfile.exists():
                return jsonify({
                    "uploaded-file": _targetfile.name,
                    "original-name": _uploadfilename,
                    "message": "File was uploaded successfully!",
                    "statuscode": 200
                }), 200
            return jsonify({
                "uploaded-file": _targetfile.name,
                "original-name": _uploadfilename,
                "message": "Uploaded file is missing!",
                "statuscode": 404
            }), 404
        return jsonify({
            "message": f"Chunk {int(_chunk)} uploaded successfully.",
            "statuscode": 201
        }), 201
    except Exception as exc:# pylint: disable=[broad-except]
        msg = "Error processing uploaded file chunks."
        app.logger.error(msg, exc_info=True, stack_info=True)
        # NOTE(review): `traceback.format_exception(exc)` with a single
        # positional argument requires Python 3.10+ — confirm the
        # deployment target.
        return jsonify({
            "message": msg,
            "error": type(exc).__name__,
            "error-description": " ".join(str(arg) for arg in exc.args),
            "error-trace": traceback.format_exception(exc)
        }), 500