"""Module to handle uploading of R/qtl2 bundles.""" from pathlib import Path from typing import Optional from zipfile import ZipFile, is_zipfile from flask import ( flash, request, url_for, redirect, Response, Blueprint, render_template, current_app as app) from r_qtl import r_qtl2 from r_qtl.errors import InvalidFormat from qc_app.files import save_file, fullpath from qc_app.dbinsert import species as all_species from qc_app.db_utils import with_db_connection, database_connection from qc_app.db import ( species_by_id, save_population, populations_by_species, population_by_species_and_id, geno_dataset_by_species_and_population) rqtl2 = Blueprint("rqtl2", __name__) @rqtl2.route("/", methods=["GET", "POST"]) @rqtl2.route("/select-species", methods=["POST"]) def select_species(): """Select the species.""" if request.method == "GET": return render_template("rqtl2/index.html", species=with_db_connection(all_species)) species_id = request.form.get("species_id") species = with_db_connection( lambda conn: species_by_id(conn, species_id)) if bool(species): return redirect(url_for( "upload.rqtl2.select_population", species_id=species_id)) flash("Invalid species or no species selected!", "alert-error error-rqtl2") return redirect(url_for("upload.rqtl2.select_species")) @rqtl2.route("/upload/species//select-population", methods=["GET", "POST"]) def select_population(species_id: int): """Select/Create the population to organise data under.""" with database_connection(app.config["SQL_URI"]) as conn: species = species_by_id(conn, species_id) if not bool(species): flash("Invalid species selected!", "alert-error error-rqtl2") return redirect(url_for("upload.rqtl2.select_species")) if request.method == "GET": return render_template( "rqtl2/select-population.html", species=species, populations=populations_by_species(conn, species_id)) population = population_by_species_and_id( conn, species["SpeciesId"], request.form.get("inbredset_id")) if not bool(population): flash("Invalid Population!", "alert-error error-rqtl2") return redirect( url_for("upload.rqtl2.select_population", pgsrc="error"), code=307) return redirect(url_for("upload.rqtl2.upload_rqtl2_bundle", species_id=species["SpeciesId"], population_id=population["InbredSetId"])) @rqtl2.route("/upload/species//create-population", methods=["POST"]) def create_population(species_id: int): """Create a new population for the given species.""" population_page = redirect(url_for("upload.rqtl2.select_population")) with database_connection(app.config["SQL_URI"]) as conn: species = species_by_id(conn, species_id) population_name = request.form.get("inbredset_name", "").strip() population_fullname = request.form.get("inbredset_fullname", "").strip() if not bool(species): flash("Invalid species!", "alert-error error-rqtl2") return redirect(url_for("upload.rqtl2.select_species")) if not bool(population_name): flash("Invalid Population Name!", "alert-error error-rqtl2") return population_page if not bool(population_fullname): flash("Invalid Population Full Name!", "alert-error error-rqtl2") return population_page new_population = save_population(conn, { "SpeciesId": species["SpeciesId"], "Name": population_name, "InbredSetName": population_fullname, "FullName": population_fullname, "Family": request.form.get("inbredset_family") or None, "Description": request.form.get("description") or None }) flash("Population created successfully.", "alert-success") return redirect( url_for("upload.rqtl2.upload_rqtl2_bundle", population_id=new_population["population_id"], pgsrc="create-population"), code=307) class __RequestError__(Exception): #pylint: disable=[invalid-name] """Internal class to avoid pylint's `too-many-return-statements` error.""" @rqtl2.route(("/upload/species//population/" "/rqtl2-bundle"), methods=["GET", "POST"]) def upload_rqtl2_bundle(species_id: int, population_id: int): """Allow upload of R/qtl2 bundle.""" this_page_with_errors = redirect(url_for("upload.rqtl2.upload_rqtl2_bundle", species_id=species_id, population_id=population_id, pgsrc="error"), code=307) with database_connection(app.config["SQL_URI"]) as conn: species = species_by_id(conn, species_id) population = population_by_species_and_id( conn, species["SpeciesId"], population_id) if not bool(species): flash("Invalid species!", "alert-error error-rqtl2") return redirect(url_for("upload.rqtl2.select_species")) if not bool(population): flash("Invalid Population!", "alert-error error-rqtl2") return redirect( url_for("upload.rqtl2.select_population", pgsrc="error"), code=307) if request.method == "GET" or ( request.method == "POST" and bool(request.args.get("pgsrc"))): return render_template("rqtl2/upload-rqtl2-bundle-step-01.html", species=species, population=population) if not bool(request.files.get("rqtl2_bundle")): raise __RequestError__("No R/qtl2 zip bundle provided.") the_file = save_file( request.files["rqtl2_bundle"], Path(app.config["UPLOAD_FOLDER"])) if not bool(the_file): raise __RequestError__("Please provide a valid R/qtl2 zip bundle.") if not is_zipfile(str(the_file)): raise __RequestError__("Invalid file! Expected a zip file.") try: with ZipFile(str(the_file), "r") as zfile: r_qtl2.validate_bundle(zfile) return render_template( "rqtl2/upload-rqtl2-bundle-step-02.html", species=species, population=population, rqtl2_bundle_file=the_file.name) except (InvalidFormat, __RequestError__) as exc: flash("".join(exc.args), "alert-error alert-danger error-rqtl2") return this_page_with_errors def check_errors(conn, *args, **kwargs) -> Optional[Response]: """Check for select errors in the forms and return a page to redirect to.""" species_id = kwargs.get("species_id") or request.form.get("species_id") population_id = (kwargs.get("population_id") or request.form.get("population_id")) species = species_by_id(conn, species_id) population = population_by_species_and_id(conn, species_id, population_id) if "species" in args and not bool(species): flash("Invalid species!", "alert-error error-rqtl2") return redirect(url_for("upload.rqtl2.select_species")) if "population" in args and not bool(population): flash("Invalid Population!", "alert-error error-rqtl2") return redirect( url_for("upload.rqtl2.select_population", pgsrc="error"), code=307) if ("rqtl2_bundle_file" in args and not bool(request.form.get("rqtl2_bundle_file"))): flash("There is no file to process.", "alert-error alert-danger error-rqtl2") return redirect(url_for("upload.rqtl2.upload_rqtl2_bundle", species_id=species_id, population_id=population_id, pgsrc="error"), code=307) return False @rqtl2.route(("/upload/species//population/" "/rqtl2-bundle/dataset-info"), methods=["POST"]) def select_dataset_info(species_id: int, population_id: int): """ If `geno` files exist in the R/qtl2 bundle, prompt user to provide the dataset the genotypes belong to. """ form = request.form with database_connection(app.config["SQL_URI"]) as conn: error_page = check_errors(conn, "species", "population", "rqtl2_bundle_file") if bool(error_page): return error_page thefile = fullpath(form["rqtl2_bundle_file"]) with ZipFile(str(thefile), "r") as zfile: cdata = r_qtl2.control_data(zfile) if "geno" in cdata and not bool(form.get("geno_datasetid")): return render_template( "rqtl2/select-geno-dataset.html", species=species_by_id(conn, species_id), population=population_by_species_and_id( conn, species_id, population_id), rqtl2_bundle_file=thefile.name, datasets=geno_dataset_by_species_and_population( conn, species_id, population_id)) return "All data points collected. Should proceed to launching the job." @rqtl2.route(("/upload/species//population/" "/rqtl2-bundle/select-geno-dataset"), methods=["POST"]) def select_geno_dataset(species_id: int, population_id: int) -> Response: """Select from existing geno datasets.""" return "IMPLEMENT THIS!!!" @rqtl2.route(("/upload/species//population/" "/rqtl2-bundle/create-geno-dataset"), methods=["POST"]) def create_geno_dataset(species_id: int, population_id: int) -> Response: """Create a new geno dataset.""" return "IMPLEMENT THIS!!!"