"""Handle inserting data into the database."""
import os
import json
from functools import reduce

import requests
from redis import Redis
from MySQLdb.cursors import DictCursor
from flask import (
    flash, request, url_for, Blueprint, redirect, render_template,
    current_app as app)

from . import jobs
from .db_utils import database_connection

dbinsertbp = Blueprint("dbinsert", __name__)

# Form fields every step after platform selection needs, paired with the
# label shown to the user when the field is missing.
_UPLOAD_FIELDS = (
    ("filename", "filename"),
    ("filetype", "filetype"),
    ("species", "species"),
    ("genechipid", "platform"))


def render_error(error_msg):
    """Render the generic error page, with a 400 status code."""
    return render_template("dbupdate_error.html", error_message=error_msg), 400


def _first_missing_field(form, required):
    """
    Return the label of the first required field that is absent or empty.

    `required` is a sequence of `(field_name, label)` pairs that are checked
    in order. Returns `None` when every field has a truthy value in `form`.

    This replaces `assert`-based validation, which is stripped when Python
    runs with `-O`.
    """
    for field, label in required:
        if not form.get(field):
            return label
    return None


def make_menu_items_grouper(grouping_fn=lambda item: item):
    """
    Build function to be used to group menu items.

    The returned function is a reducer: given the accumulated dict and a row,
    it returns a new dict in which the stripped `(row[0], row[1])` pair has
    been appended to the tuple stored under `grouping_fn(row[2])`.
    """
    def __grouper__(acc, row):
        grouping = grouping_fn(row[2])
        row_values = (row[0].strip(), row[1].strip())
        if acc.get(grouping) is None:
            return {**acc, grouping: (row_values,)}
        return {**acc, grouping: (acc[grouping] + (row_values,))}

    return __grouper__


def species() -> tuple:
    """Retrieve the species from the database, as a tuple of rows."""
    with database_connection() as conn:
        with conn.cursor(cursorclass=DictCursor) as cursor:
            cursor.execute(
                "SELECT SpeciesId, SpeciesName, LOWER(Name) AS Name, MenuName "
                "FROM Species")
            return tuple(cursor.fetchall())

    # Reached only if a context manager above suppresses an exception.
    return tuple()


def genechips():
    """
    Retrieve the genechip information from the database.

    Returns a dict mapping each `SpeciesId` to a tuple of the GeneChip rows
    for that species, in `GeneChipName` order.
    """
    def __organise_by_species__(acc, chip):
        speciesid = chip["SpeciesId"]
        if acc.get(speciesid) is None:
            return {**acc, speciesid: (chip,)}
        # Key on the species id: keying on anything else drops the chips
        # already collected for this species.
        return {**acc, speciesid: acc[speciesid] + (chip,)}

    with database_connection() as conn:
        with conn.cursor(cursorclass=DictCursor) as cursor:
            cursor.execute("SELECT * FROM GeneChip ORDER BY GeneChipName ASC")
            return reduce(__organise_by_species__, cursor.fetchall(), {})

    # Reached only if a context manager above suppresses an exception.
    return {}


def studies_by_species_and_platform(speciesid:int, genechipid:int) -> tuple:
    """Retrieve the studies by the related species and gene platform."""
    with database_connection() as conn:
        with conn.cursor(cursorclass=DictCursor) as cursor:
            query = (
                "SELECT Species.SpeciesId, ProbeFreeze.* "
                "FROM Species INNER JOIN InbredSet "
                "ON Species.SpeciesId=InbredSet.SpeciesId "
                "INNER JOIN ProbeFreeze "
                "ON InbredSet.InbredSetId=ProbeFreeze.InbredSetId "
                "WHERE Species.SpeciesId = %s "
                "AND ProbeFreeze.ChipId = %s")
            cursor.execute(query, (speciesid, genechipid))
            return tuple(cursor.fetchall())

    # Reached only if a context manager above suppresses an exception.
    return tuple()


def groups_by_species(speciesid:int) -> tuple:
    """Retrieve group (InbredSet) information from the database."""
    with database_connection() as conn:
        with conn.cursor(cursorclass=DictCursor) as cursor:
            query = "SELECT * FROM InbredSet WHERE SpeciesId=%s"
            cursor.execute(query, (speciesid,))
            return tuple(cursor.fetchall())

    # Reached only if a context manager above suppresses an exception.
    return tuple()


def organise_groups_by_family(acc:dict, group:dict) -> dict:
    """
    Organise the group (InbredSet) information by the `Family` field.

    Reducer: returns a new dict in which `group` has been appended to the
    tuple stored under its `Family` value.
    """
    family = group["Family"]
    if acc.get(family):
        return {**acc, family: acc[family] + (group,)}
    return {**acc, family: (group,)}


def tissues() -> tuple:
    """Retrieve type (Tissue) information from the database."""
    with database_connection() as conn:
        with conn.cursor(cursorclass=DictCursor) as cursor:
            cursor.execute("SELECT * FROM Tissue ORDER BY Name")
            return tuple(cursor.fetchall())

    # Reached only if a context manager above suppresses an exception.
    return tuple()


@dbinsertbp.route("/platform", methods=["POST"])
def select_platform():
    """
    Select the platform (GeneChipId) used for the data.

    Looks the job up by the posted `job_id`, verifies the uploaded file is
    still on disk and renders the platform selection page. Renders the error
    page when the job or the file no longer exists.
    """
    job_id = request.form["job_id"]
    with Redis.from_url(
            app.config["REDIS_URL"], decode_responses=True) as rconn:
        job = jobs.job(rconn, job_id)
        if not job:
            return render_error(f"Job '{job_id}' no longer exists.")

        filename = job["filename"]
        filepath = f"{app.config['UPLOAD_FOLDER']}/{filename}"
        if not os.path.exists(filepath):
            return render_error(f"File '{filename}' no longer exists.")

        default_species = 1
        gchips = genechips()
        return render_template(
            "select_platform.html", filename=filename,
            filetype=job["filetype"], default_species=default_species,
            species=species(),
            # The default species may have no platforms at all.
            genechips=gchips.get(default_species, tuple()),
            genechips_data=json.dumps(gchips))

    # Reached only if the Redis context manager suppresses an exception.
    return render_error("Unknown error")


@dbinsertbp.route("/study", methods=["POST"])
def select_study():
    """
    View to select/create the study (ProbeFreeze) associated with the data.

    Requires the `filename`, `filetype`, `species` and `genechipid` form
    fields. The optional `inbredsetid` and `tissueid` fields pre-select a
    group and tissue; -13 is passed to the template when they are not given.
    """
    form = request.form
    missing = _first_missing_field(form, _UPLOAD_FIELDS)
    if missing is not None:
        return render_error(f"Missing data: {missing}")

    try:
        # `create_study` re-posts the form here when a field is empty, so an
        # empty string has to be treated the same as an absent field.
        selected_group = int(form.get("inbredsetid") or -13)
        selected_tissue = int(form.get("tissueid") or -13)
    except ValueError:
        return render_error(
            "Invalid value: group and tissue identifiers must be integers")

    speciesid = form["species"]
    genechipid = form["genechipid"]
    the_studies = studies_by_species_and_platform(speciesid, genechipid)
    the_groups = reduce(
        organise_groups_by_family, groups_by_species(speciesid), {})
    return render_template(
        "select_study.html", filename=form["filename"],
        filetype=form["filetype"], species=speciesid, genechipid=genechipid,
        studies=the_studies, groups=the_groups, tissues=tissues(),
        selected_group=selected_group, selected_tissue=selected_tissue)


@dbinsertbp.route("/create-study", methods=["POST"])
def create_study():
    """
    Create a new study (ProbeFreeze).

    Validates the `studyname`, `inbredsetid` and `tissueid` form fields and
    re-posts the form to the study selection view (HTTP 307) with a flashed
    message.
    """
    form = request.form
    missing = _first_missing_field(form, (
        ("studyname", "study name"),
        ("inbredsetid", "group"),
        ("tissueid", "type/tissue")))
    if missing is not None:
        flash(f"Missing data: {missing}", "alert-error")
        return redirect(url_for("dbinsert.select_study"), code=307)

    # TODO(review): nothing is written to the database yet. Until the insert
    # is implemented, tell the user and send them back rather than falling
    # off the end of the view (a view must never return `None`).
    flash("Creating a new study is not implemented yet.", "alert-error")
    return redirect(url_for("dbinsert.select_study"), code=307)


@dbinsertbp.route("/dataset", methods=["POST"])
def select_dataset():
    """
    Select the dataset to add the file contents against.

    Requires the `filename`, `filetype`, `species` and `genechipid` form
    fields, plus either `studyid` or `studyname`.
    """
    form = request.form
    missing = _first_missing_field(form, _UPLOAD_FIELDS)
    if missing is not None:
        return render_error(f"Missing data: {missing}")

    if not (form.get("studyid") or form.get("studyname")):
        return render_error("Missing data: study")

    # TODO(review): the original carried placeholder checks for further,
    # as yet unnamed, fields. Add them here once they are known.
    return render_template(
        "select_dataset.html", filename=form["filename"],
        filetype=form["filetype"], species=form["species"],
        genechipid=form["genechipid"])


@dbinsertbp.route("/insert_data", methods=["POST"])
def insert_data():
    """
    Preview the data before triggering entry into the database.

    Returns a plain-text description of what would be inserted, or the error
    page when the uploaded file is gone or `dataset`/`genechipid` are not
    integers.
    """
    form = request.form
    filename = form["filename"]
    # NOTE(review): `filename` comes straight from the form and is joined to
    # the upload folder unsanitised; confirm that is acceptable before this
    # path is used for anything beyond an existence check.
    filepath = f"{app.config['UPLOAD_FOLDER']}/{filename}"
    if not os.path.exists(filepath):
        return render_error(f"File '{filename}' no longer exists.")

    try:
        # Named `speciesid` so the module-level `species()` is not shadowed.
        speciesid = form["species"]
        filetype = form["filetype"]
        datasetid = int(form["dataset"])
        genechipid = int(form["genechipid"])
        return (f"Would insert '{speciesid}' data in '{filetype}' file "
                f"'{filepath}' into the database with the dataset "
                f"'{datasetid}' and genechip '{genechipid}'.")
    except ValueError as verr:
        msg = "::".join(verr.args)
        return render_error(f"Invalid value: {msg}")