diff options
author | Frederick Muriuki Muriithi | 2024-08-28 17:12:26 -0500 |
---|---|---|
committer | Frederick Muriuki Muriithi | 2024-08-28 17:54:17 -0500 |
commit | 06c6a7f7f42e8ff2d33a934ff695efde24d26d65 (patch) | |
tree | 0ab6115fa7a8ee490cc6efc343c44549c1871281 /uploader/expression_data/index.py | |
parent | 05191fa146fac31fd079c50bf6bcc4983f2f0792 (diff) | |
download | gn-uploader-06c6a7f7f42e8ff2d33a934ff695efde24d26d65.tar.gz |
Move code handling expression data upload into new module.
Diffstat (limited to 'uploader/expression_data/index.py')
-rw-r--r-- | uploader/expression_data/index.py | 126 |
1 files changed, 126 insertions, 0 deletions
diff --git a/uploader/expression_data/index.py b/uploader/expression_data/index.py new file mode 100644 index 0000000..a334c51 --- /dev/null +++ b/uploader/expression_data/index.py @@ -0,0 +1,126 @@ +"""Entry-point module""" +import os +import mimetypes +from typing import Tuple +from zipfile import ZipFile, is_zipfile + +from werkzeug.utils import secure_filename +from flask import ( + flash, + request, + url_for, + redirect, + Blueprint, + render_template, + current_app as app, + send_from_directory) + +from uploader.db import species +from uploader.authorisation import require_login +from uploader.db_utils import with_db_connection + +indexbp = Blueprint("index", __name__) + + +def errors(rqst) -> Tuple[str, ...]: + """Return a tuple of the errors found in the request `rqst`. If no error is + found, then an empty tuple is returned.""" + def __filetype_error__(): + return ( + ("Invalid file type provided.",) + if rqst.form.get("filetype") not in ("average", "standard-error") + else tuple()) + + def __file_missing_error__(): + return ( + ("No file was uploaded.",) + if ("qc_text_file" not in rqst.files or + rqst.files["qc_text_file"].filename == "") + else tuple()) + + def __file_mimetype_error__(): + text_file = rqst.files["qc_text_file"] + return ( + ( + ("Invalid file! Expected a tab-separated-values file, or a zip " + "file of the a tab-separated-values file."),) + if text_file.mimetype not in ( + "text/plain", "text/tab-separated-values", + "application/zip") + else tuple()) + + return ( + __filetype_error__() + + (__file_missing_error__() or __file_mimetype_error__())) + +def zip_file_errors(filepath, upload_dir) -> Tuple[str, ...]: + """Check the uploaded zip file for errors.""" + zfile_errors: Tuple[str, ...] = tuple() + if is_zipfile(filepath): + with ZipFile(filepath, "r") as zfile: + infolist = zfile.infolist() + if len(infolist) != 1: + zfile_errors = zfile_errors + ( + ("Expected exactly one (1) member file within the uploaded zip " + f"file. Got {len(infolist)} member files."),) + if len(infolist) == 1 and infolist[0].is_dir(): + zfile_errors = zfile_errors + ( + ("Expected a member text file in the uploaded zip file. Got a " + "directory/folder."),) + + if len(infolist) == 1 and not infolist[0].is_dir(): + zfile.extract(infolist[0], path=upload_dir) + mime = mimetypes.guess_type(f"{upload_dir}/{infolist[0].filename}") + if mime[0] != "text/tab-separated-values": + zfile_errors = zfile_errors + ( + ("Expected the member text file in the uploaded zip file to" + " be a tab-separated file."),) + + return zfile_errors + + +@indexbp.route("/", methods=["GET"]) +@require_login +def index(): + """Display the expression data index page.""" + return render_template("expression-data/index.html") + + +@indexbp.route("/upload", methods=["GET", "POST"]) +@require_login +def upload_file(): + """Enables uploading the files""" + if request.method == "GET": + return render_template( + "select_species.html", species=with_db_connection(species)) + + upload_dir = app.config["UPLOAD_FOLDER"] + request_errors = errors(request) + if request_errors: + for error in request_errors: + flash(error, "alert-danger error-expr-data") + return redirect(url_for("expression-data.index.upload_file")) + + filename = secure_filename(request.files["qc_text_file"].filename) + if not os.path.exists(upload_dir): + os.mkdir(upload_dir) + + filepath = os.path.join(upload_dir, filename) + request.files["qc_text_file"].save(os.path.join(upload_dir, filename)) + + zip_errors = zip_file_errors(filepath, upload_dir) + if zip_errors: + for error in zip_errors: + flash(error, "alert-danger error-expr-data") + return redirect(url_for("expression-data.index.upload_file")) + + return redirect(url_for("expression-data.parse.parse", + speciesid=request.form["speciesid"], + filename=filename, + filetype=request.form["filetype"])) + +@indexbp.route("/data-review", methods=["GET"]) +@require_login +def data_review(): + """Provide some help on data expectations to the user.""" + return render_template("data_review.html") |