aboutsummaryrefslogtreecommitdiff
path: root/uploader/dbinsert.py
diff options
context:
space:
mode:
Diffstat (limited to 'uploader/dbinsert.py')
-rw-r--r--uploader/dbinsert.py397
1 files changed, 397 insertions, 0 deletions
diff --git a/uploader/dbinsert.py b/uploader/dbinsert.py
new file mode 100644
index 0000000..88d16ef
--- /dev/null
+++ b/uploader/dbinsert.py
@@ -0,0 +1,397 @@
+"Handle inserting data into the database"
+import os
+import json
+from typing import Union
+from functools import reduce
+from datetime import datetime
+
+from redis import Redis
+from MySQLdb.cursors import DictCursor
+from flask import (
+ flash, request, url_for, Blueprint, redirect, render_template,
+ current_app as app)
+
+from uploader.db_utils import with_db_connection, database_connection
+from uploader.db import species, species_by_id, populations_by_species
+
+from . import jobs
+
+dbinsertbp = Blueprint("dbinsert", __name__)
+
+def render_error(error_msg):
+ "Render the generic error page"
+ return render_template("dbupdate_error.html", error_message=error_msg), 400
+
+def make_menu_items_grouper(grouping_fn=lambda item: item):
+ "Build function to be used to group menu items."
+ def __grouper__(acc, row):
+ grouping = grouping_fn(row[2])
+ row_values = (row[0].strip(), row[1].strip())
+ if acc.get(grouping) is None:
+ return {**acc, grouping: (row_values,)}
+ return {**acc, grouping: (acc[grouping] + (row_values,))}
+ return __grouper__
+
+def genechips():
+ "Retrieve the genechip information from the database"
+ def __organise_by_species__(acc, chip):
+ speciesid = chip["SpeciesId"]
+ if acc.get(speciesid) is None:
+ return {**acc, speciesid: (chip,)}
+ return {**acc, speciesid: acc[speciesid] + (chip,)}
+
+ with database_connection() as conn:
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ cursor.execute("SELECT * FROM GeneChip ORDER BY GeneChipName ASC")
+ return reduce(__organise_by_species__, cursor.fetchall(), {})
+
+ return {}
+
+def platform_by_id(genechipid:int) -> Union[dict, None]:
+ "Retrieve the gene platform by id"
+ with database_connection() as conn:
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ cursor.execute(
+ "SELECT * FROM GeneChip WHERE GeneChipId=%s",
+ (genechipid,))
+ return cursor.fetchone()
+
+def studies_by_species_and_platform(speciesid:int, genechipid:int) -> tuple:
+ "Retrieve the studies by the related species and gene platform"
+ with database_connection() as conn:
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ query = (
+ "SELECT Species.SpeciesId, ProbeFreeze.* "
+ "FROM Species INNER JOIN InbredSet "
+ "ON Species.SpeciesId=InbredSet.SpeciesId "
+ "INNER JOIN ProbeFreeze "
+ "ON InbredSet.InbredSetId=ProbeFreeze.InbredSetId "
+ "WHERE Species.SpeciesId = %s "
+ "AND ProbeFreeze.ChipId = %s")
+ cursor.execute(query, (speciesid, genechipid))
+ return tuple(cursor.fetchall())
+
+ return tuple()
+
+def organise_groups_by_family(acc:dict, group:dict) -> dict:
+ "Organise the group (InbredSet) information by the group field"
+ family = group["Family"]
+ if acc.get(family):
+ return {**acc, family: acc[family] + (group,)}
+ return {**acc, family: (group,)}
+
+def tissues() -> tuple:
+ "Retrieve type (Tissue) information from the database."
+ with database_connection() as conn:
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ cursor.execute("SELECT * FROM Tissue ORDER BY Name")
+ return tuple(cursor.fetchall())
+
+ return tuple()
+
+@dbinsertbp.route("/platform", methods=["POST"])
+def select_platform():
+ "Select the platform (GeneChipId) used for the data."
+ job_id = request.form["job_id"]
+ with (Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn,
+ database_connection(app.config["SQL_URI"]) as conn):
+ job = jobs.job(rconn, jobs.jobsnamespace(), job_id)
+ if job:
+ filename = job["filename"]
+ filepath = f"{app.config['UPLOAD_FOLDER']}/{filename}"
+ if os.path.exists(filepath):
+ default_species = 1
+ gchips = genechips()
+ return render_template(
+ "select_platform.html", filename=filename,
+ filetype=job["filetype"], totallines=int(job["currentline"]),
+ default_species=default_species, species=species(conn),
+ genechips=gchips[default_species],
+ genechips_data=json.dumps(gchips))
+ return render_error(f"File '{filename}' no longer exists.")
+ return render_error(f"Job '{job_id}' no longer exists.")
+ return render_error("Unknown error")
+
+@dbinsertbp.route("/study", methods=["POST"])
+def select_study():
+ "View to select/create the study (ProbeFreeze) associated with the data."
+ form = request.form
+ try:
+ assert form.get("filename"), "filename"
+ assert form.get("filetype"), "filetype"
+ assert form.get("species"), "species"
+ assert form.get("genechipid"), "platform"
+
+ speciesid = form["species"]
+ genechipid = form["genechipid"]
+
+ the_studies = studies_by_species_and_platform(speciesid, genechipid)
+ the_groups = reduce(
+ organise_groups_by_family,
+ with_db_connection(
+ lambda conn: populations_by_species(conn, speciesid)),
+ {})
+ return render_template(
+ "select_study.html", filename=form["filename"],
+ filetype=form["filetype"], totallines=form["totallines"],
+ species=speciesid, genechipid=genechipid, studies=the_studies,
+ groups=the_groups, tissues = tissues(),
+ selected_group=int(form.get("inbredsetid", -13)),
+ selected_tissue=int(form.get("tissueid", -13)))
+ except AssertionError as aserr:
+ return render_error(f"Missing data: {aserr.args[0]}")
+
+@dbinsertbp.route("/create-study", methods=["POST"])
+def create_study():
+ "Create a new study (ProbeFreeze)."
+ form = request.form
+ try:
+ assert form.get("filename"), "filename"
+ assert form.get("filetype"), "filetype"
+ assert form.get("species"), "species"
+ assert form.get("genechipid"), "platform"
+ assert form.get("studyname"), "study name"
+ assert form.get("inbredsetid"), "group"
+ assert form.get("tissueid"), "type/tissue"
+
+ with database_connection() as conn:
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ values = (
+ form["genechipid"],
+ form["tissueid"],
+ form["studyname"],
+ form.get("studyfullname", ""),
+ form.get("studyshortname", ""),
+ datetime.now().date().strftime("%Y-%m-%d"),
+ form["inbredsetid"])
+ query = (
+ "INSERT INTO ProbeFreeze("
+ "ChipId, TissueId, Name, FullName, ShortName, CreateTime, "
+ "InbredSetId"
+ ") VALUES (%s, %s, %s, %s, %s, %s, %s)")
+ cursor.execute(query, values)
+ new_studyid = cursor.lastrowid
+ cursor.execute(
+ "UPDATE ProbeFreeze SET ProbeFreezeId=%s WHERE Id=%s",
+ (new_studyid, new_studyid))
+ flash("Study created successfully", "alert-success")
+ return render_template(
+ "continue_from_create_study.html",
+ filename=form["filename"], filetype=form["filetype"],
+ totallines=form["totallines"], species=form["species"],
+ genechipid=form["genechipid"], studyid=new_studyid)
+ except AssertionError as aserr:
+ flash(f"Missing data: {aserr.args[0]}", "alert-error")
+ return redirect(url_for("dbinsert.select_study"), code=307)
+
+def datasets_by_study(studyid:int) -> tuple:
+ "Retrieve datasets associated with a study with the ID `studyid`."
+ with database_connection() as conn:
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ query = "SELECT * FROM ProbeSetFreeze WHERE ProbeFreezeId=%s"
+ cursor.execute(query, (studyid,))
+ return tuple(cursor.fetchall())
+
+ return tuple()
+
+def averaging_methods() -> tuple:
+ "Retrieve averaging methods from database"
+ with database_connection() as conn:
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ cursor.execute("SELECT * FROM AvgMethod")
+ return tuple(cursor.fetchall())
+
+ return tuple()
+
+def dataset_datascales() -> tuple:
+ "Retrieve datascales from database"
+ with database_connection() as conn:
+ with conn.cursor() as cursor:
+ cursor.execute(
+ 'SELECT DISTINCT DataScale FROM ProbeSetFreeze '
+ 'WHERE DataScale IS NOT NULL AND DataScale != ""')
+ return tuple(
+ item for item in
+ (res[0].strip() for res in cursor.fetchall())
+ if (item is not None and item != ""))
+
+ return tuple()
+
+@dbinsertbp.route("/dataset", methods=["POST"])
+def select_dataset():
+ "Select the dataset to add the file contents against"
+ form = request.form
+ try:
+ assert form.get("filename"), "filename"
+ assert form.get("filetype"), "filetype"
+ assert form.get("species"), "species"
+ assert form.get("genechipid"), "platform"
+ assert form.get("studyid"), "study"
+
+ studyid = form["studyid"]
+ datasets = datasets_by_study(studyid)
+ return render_template(
+ "select_dataset.html", **{**form, "studyid": studyid},
+ datasets=datasets, avgmethods=averaging_methods(),
+ datascales=dataset_datascales())
+ except AssertionError as aserr:
+ return render_error(f"Missing data: {aserr.args[0]}")
+
+@dbinsertbp.route("/create-dataset", methods=["POST"])
+def create_dataset():
+ "Select the dataset to add the file contents against"
+ form = request.form
+ try:
+ assert form.get("filename"), "filename"
+ assert form.get("filetype"), "filetype"
+ assert form.get("species"), "species"
+ assert form.get("genechipid"), "platform"
+ assert form.get("studyid"), "study"
+ assert form.get("avgid"), "averaging method"
+ assert form.get("datasetname2"), "Dataset Name 2"
+ assert form.get("datasetfullname"), "Dataset Full Name"
+ assert form.get("datasetshortname"), "Dataset Short Name"
+ assert form.get("datasetpublic"), "Dataset public specification"
+ assert form.get("datasetconfidentiality"), "Dataset confidentiality"
+ assert form.get("datasetdatascale"), "Dataset Datascale"
+
+ with database_connection() as conn:
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ datasetname = form["datasetname"]
+ cursor.execute("SELECT * FROM ProbeSetFreeze WHERE Name=%s",
+ (datasetname,))
+ results = cursor.fetchall()
+ if bool(results):
+ flash("A dataset with that name already exists.",
+ "alert-error")
+ return redirect(url_for("dbinsert.select_dataset"), code=307)
+ values = (
+ form["studyid"], form["avgid"],
+ datasetname, form["datasetname2"],
+ form["datasetfullname"], form["datasetshortname"],
+ datetime.now().date().strftime("%Y-%m-%d"),
+ form["datasetpublic"], form["datasetconfidentiality"],
+ "williamslab", form["datasetdatascale"])
+ query = (
+ "INSERT INTO ProbeSetFreeze("
+ "ProbeFreezeId, AvgID, Name, Name2, FullName, "
+ "ShortName, CreateTime, OrderList, public, "
+ "confidentiality, AuthorisedUsers, DataScale) "
+ "VALUES"
+ "(%s, %s, %s, %s, %s, %s, %s, NULL, %s, %s, %s, %s)")
+ cursor.execute(query, values)
+ new_datasetid = cursor.lastrowid
+ return render_template(
+ "continue_from_create_dataset.html",
+ filename=form["filename"], filetype=form["filetype"],
+ species=form["species"], genechipid=form["genechipid"],
+ studyid=form["studyid"], datasetid=new_datasetid,
+ totallines=form["totallines"])
+ except AssertionError as aserr:
+ flash(f"Missing data {aserr.args[0]}", "alert-error")
+ return redirect(url_for("dbinsert.select_dataset"), code=307)
+
+def study_by_id(studyid:int) -> Union[dict, None]:
+ "Get a study by its Id"
+ with database_connection() as conn:
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ cursor.execute(
+ "SELECT * FROM ProbeFreeze WHERE Id=%s",
+ (studyid,))
+ return cursor.fetchone()
+
+def dataset_by_id(datasetid:int) -> Union[dict, None]:
+ "Retrieve a dataset by its id"
+ with database_connection() as conn:
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ cursor.execute(
+ ("SELECT AvgMethod.Name AS AvgMethodName, ProbeSetFreeze.* "
+ "FROM ProbeSetFreeze INNER JOIN AvgMethod "
+ "ON ProbeSetFreeze.AvgId=AvgMethod.AvgMethodId "
+ "WHERE ProbeSetFreeze.Id=%s"),
+ (datasetid,))
+ return cursor.fetchone()
+
+def selected_keys(original: dict, keys: tuple) -> dict:
+ "Return a new dict from the `original` dict with only `keys` present."
+ return {key: value for key,value in original.items() if key in keys}
+
+@dbinsertbp.route("/final-confirmation", methods=["POST"])
+def final_confirmation():
+ "Preview the data before triggering entry into the database"
+ form = request.form
+ try:
+ assert form.get("filename"), "filename"
+ assert form.get("filetype"), "filetype"
+ assert form.get("species"), "species"
+ assert form.get("genechipid"), "platform"
+ assert form.get("studyid"), "study"
+ assert form.get("datasetid"), "dataset"
+
+ speciesid = form["species"]
+ genechipid = form["genechipid"]
+ studyid = form["studyid"]
+ datasetid=form["datasetid"]
+ return render_template(
+ "final_confirmation.html", filename=form["filename"],
+ filetype=form["filetype"], totallines=form["totallines"],
+ species=speciesid, genechipid=genechipid, studyid=studyid,
+ datasetid=datasetid, the_species=selected_keys(
+ with_db_connection(lambda conn: species_by_id(conn, speciesid)),
+ ("SpeciesName", "Name", "MenuName")),
+ platform=selected_keys(
+ platform_by_id(genechipid),
+ ("GeneChipName", "Name", "GeoPlatform", "Title", "GO_tree_value")),
+ study=selected_keys(
+ study_by_id(studyid), ("Name", "FullName", "ShortName")),
+ dataset=selected_keys(
+ dataset_by_id(datasetid),
+ ("AvgMethodName", "Name", "Name2", "FullName", "ShortName",
+ "DataScale")))
+ except AssertionError as aserr:
+ return render_error(f"Missing data: {aserr.args[0]}")
+
+@dbinsertbp.route("/insert-data", methods=["POST"])
+def insert_data():
+ "Trigger data insertion"
+ form = request.form
+ try:
+ assert form.get("filename"), "filename"
+ assert form.get("filetype"), "filetype"
+ assert form.get("species"), "species"
+ assert form.get("genechipid"), "platform"
+ assert form.get("studyid"), "study"
+ assert form.get("datasetid"), "dataset"
+
+ filename = form["filename"]
+ filepath = f"{app.config['UPLOAD_FOLDER']}/{filename}"
+ redisurl = app.config["REDIS_URL"]
+ if os.path.exists(filepath):
+ with Redis.from_url(redisurl, decode_responses=True) as rconn:
+ job = jobs.launch_job(
+ jobs.data_insertion_job(
+ rconn, filepath, form["filetype"], form["totallines"],
+ form["species"], form["genechipid"], form["datasetid"],
+ app.config["SQL_URI"], redisurl,
+ app.config["JOBS_TTL_SECONDS"]),
+ redisurl, f"{app.config['UPLOAD_FOLDER']}/job_errors")
+
+ return redirect(url_for("dbinsert.insert_status", job_id=job["jobid"]))
+ return render_error(f"File '{filename}' no longer exists.")
+ except AssertionError as aserr:
+ return render_error(f"Missing data: {aserr.args[0]}")
+
+@dbinsertbp.route("/status/<job_id>", methods=["GET"])
+def insert_status(job_id: str):
+ "Retrieve status of data insertion."
+ with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn:
+ job = jobs.job(rconn, jobs.jobsnamespace(), job_id)
+
+ if job:
+ job_status = job["status"]
+ if job_status == "success":
+ return render_template("insert_success.html", job=job)
+ if job["status"] == "error":
+ return render_template("insert_error.html", job=job)
+ return render_template("insert_progress.html", job=job)
+ return render_template("no_such_job.html", job_id=job_id), 400