From 754e8f214b940e05298cb360ed829f5c685d55a5 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Thu, 25 Jul 2024 11:07:33 -0500 Subject: Rename module: qc_app --> uploader --- uploader/dbinsert.py | 397 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 397 insertions(+) create mode 100644 uploader/dbinsert.py (limited to 'uploader/dbinsert.py') diff --git a/uploader/dbinsert.py b/uploader/dbinsert.py new file mode 100644 index 0000000..88d16ef --- /dev/null +++ b/uploader/dbinsert.py @@ -0,0 +1,397 @@ +"Handle inserting data into the database" +import os +import json +from typing import Union +from functools import reduce +from datetime import datetime + +from redis import Redis +from MySQLdb.cursors import DictCursor +from flask import ( + flash, request, url_for, Blueprint, redirect, render_template, + current_app as app) + +from uploader.db_utils import with_db_connection, database_connection +from uploader.db import species, species_by_id, populations_by_species + +from . import jobs + +dbinsertbp = Blueprint("dbinsert", __name__) + +def render_error(error_msg): + "Render the generic error page" + return render_template("dbupdate_error.html", error_message=error_msg), 400 + +def make_menu_items_grouper(grouping_fn=lambda item: item): + "Build function to be used to group menu items." + def __grouper__(acc, row): + grouping = grouping_fn(row[2]) + row_values = (row[0].strip(), row[1].strip()) + if acc.get(grouping) is None: + return {**acc, grouping: (row_values,)} + return {**acc, grouping: (acc[grouping] + (row_values,))} + return __grouper__ + +def genechips(): + "Retrieve the genechip information from the database" + def __organise_by_species__(acc, chip): + speciesid = chip["SpeciesId"] + if acc.get(speciesid) is None: + return {**acc, speciesid: (chip,)} + return {**acc, speciesid: acc[speciesid] + (chip,)} + + with database_connection() as conn: + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute("SELECT * FROM GeneChip ORDER BY GeneChipName ASC") + return reduce(__organise_by_species__, cursor.fetchall(), {}) + + return {} + +def platform_by_id(genechipid:int) -> Union[dict, None]: + "Retrieve the gene platform by id" + with database_connection() as conn: + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute( + "SELECT * FROM GeneChip WHERE GeneChipId=%s", + (genechipid,)) + return cursor.fetchone() + +def studies_by_species_and_platform(speciesid:int, genechipid:int) -> tuple: + "Retrieve the studies by the related species and gene platform" + with database_connection() as conn: + with conn.cursor(cursorclass=DictCursor) as cursor: + query = ( + "SELECT Species.SpeciesId, ProbeFreeze.* " + "FROM Species INNER JOIN InbredSet " + "ON Species.SpeciesId=InbredSet.SpeciesId " + "INNER JOIN ProbeFreeze " + "ON InbredSet.InbredSetId=ProbeFreeze.InbredSetId " + "WHERE Species.SpeciesId = %s " + "AND ProbeFreeze.ChipId = %s") + cursor.execute(query, (speciesid, genechipid)) + return tuple(cursor.fetchall()) + + return tuple() + +def organise_groups_by_family(acc:dict, group:dict) -> dict: + "Organise the group (InbredSet) information by the group field" + family = group["Family"] + if acc.get(family): + return {**acc, family: acc[family] + (group,)} + return {**acc, family: (group,)} + +def tissues() -> tuple: + "Retrieve type (Tissue) information from the database." + with database_connection() as conn: + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute("SELECT * FROM Tissue ORDER BY Name") + return tuple(cursor.fetchall()) + + return tuple() + +@dbinsertbp.route("/platform", methods=["POST"]) +def select_platform(): + "Select the platform (GeneChipId) used for the data." + job_id = request.form["job_id"] + with (Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn, + database_connection(app.config["SQL_URI"]) as conn): + job = jobs.job(rconn, jobs.jobsnamespace(), job_id) + if job: + filename = job["filename"] + filepath = f"{app.config['UPLOAD_FOLDER']}/{filename}" + if os.path.exists(filepath): + default_species = 1 + gchips = genechips() + return render_template( + "select_platform.html", filename=filename, + filetype=job["filetype"], totallines=int(job["currentline"]), + default_species=default_species, species=species(conn), + genechips=gchips[default_species], + genechips_data=json.dumps(gchips)) + return render_error(f"File '{filename}' no longer exists.") + return render_error(f"Job '{job_id}' no longer exists.") + return render_error("Unknown error") + +@dbinsertbp.route("/study", methods=["POST"]) +def select_study(): + "View to select/create the study (ProbeFreeze) associated with the data." + form = request.form + try: + assert form.get("filename"), "filename" + assert form.get("filetype"), "filetype" + assert form.get("species"), "species" + assert form.get("genechipid"), "platform" + + speciesid = form["species"] + genechipid = form["genechipid"] + + the_studies = studies_by_species_and_platform(speciesid, genechipid) + the_groups = reduce( + organise_groups_by_family, + with_db_connection( + lambda conn: populations_by_species(conn, speciesid)), + {}) + return render_template( + "select_study.html", filename=form["filename"], + filetype=form["filetype"], totallines=form["totallines"], + species=speciesid, genechipid=genechipid, studies=the_studies, + groups=the_groups, tissues = tissues(), + selected_group=int(form.get("inbredsetid", -13)), + selected_tissue=int(form.get("tissueid", -13))) + except AssertionError as aserr: + return render_error(f"Missing data: {aserr.args[0]}") + +@dbinsertbp.route("/create-study", methods=["POST"]) +def create_study(): + "Create a new study (ProbeFreeze)." + form = request.form + try: + assert form.get("filename"), "filename" + assert form.get("filetype"), "filetype" + assert form.get("species"), "species" + assert form.get("genechipid"), "platform" + assert form.get("studyname"), "study name" + assert form.get("inbredsetid"), "group" + assert form.get("tissueid"), "type/tissue" + + with database_connection() as conn: + with conn.cursor(cursorclass=DictCursor) as cursor: + values = ( + form["genechipid"], + form["tissueid"], + form["studyname"], + form.get("studyfullname", ""), + form.get("studyshortname", ""), + datetime.now().date().strftime("%Y-%m-%d"), + form["inbredsetid"]) + query = ( + "INSERT INTO ProbeFreeze(" + "ChipId, TissueId, Name, FullName, ShortName, CreateTime, " + "InbredSetId" + ") VALUES (%s, %s, %s, %s, %s, %s, %s)") + cursor.execute(query, values) + new_studyid = cursor.lastrowid + cursor.execute( + "UPDATE ProbeFreeze SET ProbeFreezeId=%s WHERE Id=%s", + (new_studyid, new_studyid)) + flash("Study created successfully", "alert-success") + return render_template( + "continue_from_create_study.html", + filename=form["filename"], filetype=form["filetype"], + totallines=form["totallines"], species=form["species"], + genechipid=form["genechipid"], studyid=new_studyid) + except AssertionError as aserr: + flash(f"Missing data: {aserr.args[0]}", "alert-error") + return redirect(url_for("dbinsert.select_study"), code=307) + +def datasets_by_study(studyid:int) -> tuple: + "Retrieve datasets associated with a study with the ID `studyid`." + with database_connection() as conn: + with conn.cursor(cursorclass=DictCursor) as cursor: + query = "SELECT * FROM ProbeSetFreeze WHERE ProbeFreezeId=%s" + cursor.execute(query, (studyid,)) + return tuple(cursor.fetchall()) + + return tuple() + +def averaging_methods() -> tuple: + "Retrieve averaging methods from database" + with database_connection() as conn: + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute("SELECT * FROM AvgMethod") + return tuple(cursor.fetchall()) + + return tuple() + +def dataset_datascales() -> tuple: + "Retrieve datascales from database" + with database_connection() as conn: + with conn.cursor() as cursor: + cursor.execute( + 'SELECT DISTINCT DataScale FROM ProbeSetFreeze ' + 'WHERE DataScale IS NOT NULL AND DataScale != ""') + return tuple( + item for item in + (res[0].strip() for res in cursor.fetchall()) + if (item is not None and item != "")) + + return tuple() + +@dbinsertbp.route("/dataset", methods=["POST"]) +def select_dataset(): + "Select the dataset to add the file contents against" + form = request.form + try: + assert form.get("filename"), "filename" + assert form.get("filetype"), "filetype" + assert form.get("species"), "species" + assert form.get("genechipid"), "platform" + assert form.get("studyid"), "study" + + studyid = form["studyid"] + datasets = datasets_by_study(studyid) + return render_template( + "select_dataset.html", **{**form, "studyid": studyid}, + datasets=datasets, avgmethods=averaging_methods(), + datascales=dataset_datascales()) + except AssertionError as aserr: + return render_error(f"Missing data: {aserr.args[0]}") + +@dbinsertbp.route("/create-dataset", methods=["POST"]) +def create_dataset(): + "Select the dataset to add the file contents against" + form = request.form + try: + assert form.get("filename"), "filename" + assert form.get("filetype"), "filetype" + assert form.get("species"), "species" + assert form.get("genechipid"), "platform" + assert form.get("studyid"), "study" + assert form.get("avgid"), "averaging method" + assert form.get("datasetname2"), "Dataset Name 2" + assert form.get("datasetfullname"), "Dataset Full Name" + assert form.get("datasetshortname"), "Dataset Short Name" + assert form.get("datasetpublic"), "Dataset public specification" + assert form.get("datasetconfidentiality"), "Dataset confidentiality" + assert form.get("datasetdatascale"), "Dataset Datascale" + + with database_connection() as conn: + with conn.cursor(cursorclass=DictCursor) as cursor: + datasetname = form["datasetname"] + cursor.execute("SELECT * FROM ProbeSetFreeze WHERE Name=%s", + (datasetname,)) + results = cursor.fetchall() + if bool(results): + flash("A dataset with that name already exists.", + "alert-error") + return redirect(url_for("dbinsert.select_dataset"), code=307) + values = ( + form["studyid"], form["avgid"], + datasetname, form["datasetname2"], + form["datasetfullname"], form["datasetshortname"], + datetime.now().date().strftime("%Y-%m-%d"), + form["datasetpublic"], form["datasetconfidentiality"], + "williamslab", form["datasetdatascale"]) + query = ( + "INSERT INTO ProbeSetFreeze(" + "ProbeFreezeId, AvgID, Name, Name2, FullName, " + "ShortName, CreateTime, OrderList, public, " + "confidentiality, AuthorisedUsers, DataScale) " + "VALUES" + "(%s, %s, %s, %s, %s, %s, %s, NULL, %s, %s, %s, %s)") + cursor.execute(query, values) + new_datasetid = cursor.lastrowid + return render_template( + "continue_from_create_dataset.html", + filename=form["filename"], filetype=form["filetype"], + species=form["species"], genechipid=form["genechipid"], + studyid=form["studyid"], datasetid=new_datasetid, + totallines=form["totallines"]) + except AssertionError as aserr: + flash(f"Missing data {aserr.args[0]}", "alert-error") + return redirect(url_for("dbinsert.select_dataset"), code=307) + +def study_by_id(studyid:int) -> Union[dict, None]: + "Get a study by its Id" + with database_connection() as conn: + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute( + "SELECT * FROM ProbeFreeze WHERE Id=%s", + (studyid,)) + return cursor.fetchone() + +def dataset_by_id(datasetid:int) -> Union[dict, None]: + "Retrieve a dataset by its id" + with database_connection() as conn: + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute( + ("SELECT AvgMethod.Name AS AvgMethodName, ProbeSetFreeze.* " + "FROM ProbeSetFreeze INNER JOIN AvgMethod " + "ON ProbeSetFreeze.AvgId=AvgMethod.AvgMethodId " + "WHERE ProbeSetFreeze.Id=%s"), + (datasetid,)) + return cursor.fetchone() + +def selected_keys(original: dict, keys: tuple) -> dict: + "Return a new dict from the `original` dict with only `keys` present." + return {key: value for key,value in original.items() if key in keys} + +@dbinsertbp.route("/final-confirmation", methods=["POST"]) +def final_confirmation(): + "Preview the data before triggering entry into the database" + form = request.form + try: + assert form.get("filename"), "filename" + assert form.get("filetype"), "filetype" + assert form.get("species"), "species" + assert form.get("genechipid"), "platform" + assert form.get("studyid"), "study" + assert form.get("datasetid"), "dataset" + + speciesid = form["species"] + genechipid = form["genechipid"] + studyid = form["studyid"] + datasetid=form["datasetid"] + return render_template( + "final_confirmation.html", filename=form["filename"], + filetype=form["filetype"], totallines=form["totallines"], + species=speciesid, genechipid=genechipid, studyid=studyid, + datasetid=datasetid, the_species=selected_keys( + with_db_connection(lambda conn: species_by_id(conn, speciesid)), + ("SpeciesName", "Name", "MenuName")), + platform=selected_keys( + platform_by_id(genechipid), + ("GeneChipName", "Name", "GeoPlatform", "Title", "GO_tree_value")), + study=selected_keys( + study_by_id(studyid), ("Name", "FullName", "ShortName")), + dataset=selected_keys( + dataset_by_id(datasetid), + ("AvgMethodName", "Name", "Name2", "FullName", "ShortName", + "DataScale"))) + except AssertionError as aserr: + return render_error(f"Missing data: {aserr.args[0]}") + +@dbinsertbp.route("/insert-data", methods=["POST"]) +def insert_data(): + "Trigger data insertion" + form = request.form + try: + assert form.get("filename"), "filename" + assert form.get("filetype"), "filetype" + assert form.get("species"), "species" + assert form.get("genechipid"), "platform" + assert form.get("studyid"), "study" + assert form.get("datasetid"), "dataset" + + filename = form["filename"] + filepath = f"{app.config['UPLOAD_FOLDER']}/{filename}" + redisurl = app.config["REDIS_URL"] + if os.path.exists(filepath): + with Redis.from_url(redisurl, decode_responses=True) as rconn: + job = jobs.launch_job( + jobs.data_insertion_job( + rconn, filepath, form["filetype"], form["totallines"], + form["species"], form["genechipid"], form["datasetid"], + app.config["SQL_URI"], redisurl, + app.config["JOBS_TTL_SECONDS"]), + redisurl, f"{app.config['UPLOAD_FOLDER']}/job_errors") + + return redirect(url_for("dbinsert.insert_status", job_id=job["jobid"])) + return render_error(f"File '{filename}' no longer exists.") + except AssertionError as aserr: + return render_error(f"Missing data: {aserr.args[0]}") + +@dbinsertbp.route("/status/", methods=["GET"]) +def insert_status(job_id: str): + "Retrieve status of data insertion." + with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn: + job = jobs.job(rconn, jobs.jobsnamespace(), job_id) + + if job: + job_status = job["status"] + if job_status == "success": + return render_template("insert_success.html", job=job) + if job["status"] == "error": + return render_template("insert_error.html", job=job) + return render_template("insert_progress.html", job=job) + return render_template("no_such_job.html", job_id=job_id), 400 -- cgit v1.2.3