"""Module to handle uploading of R/qtl2 bundles.""" from pathlib import Path from datetime import date from zipfile import ZipFile, is_zipfile from MySQLdb.cursors import DictCursor from flask import ( flash, request, url_for, redirect, Blueprint, render_template, current_app as app) from r_qtl import r_qtl2 from r_qtl.errors import InvalidFormat from qc_app.files import save_file, fullpath from qc_app.dbinsert import species as all_species from qc_app.db_utils import with_db_connection, database_connection from qc_app.db import ( species_by_id, save_population, populations_by_species, population_by_species_and_id,) from qc_app.db.datasets import ( geno_dataset_by_id, geno_dataset_by_species_and_population) rqtl2 = Blueprint("rqtl2", __name__) @rqtl2.route("/", methods=["GET", "POST"]) @rqtl2.route("/select-species", methods=["POST"]) def select_species(): """Select the species.""" if request.method == "GET": return render_template("rqtl2/index.html", species=with_db_connection(all_species)) species_id = request.form.get("species_id") species = with_db_connection( lambda conn: species_by_id(conn, species_id)) if bool(species): return redirect(url_for( "upload.rqtl2.select_population", species_id=species_id)) flash("Invalid species or no species selected!", "alert-error alert-rqtl2") return redirect(url_for("upload.rqtl2.select_species")) @rqtl2.route("/upload/species//select-population", methods=["GET", "POST"]) def select_population(species_id: int): """Select/Create the population to organise data under.""" with database_connection(app.config["SQL_URI"]) as conn: species = species_by_id(conn, species_id) if not bool(species): flash("Invalid species selected!", "alert-error alert-rqtl2") return redirect(url_for("upload.rqtl2.select_species")) if request.method == "GET": return render_template( "rqtl2/select-population.html", species=species, populations=populations_by_species(conn, species_id)) population = population_by_species_and_id( conn, species["SpeciesId"], request.form.get("inbredset_id")) if not bool(population): flash("Invalid Population!", "alert-error alert-rqtl2") return redirect( url_for("upload.rqtl2.select_population", pgsrc="error"), code=307) return redirect(url_for("upload.rqtl2.upload_rqtl2_bundle", species_id=species["SpeciesId"], population_id=population["InbredSetId"])) @rqtl2.route("/upload/species//create-population", methods=["POST"]) def create_population(species_id: int): """Create a new population for the given species.""" population_page = redirect(url_for("upload.rqtl2.select_population")) with database_connection(app.config["SQL_URI"]) as conn: species = species_by_id(conn, species_id) population_name = request.form.get("inbredset_name", "").strip() population_fullname = request.form.get("inbredset_fullname", "").strip() if not bool(species): flash("Invalid species!", "alert-error alert-rqtl2") return redirect(url_for("upload.rqtl2.select_species")) if not bool(population_name): flash("Invalid Population Name!", "alert-error alert-rqtl2") return population_page if not bool(population_fullname): flash("Invalid Population Full Name!", "alert-error alert-rqtl2") return population_page new_population = save_population(conn, { "SpeciesId": species["SpeciesId"], "Name": population_name, "InbredSetName": population_fullname, "FullName": population_fullname, "Family": request.form.get("inbredset_family") or None, "Description": request.form.get("description") or None }) flash("Population created successfully.", "alert-success") return redirect( url_for("upload.rqtl2.upload_rqtl2_bundle", population_id=new_population["population_id"], pgsrc="create-population"), code=307) class __RequestError__(Exception): #pylint: disable=[invalid-name] """Internal class to avoid pylint's `too-many-return-statements` error.""" @rqtl2.route(("/upload/species//population/" "/rqtl2-bundle"), methods=["GET", "POST"]) def upload_rqtl2_bundle(species_id: int, population_id: int): """Allow upload of R/qtl2 bundle.""" this_page_with_errors = redirect(url_for("upload.rqtl2.upload_rqtl2_bundle", species_id=species_id, population_id=population_id, pgsrc="error"), code=307) with database_connection(app.config["SQL_URI"]) as conn: species = species_by_id(conn, species_id) population = population_by_species_and_id( conn, species["SpeciesId"], population_id) if not bool(species): flash("Invalid species!", "alert-error alert-rqtl2") return redirect(url_for("upload.rqtl2.select_species")) if not bool(population): flash("Invalid Population!", "alert-error alert-rqtl2") return redirect( url_for("upload.rqtl2.select_population", pgsrc="error"), code=307) if request.method == "GET" or ( request.method == "POST" and bool(request.args.get("pgsrc"))): return render_template("rqtl2/upload-rqtl2-bundle-step-01.html", species=species, population=population) if not bool(request.files.get("rqtl2_bundle")): raise __RequestError__("No R/qtl2 zip bundle provided.") the_file = save_file( request.files["rqtl2_bundle"], Path(app.config["UPLOAD_FOLDER"])) if not bool(the_file): raise __RequestError__("Please provide a valid R/qtl2 zip bundle.") if not is_zipfile(str(the_file)): raise __RequestError__("Invalid file! Expected a zip file.") try: with ZipFile(str(the_file), "r") as zfile: r_qtl2.validate_bundle(zfile) return render_template( "rqtl2/upload-rqtl2-bundle-step-02.html", species=species, population=population, rqtl2_bundle_file=the_file.name)#type: ignore[union-attr] except (InvalidFormat, __RequestError__) as exc: flash("".join(exc.args), "alert-error alert-rqtl2") return this_page_with_errors def check_errors(conn, *args, **kwargs): """Check for select errors in the forms and return a page to redirect to.""" species_id = kwargs.get("species_id") or request.form.get("species_id") population_id = (kwargs.get("population_id") or request.form.get("population_id")) species = species_by_id(conn, species_id) population = population_by_species_and_id(conn, species_id, population_id) if "species" in args and not bool(species): flash("Invalid species!", "alert-error alert-rqtl2") return redirect(url_for("upload.rqtl2.select_species")) if "population" in args and not bool(population): flash("Invalid Population!", "alert-error alert-rqtl2") return redirect( url_for("upload.rqtl2.select_population", pgsrc="error"), code=307) if ("rqtl2_bundle_file" in args and not bool(request.form.get("rqtl2_bundle_file"))): flash("There is no file to process.", "alert-error alert-rqtl2") return redirect(url_for("upload.rqtl2.upload_rqtl2_bundle", species_id=species_id, population_id=population_id, pgsrc="error"), code=307) if ("geno-dataset" in args and not bool(request.form.get("geno-dataset-id"))): flash("No genotype dataset was provided!", "alert-error alert-rqtl2") return redirect(url_for("upload.rqtl2.select_geno_dataset", species_id=species_id, population_id=population_id, pgsrc="error"), code=307) return None @rqtl2.route(("/upload/species//population/" "/rqtl2-bundle/dataset-info"), methods=["POST"]) def select_dataset_info(species_id: int, population_id: int): """ If `geno` files exist in the R/qtl2 bundle, prompt user to provide the dataset the genotypes belong to. """ form = request.form with database_connection(app.config["SQL_URI"]) as conn: error_page = check_errors(conn, "species", "population", "rqtl2_bundle_file") if bool(error_page): return error_page species = species_by_id(conn, species_id) population = population_by_species_and_id( conn, species_id, population_id) thefile = fullpath(form["rqtl2_bundle_file"]) with ZipFile(str(thefile), "r") as zfile: cdata = r_qtl2.control_data(zfile) if "geno" in cdata and not bool(form.get("geno-dataset-id")): return render_template( "rqtl2/select-geno-dataset.html", species=species, population=population, rqtl2_bundle_file=thefile.name, datasets=geno_dataset_by_species_and_population( conn, species_id, population_id)) geno_dataset = geno_dataset_by_id( conn, int(form["geno-dataset-id"])) return render_template("rqtl2/summary-info.html", species=species, population=population, geno_dataset=geno_dataset) @rqtl2.route(("/upload/species//population/" "/rqtl2-bundle/select-geno-dataset"), methods=["POST"]) def select_geno_dataset(species_id: int, population_id: int): """Select from existing geno datasets.""" with database_connection(app.config["SQL_URI"]) as conn: error = check_errors( conn, "species", "population", "rqtl2_bundle_file", "geno-dataset") if bool(error): return error geno_dset = geno_dataset_by_species_and_population( conn, species_id, population_id) if not bool(geno_dset): flash("No genotype dataset was provided!", "alert-error alert-rqtl2") return redirect(url_for("upload.rqtl2.select_geno_dataset", species_id=species_id, population_id=population_id, pgsrc="error"), code=307) flash("Genotype accepted", "alert-success alert-rqtl2") return redirect(url_for("upload.rqtl2.select_dataset_info", species_id=species_id, population_id=population_id, pgsrc="upload.rqtl2.select_geno_dataset"), code=307) @rqtl2.route(("/upload/species//population/" "/rqtl2-bundle/create-geno-dataset"), methods=["POST"]) def create_geno_dataset(species_id: int, population_id: int): """Create a new geno dataset.""" with database_connection(app.config["SQL_URI"]) as conn: error = check_errors(conn, "species", "population", "rqtl2_bundle_file") if bool(error): return error sgeno_page = redirect(url_for("upload.rqtl2.select_geno_dataset", species_id=species_id, population_id=population_id, pgsrc="error"), code=307) if not bool(request.form.get("dataset-name")): flash("You must provide the dataset name", "alert-error alert-rqtl2") return sgeno_page if not bool(request.form.get("dataset-fullname")): flash("You must provide the dataset full name", "alert-error alert-rqtl2") return sgeno_page if not bool(request.form.get("dataset-shortname")): flash("You must provide the dataset short name", "alert-error alert-rqtl2") return sgeno_page public = 2 if request.form.get("dataset-public") == "on" else 0 with conn.cursor(cursorclass=DictCursor) as cursor: new_dataset = { "name": request.form.get("dataset-name"), "fname": request.form.get("dataset-fullname"), "sname": request.form.get("dataset-shortname"), "today": date.today().isoformat(), "pub": public, "isetid": population_id } cursor.execute( "INSERT INTO GenoFreeze(" "Name, FullName, ShortName, CreateTime, public, InbredSetId" ") " "VALUES(" "%(name)s, %(fname)s, %(sname)s, %(today)s, %(pub)s, %(isetid)s" ")", new_dataset) flash("Created dataset successfully.", "alert-success") return render_template( "rqtl2/create-geno-dataset-success.html", species=species_by_id(conn, species_id), population=population_by_species_and_id( conn, species_id, population_id), rqtl2_bundle_file=request.form["rqtl2_bundle_file"], geno_dataset={**new_dataset, "id": cursor.lastrowid})