"Handle inserting data into the database"
import os
import json
from typing import Union
from functools import reduce
from datetime import datetime

from redis import Redis
from MySQLdb.cursors import DictCursor
from flask import (
    flash, request, url_for, Blueprint, redirect, render_template,
    current_app as app)

from qc_app.db_utils import with_db_connection, database_connection
from qc_app.db import species, species_by_id, populations_by_species

from . import jobs

dbinsertbp = Blueprint("dbinsert", __name__)

# `(form field, human-readable label)` pairs.  The label is what gets shown to
# the user in the "Missing data" message when the field is absent or empty.
_COMMON_FIELDS = (
    ("filename", "filename"),
    ("filetype", "filetype"),
    ("species", "species"),
    ("genechipid", "platform"))
_STUDY_FIELDS = _COMMON_FIELDS + (("studyid", "study"),)
_DATASET_FIELDS = _STUDY_FIELDS + (("datasetid", "dataset"),)


def _require_form_fields(form, required):
    """
    Ensure every field in `required` has a non-empty value in `form`.

    `required` is an iterable of `(field, label)` pairs that are checked in
    order.  Raises `AssertionError(label)` for the first field that is missing
    or empty.  The check is explicit (rather than an `assert` statement) so
    that it still runs when Python is started with `-O`.
    """
    for field, label in required:
        if not form.get(field):
            raise AssertionError(label)


def _form_int(form, key, default):
    """
    Read `form[key]` as an integer.

    Returns `default` when the key is absent or its value cannot be converted
    to an integer (e.g. an empty string from an unselected form control).
    """
    try:
        return int(form.get(key, default))
    except (TypeError, ValueError):
        return default


def render_error(error_msg):
    "Render the generic error page with HTTP status 400."
    return render_template(
        "dbupdate_error.html", error_message=error_msg), 400


def make_menu_items_grouper(grouping_fn=lambda item: item):
    """
    Build function to be used to group menu items.

    The returned function is a reducer `(acc, row) -> dict`: `row[2]` is
    passed through `grouping_fn` to obtain the group key and the stripped
    `(row[0], row[1])` pair is appended to that key's tuple.  A new dict is
    returned on every call; `acc` is never mutated.
    """
    def __grouper__(acc, row):
        grouping = grouping_fn(row[2])
        row_values = (row[0].strip(), row[1].strip())
        if acc.get(grouping) is None:
            return {**acc, grouping: (row_values,)}
        return {**acc, grouping: (acc[grouping] + (row_values,))}
    return __grouper__


def genechips():
    """
    Retrieve the genechip information from the database.

    Returns a dict mapping each `SpeciesId` to a tuple of the `GeneChip` rows
    (as dicts) for that species, ordered by `GeneChipName`.
    """
    with database_connection() as conn:
        with conn.cursor(cursorclass=DictCursor) as cursor:
            cursor.execute("SELECT * FROM GeneChip ORDER BY GeneChipName ASC")
            # Group in a single pass; lists are used while accumulating and
            # frozen into tuples at the end.
            chips_by_species: dict = {}
            for chip in cursor.fetchall():
                chips_by_species.setdefault(
                    chip["SpeciesId"], []).append(chip)
            return {
                speciesid: tuple(chips)
                for speciesid, chips in chips_by_species.items()}

    # Only reachable if a context manager above suppresses an exception.
    return {}


def platform_by_id(genechipid:int) -> Union[dict, None]:
    "Retrieve the gene platform by id, or `None` if there is no such row."
    with database_connection() as conn:
        with conn.cursor(cursorclass=DictCursor) as cursor:
            cursor.execute(
                "SELECT * FROM GeneChip WHERE GeneChipId=%s",
                (genechipid,))
            return cursor.fetchone()


def studies_by_species_and_platform(speciesid:int, genechipid:int) -> tuple:
    "Retrieve the studies by the related species and gene platform"
    with database_connection() as conn:
        with conn.cursor(cursorclass=DictCursor) as cursor:
            query = (
                "SELECT Species.SpeciesId, ProbeFreeze.* "
                "FROM Species INNER JOIN InbredSet "
                "ON Species.SpeciesId=InbredSet.SpeciesId "
                "INNER JOIN ProbeFreeze "
                "ON InbredSet.InbredSetId=ProbeFreeze.InbredSetId "
                "WHERE Species.SpeciesId = %s "
                "AND ProbeFreeze.ChipId = %s")
            cursor.execute(query, (speciesid, genechipid))
            return tuple(cursor.fetchall())

    # Only reachable if a context manager above suppresses an exception.
    return tuple()


def organise_groups_by_family(acc:dict, group:dict) -> dict:
    """
    Organise the group (InbredSet) information by the group field.

    Reducer: returns a new dict in which `group` has been appended to the
    tuple stored under `group["Family"]`.  `acc` is not mutated.
    """
    family = group["Family"]
    if acc.get(family):
        return {**acc, family: acc[family] + (group,)}
    return {**acc, family: (group,)}


def tissues() -> tuple:
    "Retrieve type (Tissue) information from the database."
    with database_connection() as conn:
        with conn.cursor(cursorclass=DictCursor) as cursor:
            cursor.execute("SELECT * FROM Tissue ORDER BY Name")
            return tuple(cursor.fetchall())

    # Only reachable if a context manager above suppresses an exception.
    return tuple()


@dbinsertbp.route("/platform", methods=["POST"])
def select_platform():
    """
    Select the platform (GeneChipId) used for the data.

    Expects `job_id` in the posted form.  Renders an error page if the job or
    the uploaded file it refers to no longer exists.
    """
    job_id = request.form["job_id"]
    with (Redis.from_url(app.config["REDIS_URL"],
                         decode_responses=True) as rconn,
          database_connection(app.config["SQL_URI"]) as conn):
        job = jobs.job(rconn, jobs.jobsnamespace(), job_id)
        if job:
            filename = job["filename"]
            filepath = f"{app.config['UPLOAD_FOLDER']}/{filename}"
            if os.path.exists(filepath):
                default_species = 1
                gchips = genechips()
                return render_template(
                    "select_platform.html",
                    filename=filename,
                    filetype=job["filetype"],
                    totallines=int(job["currentline"]),
                    default_species=default_species,
                    species=species(conn),
                    # The default species may have no platforms at all.
                    genechips=gchips.get(default_species, tuple()),
                    genechips_data=json.dumps(gchips))
            return render_error(f"File '{filename}' no longer exists.")
        return render_error(f"Job '{job_id}' no longer exists.")

    # Only reachable if a context manager above suppresses an exception.
    return render_error("Unknown error")


@dbinsertbp.route("/study", methods=["POST"])
def select_study():
    "View to select/create the study (ProbeFreeze) associated with the data."
    form = request.form
    try:
        _require_form_fields(form, _COMMON_FIELDS)
        speciesid = form["species"]
        genechipid = form["genechipid"]

        the_studies = studies_by_species_and_platform(speciesid, genechipid)
        the_groups = reduce(
            organise_groups_by_family,
            with_db_connection(
                lambda conn: populations_by_species(conn, speciesid)),
            {})
        return render_template(
            "select_study.html",
            filename=form["filename"],
            filetype=form["filetype"],
            totallines=form["totallines"],
            species=speciesid,
            genechipid=genechipid,
            studies=the_studies,
            groups=the_groups,
            tissues = tissues(),
            # These arrive empty when `create_study` redirects here because
            # the user left them blank; -13 stands for "nothing selected".
            selected_group=_form_int(form, "inbredsetid", -13),
            selected_tissue=_form_int(form, "tissueid", -13))
    except AssertionError as aserr:
        return render_error(f"Missing data: {aserr.args[0]}")


@dbinsertbp.route("/create-study", methods=["POST"])
def create_study():
    """
    Create a new study (ProbeFreeze).

    On missing form data, flashes the problem and redirects (307, so the POST
    body is preserved) back to the study-selection view.
    """
    form = request.form
    try:
        _require_form_fields(
            form,
            _COMMON_FIELDS + (
                ("studyname", "study name"),
                ("inbredsetid", "group"),
                ("tissueid", "type/tissue")))

        with database_connection() as conn:
            with conn.cursor(cursorclass=DictCursor) as cursor:
                cursor.execute("SELECT MAX(Id) AS last_id FROM ProbeFreeze")
                # MAX() yields NULL on an empty table; start numbering at 1.
                new_studyid = (cursor.fetchone()["last_id"] or 0) + 1
                values = (
                    new_studyid,
                    new_studyid,
                    form["genechipid"],
                    form["tissueid"],
                    form["studyname"],
                    form.get("studyfullname", ""),
                    form.get("studyshortname", ""),
                    datetime.now().date().strftime("%Y-%m-%d"),
                    form["inbredsetid"])
                query = (
                    "INSERT INTO ProbeFreeze("
                    "Id, ProbeFreezeId, ChipId, TissueId, Name, FullName, "
                    "ShortName, CreateTime, InbredSetId) "
                    "VALUES(%s, %s, %s, %s, %s, %s, %s, %s, %s)")
                cursor.execute(query, values)
                # NOTE(review): Id and ProbeFreezeId are already inserted with
                # the same value above, so this looks redundant -- confirm
                # before removing.
                lastrowid = cursor.lastrowid
                cursor.execute(
                    "UPDATE ProbeFreeze SET ProbeFreezeId=%s WHERE Id=%s",
                    (lastrowid, lastrowid))
                flash("Study created successfully", "alert-success")
                return render_template(
                    "continue_from_create_study.html",
                    filename=form["filename"],
                    filetype=form["filetype"],
                    totallines=form["totallines"],
                    species=form["species"],
                    genechipid=form["genechipid"],
                    studyid=new_studyid)
    except AssertionError as aserr:
        flash(f"Missing data: {aserr.args[0]}", "alert-error")
        return redirect(url_for("dbinsert.select_study"), code=307)


def datasets_by_study(studyid:int) -> tuple:
    "Retrieve datasets associated with a study with the ID `studyid`."
    with database_connection() as conn:
        with conn.cursor(cursorclass=DictCursor) as cursor:
            query = "SELECT * FROM ProbeSetFreeze WHERE ProbeFreezeId=%s"
            cursor.execute(query, (studyid,))
            return tuple(cursor.fetchall())

    # Only reachable if a context manager above suppresses an exception.
    return tuple()


def averaging_methods() -> tuple:
    "Retrieve averaging methods from database"
    with database_connection() as conn:
        with conn.cursor(cursorclass=DictCursor) as cursor:
            cursor.execute("SELECT * FROM AvgMethod")
            return tuple(cursor.fetchall())

    # Only reachable if a context manager above suppresses an exception.
    return tuple()


def dataset_datascales() -> tuple:
    """
    Retrieve datascales from database.

    Returns the distinct, whitespace-stripped, non-empty `DataScale` values
    found in `ProbeSetFreeze`.
    """
    with database_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                'SELECT DISTINCT DataScale FROM ProbeSetFreeze '
                'WHERE DataScale IS NOT NULL AND DataScale != ""')
            stripped = (res[0].strip() for res in cursor.fetchall())
            # Values consisting only of whitespace pass the SQL filter but
            # are empty once stripped.
            return tuple(item for item in stripped if item)

    # Only reachable if a context manager above suppresses an exception.
    return tuple()


@dbinsertbp.route("/dataset", methods=["POST"])
def select_dataset():
    "Select the dataset to add the file contents against"
    form = request.form
    try:
        _require_form_fields(form, _STUDY_FIELDS)

        studyid = form["studyid"]
        datasets = datasets_by_study(studyid)
        return render_template(
            "select_dataset.html",
            **{**form, "studyid": studyid},
            datasets=datasets,
            avgmethods=averaging_methods(),
            datascales=dataset_datascales())
    except AssertionError as aserr:
        return render_error(f"Missing data: {aserr.args[0]}")


@dbinsertbp.route("/create-dataset", methods=["POST"])
def create_dataset():
    """
    Create a new dataset (ProbeSetFreeze) under the selected study.

    If a dataset with the same name already exists, or required form data is
    missing, flashes the problem and redirects (307, so the POST body is
    preserved) back to the dataset-selection view.
    """
    form = request.form
    try:
        _require_form_fields(
            form,
            _STUDY_FIELDS + (
                ("avgid", "averaging method"),
                ("datasetname", "Dataset Name"),
                ("datasetname2", "Dataset Name 2"),
                ("datasetfullname", "Dataset Full Name"),
                ("datasetshortname", "Dataset Short Name"),
                ("datasetpublic", "Dataset public specification"),
                ("datasetconfidentiality", "Dataset confidentiality"),
                ("datasetdatascale", "Dataset Datascale")))

        with database_connection() as conn:
            with conn.cursor(cursorclass=DictCursor) as cursor:
                datasetname = form["datasetname"]
                cursor.execute("SELECT * FROM ProbeSetFreeze WHERE Name=%s",
                               (datasetname,))
                results = cursor.fetchall()
                if bool(results):
                    flash("A dataset with that name already exists.",
                          "alert-error")
                    return redirect(
                        url_for("dbinsert.select_dataset"), code=307)

                cursor.execute(
                    "SELECT MAX(Id) AS last_id FROM ProbeSetFreeze")
                # MAX() yields NULL on an empty table; start numbering at 1.
                new_datasetid = (cursor.fetchone()["last_id"] or 0) + 1
                values = (
                    new_datasetid,
                    form["studyid"],
                    form["avgid"],
                    datasetname,
                    form["datasetname2"],
                    form["datasetfullname"],
                    form["datasetshortname"],
                    datetime.now().date().strftime("%Y-%m-%d"),
                    form["datasetpublic"],
                    form["datasetconfidentiality"],
                    "williamslab",
                    form["datasetdatascale"])
                query = (
                    "INSERT INTO ProbeSetFreeze("
                    "Id, ProbeFreezeId, AvgID, Name, Name2, FullName, "
                    "ShortName, CreateTime, OrderList, public, "
                    "confidentiality, AuthorisedUsers, DataScale) "
                    "VALUES"
                    "(%s, %s, %s, %s, %s, %s, %s, %s, NULL, %s, %s, %s, %s)")
                cursor.execute(query, values)
                return render_template(
                    "continue_from_create_dataset.html",
                    filename=form["filename"],
                    filetype=form["filetype"],
                    species=form["species"],
                    genechipid=form["genechipid"],
                    studyid=form["studyid"],
                    datasetid=new_datasetid,
                    totallines=form["totallines"])
    except AssertionError as aserr:
        flash(f"Missing data {aserr.args[0]}", "alert-error")
        return redirect(url_for("dbinsert.select_dataset"), code=307)


def study_by_id(studyid:int) -> Union[dict, None]:
    "Get a study by its Id, or `None` if there is no such row."
    with database_connection() as conn:
        with conn.cursor(cursorclass=DictCursor) as cursor:
            cursor.execute(
                "SELECT * FROM ProbeFreeze WHERE Id=%s",
                (studyid,))
            return cursor.fetchone()


def dataset_by_id(datasetid:int) -> Union[dict, None]:
    """
    Retrieve a dataset by its id, or `None` if there is no such row.

    The row also carries the averaging method's name as `AvgMethodName`.
    """
    with database_connection() as conn:
        with conn.cursor(cursorclass=DictCursor) as cursor:
            cursor.execute(
                ("SELECT AvgMethod.Name AS AvgMethodName, ProbeSetFreeze.* "
                 "FROM ProbeSetFreeze INNER JOIN AvgMethod "
                 "ON ProbeSetFreeze.AvgId=AvgMethod.AvgMethodId "
                 "WHERE ProbeSetFreeze.Id=%s"),
                (datasetid,))
            return cursor.fetchone()


def selected_keys(original: dict, keys: tuple) -> dict:
    "Return a new dict from the `original` dict with only `keys` present."
    return {key: value for key,value in original.items() if key in keys}


@dbinsertbp.route("/final-confirmation", methods=["POST"])
def final_confirmation():
    """
    Preview the data before triggering entry into the database.

    Renders an error page if required form data is missing or if any of the
    selected species/platform/study/dataset cannot be found.
    """
    form = request.form
    try:
        _require_form_fields(form, _DATASET_FIELDS)

        speciesid = form["species"]
        genechipid = form["genechipid"]
        studyid = form["studyid"]
        datasetid=form["datasetid"]

        the_species = with_db_connection(
            lambda conn: species_by_id(conn, speciesid))
        platform = platform_by_id(genechipid)
        study = study_by_id(studyid)
        dataset = dataset_by_id(datasetid)
        # A lookup that finds nothing must not reach `selected_keys`, which
        # requires a dict.
        for label, record in (("species", the_species),
                              ("platform", platform),
                              ("study", study),
                              ("dataset", dataset)):
            if record is None:
                return render_error(f"The selected {label} does not exist.")

        return render_template(
            "final_confirmation.html",
            filename=form["filename"],
            filetype=form["filetype"],
            totallines=form["totallines"],
            species=speciesid,
            genechipid=genechipid,
            studyid=studyid,
            datasetid=datasetid,
            the_species=selected_keys(
                the_species, ("SpeciesName", "Name", "MenuName")),
            platform=selected_keys(
                platform,
                ("GeneChipName", "Name", "GeoPlatform", "Title",
                 "GO_tree_value")),
            study=selected_keys(
                study, ("Name", "FullName", "ShortName")),
            dataset=selected_keys(
                dataset,
                ("AvgMethodName", "Name", "Name2", "FullName", "ShortName",
                 "DataScale")))
    except AssertionError as aserr:
        return render_error(f"Missing data: {aserr.args[0]}")


@dbinsertbp.route("/insert-data", methods=["POST"])
def insert_data():
    """
    Trigger data insertion.

    Launches a background insertion job for the uploaded file and redirects to
    the job's status page.
    """
    form = request.form
    try:
        _require_form_fields(form, _DATASET_FIELDS)

        filename = form["filename"]
        # NOTE(review): `filename` comes straight from the posted form and is
        # joined to the upload folder unsanitised -- consider restricting it
        # to a bare file name.
        filepath = f"{app.config['UPLOAD_FOLDER']}/{filename}"
        redisurl = app.config["REDIS_URL"]
        if os.path.exists(filepath):
            with Redis.from_url(redisurl, decode_responses=True) as rconn:
                job = jobs.launch_job(
                    jobs.data_insertion_job(
                        rconn, filepath, form["filetype"], form["totallines"],
                        form["species"], form["genechipid"],
                        form["datasetid"], app.config["SQL_URI"], redisurl,
                        app.config["JOBS_TTL_SECONDS"]),
                    redisurl, f"{app.config['UPLOAD_FOLDER']}/job_errors")

                return redirect(
                    url_for("dbinsert.insert_status", job_id=job["jobid"]))
        return render_error(f"File '{filename}' no longer exists.")
    except AssertionError as aserr:
        return render_error(f"Missing data: {aserr.args[0]}")


@dbinsertbp.route("/status/<job_id>", methods=["GET"])
def insert_status(job_id: str):
    """
    Retrieve status of data insertion.

    Renders the success, error or in-progress page depending on the job's
    `status`; responds with HTTP 400 if the job is not found.
    """
    with Redis.from_url(
            app.config["REDIS_URL"], decode_responses=True) as rconn:
        job = jobs.job(rconn, jobs.jobsnamespace(), job_id)

        if job:
            job_status = job["status"]
            if job_status == "success":
                return render_template("insert_success.html", job=job)
            if job_status == "error":
                return render_template("insert_error.html", job=job)
            return render_template("insert_progress.html", job=job)
        return render_template("no_such_job.html", job_id=job_id), 400