"""Views for expression data"""
import os
import uuid
import mimetypes
from typing import Tuple
from zipfile import ZipFile, is_zipfile
import jsonpickle
from redis import Redis
from werkzeug.utils import secure_filename
from gn_libs.mysqldb import database_connection
from flask import (flash,
request,
redirect,
Blueprint,
current_app as app)
from quality_control.errors import InvalidValue, DuplicateHeading
from uploader import jobs
from uploader.flask_extensions import url_for
from uploader.datautils import order_by_family
from uploader.ui import make_template_renderer
from uploader.authorisation import require_login
from uploader.db_utils import with_db_connection
from uploader.species.models import all_species, species_by_id
from uploader.population.models import (populations_by_species,
population_by_species_and_id)
# Flask blueprint that every view in this module is registered on.
exprdatabp = Blueprint("expression-data", __name__)
# Module-local `render_template`, built by `make_template_renderer` for the
# "expression-data" section; the views below call this rather than importing
# Flask's own `render_template`.
render_template = make_template_renderer("expression-data")
def isinvalidvalue(item):
    """Tell whether `item` is an instance of `InvalidValue`."""
    is_invalid = isinstance(item, InvalidValue)
    return is_invalid
def isduplicateheading(item):
    """Tell whether `item` is an instance of `DuplicateHeading`."""
    is_duplicate = isinstance(item, DuplicateHeading)
    return is_duplicate
def errors(rqst) -> Tuple[str, ...]:
    """Validate the upload request `rqst` and collect its error messages.

    Checks, in order, the `filetype` form field and the `qc_text_file`
    upload. The mimetype of the upload is only inspected when a file was
    actually provided. Returns an empty tuple when nothing is wrong.
    """
    found: Tuple[str, ...] = tuple()

    # The form must declare which kind of data is being uploaded.
    if rqst.form.get("filetype") not in ("average", "standard-error"):
        found = found + ("Invalid file type provided.",)

    # A missing upload, or one with an empty filename, means no file at all;
    # there is no mimetype worth checking in that case.
    if "qc_text_file" not in rqst.files:
        return found + ("No file was uploaded.",)
    uploaded = rqst.files["qc_text_file"]
    if uploaded.filename == "":
        return found + ("No file was uploaded.",)

    accepted_mimetypes = (
        "text/plain", "text/tab-separated-values", "application/zip")
    if uploaded.mimetype not in accepted_mimetypes:
        return found + (
            ("Invalid file! Expected a tab-separated-values file, or a zip "
             "file of the a tab-separated-values file."),)

    return found
def zip_file_errors(filepath, upload_dir) -> Tuple[str, ...]:
    """Validate an uploaded zip archive and report what is wrong with it.

    Files that are not zip archives are not this function's concern and
    yield no errors. A zip archive must hold exactly one member, that member
    must not be a directory, and its name must be recognised as a
    tab-separated-values file. A sole, non-directory member is extracted
    into `upload_dir` before its type is checked.
    """
    if not is_zipfile(filepath):
        return tuple()

    with ZipFile(filepath, "r") as archive:
        members = archive.infolist()
        if len(members) != 1:
            return (
                ("Expected exactly one (1) member file within the uploaded zip "
                 f"file. Got {len(members)} member files."),)

        member = members[0]
        if member.is_dir():
            return (
                ("Expected a member text file in the uploaded zip file. Got a "
                 "directory/folder."),)

        archive.extract(member, path=upload_dir)
        # The type is guessed from the member's name alone.
        guessed_type, _encoding = mimetypes.guess_type(
            f"{upload_dir}/{member.filename}")
        if guessed_type == "text/tab-separated-values":
            return tuple()
        return (
            ("Expected the member text file in the uploaded zip file to"
             " be a tab-separated file."),)
@exprdatabp.route("populations/expression-data", methods=["GET"])
@require_login
def index():
    """Entry point for expression data: choose a species, then move on.

    With no `species_id` query argument the species-selection page is
    rendered. Otherwise the species is looked up: a known species redirects
    to population selection, an unknown one flashes an error and redirects
    back to this page.
    """
    chosen_species_id = request.args.get("species_id")
    with database_connection(app.config["SQL_URI"]) as conn:
        if chosen_species_id:
            species = species_by_id(conn, chosen_species_id)
            if species:
                return redirect(url_for(
                    "species.populations.expression-data.select_population",
                    species_id=species["SpeciesId"]))

            flash("Could not find species selected!", "alert-danger")
            return redirect(
                url_for("species.populations.expression-data.index"))

        return render_template("expression-data/index.html",
                               species=order_by_family(all_species(conn)),
                               activelink="expression-data")
@exprdatabp.route("<int:species_id>/populations/expression-data/select-population",
                  methods=["GET"])
@require_login
def select_population(species_id: int):
    """Choose the population the expression data belongs to.

    An unknown species sends the user back to the index page. With no
    `population_id` query argument the selection page is rendered; with one,
    the population is looked up and the user is redirected to the upload
    page, or back to this page with an error if it does not exist.
    """
    chosen_population_id = request.args.get("population_id")
    with database_connection(app.config["SQL_URI"]) as conn:
        species = species_by_id(conn, species_id)
        if not species:
            flash("No such species!", "alert-danger")
            return redirect(
                url_for("species.populations.expression-data.index"))

        if not chosen_population_id:
            species_populations = order_by_family(
                populations_by_species(conn, species_id),
                order_key="FamilyOrder")
            return render_template("expression-data/select-population.html",
                                   species=species,
                                   populations=species_populations,
                                   activelink="expression-data")

        population = population_by_species_and_id(
            conn, species_id, chosen_population_id)
        if population:
            return redirect(url_for(
                "species.populations.expression-data.upload_file",
                species_id=species_id,
                population_id=population["Id"]))

        flash("No such population!", "alert-danger")
        return redirect(url_for(
            "species.populations.expression-data.select_population",
            species_id=species_id))
@exprdatabp.route("<int:species_id>/populations/<int:population_id>/"
                  "expression-data/upload",
                  methods=["GET", "POST"])
@require_login
def upload_file(species_id: int, population_id: int):
    """Show the file-upload form (GET) or accept an uploaded file (POST).

    On POST the request is validated with `errors`, the file is saved into
    the configured `UPLOAD_FOLDER` under a sanitised name and, if it is a zip
    archive, checked with `zip_file_errors`. Any problem is flashed and the
    user is sent back to this upload page; otherwise the user is redirected
    to `parse_file` with the saved file's name and the chosen filetype.
    """
    def __upload_page__():
        # This route requires both ids, so every redirect back here has to
        # supply them or the URL cannot be built.
        return redirect(url_for(
            "species.populations.expression-data.upload_file",
            species_id=species_id,
            population_id=population_id))

    if request.method == "GET":
        # Only the form page needs the species/population records.
        with database_connection(app.config["SQL_URI"]) as conn:
            return render_template(
                "expression-data/select-file.html",
                species=species_by_id(conn, species_id),
                population=population_by_species_and_id(
                    conn, species_id, population_id))

    request_errors = errors(request)
    if request_errors:
        for error in request_errors:
            flash(error, "alert-danger error-expr-data")
        return __upload_page__()

    upload_dir = app.config["UPLOAD_FOLDER"]
    uploaded = request.files["qc_text_file"]
    filename = secure_filename(uploaded.filename)# type: ignore[arg-type]
    if not filename:
        # `secure_filename` can strip a name down to nothing (e.g. "..");
        # saving then would target the upload directory itself.
        flash("Invalid file name provided.", "alert-danger error-expr-data")
        return __upload_page__()

    # Tolerates an already-existing directory and creates missing parents.
    os.makedirs(upload_dir, exist_ok=True)
    filepath = os.path.join(upload_dir, filename)
    uploaded.save(filepath)

    zip_errors = zip_file_errors(filepath, upload_dir)
    if zip_errors:
        for error in zip_errors:
            flash(error, "alert-danger error-expr-data")
        return __upload_page__()

    return redirect(url_for("species.populations.expression-data.parse_file",
                            species_id=species_id,
                            population_id=population_id,
                            filename=filename,
                            filetype=request.form["filetype"]))
@exprdatabp.route("/data-review", methods=["GET"])
@require_login
def data_review():
    """Render the help page on what is expected of the uploaded data."""
    help_template = "expression-data/data-review.html"
    return render_template(help_template)
@exprdatabp.route(
    "<int:species_id>/populations/<int:population_id>/expression-data/parse",
    methods=["GET"])
@require_login
def parse_file(species_id: int, population_id: int):
    """Validate the request, then launch the file-verification job.

    Reads `filename` and `filetype` from the query string. Every problem
    found is flashed and the user is redirected back to the upload page.
    When everything checks out, a verification job is launched and the user
    is redirected to `parse_status` for that job.
    """
    _errors = False
    # `filename` comes straight from the query string: sanitise it so it can
    # only ever name an entry directly inside UPLOAD_FOLDER. Names produced
    # by `upload_file` are already sanitised and pass through unchanged.
    filename = secure_filename(request.args.get("filename") or "")
    filetype = request.args.get("filetype")
    filepath = os.path.join(app.config["UPLOAD_FOLDER"], filename)

    species = with_db_connection(lambda con: species_by_id(con, species_id))
    if not bool(species):
        flash("No such species.", "alert-danger")
        _errors = True

    if not filename:
        # Covers both a missing and an empty/unusable filename, so `filepath`
        # is never used without having been validated.
        flash("No file provided", "alert-danger")
        _errors = True

    if filetype is None:
        flash("No filetype provided", "alert-danger")
        _errors = True
    elif filetype not in ("average", "standard-error"):
        flash("Invalid filetype provided", "alert-danger")
        _errors = True

    # `isfile` rather than `exists`: a directory inside UPLOAD_FOLDER (such
    # as "job_errors") is not a file that can be parsed.
    if filename and not os.path.isfile(filepath):
        flash("Selected file does not exist (any longer)", "alert-danger")
        _errors = True

    if _errors:
        # The upload route requires both ids to build its URL.
        return redirect(url_for(
            "species.populations.expression-data.upload_file",
            species_id=species_id,
            population_id=population_id))

    redisurl = app.config["REDIS_URL"]
    with Redis.from_url(redisurl, decode_responses=True) as rconn:
        job = jobs.launch_job(
            jobs.build_file_verification_job(
                rconn, app.config["SQL_URI"], redisurl,
                species_id, filepath, filetype,# type: ignore[arg-type]
                app.config["JOBS_TTL_SECONDS"]),
            redisurl,
            f"{app.config['UPLOAD_FOLDER']}/job_errors")

    return redirect(url_for("species.populations.expression-data.parse_status",
                            species_id=species_id,
                            population_id=population_id,
                            job_id=job["jobid"]))
@exprdatabp.route(
    "<int:species_id>/populations/<int:population_id>/expression-data/parse/"
    "status/<uuid:job_id>",
    methods=["GET"])
@require_login
def parse_status(species_id: int, population_id: int, job_id: str):
    """Report on the progress of the parsing job `job_id`.

    - Unknown job: renders the no-such-job page with HTTP status 400.
    - Non-empty error file for the job, or a "parse-error" status: redirects
      to the `fail` view.
    - "success" or "aborted" status: redirects to the `results` view.
    - Anything else: renders the job-progress page.
    """
    def __fail_page__(failed_job_id):
        # The `fail` route needs the species and population ids as well as
        # the job id, so all three are always passed.
        return redirect(url_for("species.populations.expression-data.fail",
                                species_id=species_id,
                                population_id=population_id,
                                job_id=failed_job_id))

    with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn:
        try:
            job = jobs.job(rconn, jobs.jobsnamespace(), job_id)
        except jobs.JobNotFound:
            return render_template("no_such_job.html", job_id=job_id), 400

    # A non-empty error file is treated as a failed job, whatever status the
    # job record itself carries.
    error_filename = jobs.error_filename(
        job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors")
    if os.path.exists(error_filename) and os.stat(error_filename).st_size > 0:
        return __fail_page__(job_id)

    job_id = job["jobid"]
    progress = float(job["percent"])
    status = job["status"]
    filename = job.get("filename", "uploaded file")
    # NOTE(review): the "errors" field is decoded with jsonpickle, which can
    # instantiate arbitrary objects -- confirm that only trusted workers
    # write to this field.
    _errors = jsonpickle.decode(
        job.get("errors", jsonpickle.encode(tuple())))

    if status in ("success", "aborted"):
        return redirect(url_for("species.populations.expression-data.results",
                                species_id=species_id,
                                population_id=population_id,
                                job_id=job_id))

    if status == "parse-error":
        return __fail_page__(job_id)

    # Make the error-type predicates callable from within the template.
    app.jinja_env.globals.update(
        isinvalidvalue=isinvalidvalue,
        isduplicateheading=isduplicateheading)
    return render_template(
        "expression-data/job-progress.html",
        job_id=job_id,
        job_status=status,
        progress=progress,
        message=job.get("message", ""),
        job_name=f"Parsing '{filename}'",
        errors=_errors,
        species=with_db_connection(
            lambda conn: species_by_id(conn, species_id)),
        population=with_db_connection(
            lambda conn: population_by_species_and_id(
                conn, species_id, population_id)))
@exprdatabp.route(
    "<int:species_id>/populations/<int:population_id>/expression-data/parse/"
    "<uuid:job_id>/results",
    methods=["GET"])
@require_login
def results(species_id: int, population_id: int, job_id: uuid.UUID):
    """Show the results of parsing for the job `job_id`.

    Renders the parse-results page with the errors recorded on the job, or
    the no-such-job page when the job cannot be found.
    """
    with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn:
        try:
            job = jobs.job(rconn, jobs.jobsnamespace(), job_id)
        except jobs.JobNotFound:
            # A missing job is reported by exception; fold it into the same
            # path as an empty job so the no-such-job page is shown.
            job = None

    if not job:
        return render_template("expression-data/no-such-job.html",
                               job_id=job_id)

    # Same fallback name the progress page uses when none was recorded.
    filename = job.get("filename", "uploaded file")
    # NOTE(review): the "errors" field is decoded with jsonpickle, which can
    # instantiate arbitrary objects -- confirm that only trusted workers
    # write to this field.
    _errors = jsonpickle.decode(job.get("errors", jsonpickle.encode(tuple())))
    # Make the error-type predicates callable from within the template.
    app.jinja_env.globals.update(
        isinvalidvalue=isinvalidvalue,
        isduplicateheading=isduplicateheading)
    return render_template(
        "expression-data/parse-results.html",
        errors=_errors,
        job_name=f"Parsing '{filename}'",
        user_aborted=job.get("user_aborted"),
        job_id=job["jobid"],
        species=with_db_connection(
            lambda conn: species_by_id(conn, species_id)),
        population=with_db_connection(
            lambda conn: population_by_species_and_id(
                conn, species_id, population_id)))
@exprdatabp.route(
    "<int:species_id>/populations/<int:population_id>/expression-data/parse/"
    "<uuid:job_id>/fail",
    methods=["GET"])
@require_login
def fail(species_id: int, population_id: int, job_id: str):
    """Handle parsing failure for the job `job_id`.

    For a known job, renders the worker-failure page when the job's error
    file exists and is non-empty, and the parse-failure page otherwise. For
    an unknown job, renders the no-such-job page.
    """
    with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn:
        try:
            job = jobs.job(rconn, jobs.jobsnamespace(), job_id)
        except jobs.JobNotFound:
            # A missing job is reported by exception; fold it into the same
            # path as an empty job so the no-such-job page is shown.
            job = None

    if job:
        error_filename = jobs.error_filename(
            job_id, f"{app.config['UPLOAD_FOLDER']}/job_errors")
        if (os.path.exists(error_filename)
                and os.stat(error_filename).st_size > 0):
            return render_template("worker_failure.html", job_id=job_id)
        return render_template("parse_failure.html", job=job)

    # NOTE(review): the keys are named `*_id` but carry the full species and
    # population records -- confirm against what the template expects.
    return render_template("expression-data/no-such-job.html",
                           **with_db_connection(lambda conn: {
                               "species_id": species_by_id(conn, species_id),
                               "population_id": population_by_species_and_id(
                                   conn, species_id, population_id)}),
                           job_id=job_id)
@exprdatabp.route(
    "<int:species_id>/populations/<int:population_id>/expression-data/parse/"
    "abort",
    methods=["POST"])
@require_login
def abort(species_id: int, population_id: int):
    """Handle a user's request to abort file processing.

    Reads `job_id` from the submitted form and, when the job exists, sets
    its `user_aborted` field to 1. In every case the user is then redirected
    to the job's status page.
    """
    job_id = request.form["job_id"]

    with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn:
        try:
            job = jobs.job(rconn, jobs.jobsnamespace(), job_id)
        except jobs.JobNotFound:
            # Nothing to abort; the status page reports the missing job.
            job = None

        if job:
            rconn.hset(name=jobs.job_key(jobs.jobsnamespace(), job_id),
                       key="user_aborted",
                       value=int(True))

    return redirect(url_for("species.populations.expression-data.parse_status",
                            species_id=species_id,
                            population_id=population_id,
                            job_id=job_id))