"""Code regarding samples"""
import sys
import uuid
import logging
from pathlib import Path
from flask import (flash,
request,
redirect,
Blueprint,
current_app as app)
from gn_libs import jobs
from gn_libs import sqlite3
from uploader import session
from uploader.files import save_file
from uploader.flask_extensions import url_for
from uploader.ui import make_template_renderer
from uploader.authorisation import require_login
from uploader.input_validation import is_integer_input
from uploader.population.models import population_by_id
from uploader.route_utils import generic_select_population
from uploader.datautils import safe_int, enumerate_sequence
from uploader.species.models import all_species, species_by_id
from uploader.request_checks import with_species, with_population
from uploader.db_utils import (with_db_connection,
database_connection)
from .models import samples_by_species_and_population
# Flask blueprint on which every samples view in this file is registered
# (see the `@samplesbp.route(...)` decorators below).
samplesbp = Blueprint("samples", __name__)
# Module-level `render_template` used by the views below; it is built by
# `make_template_renderer("samples")` rather than imported from Flask.
render_template = make_template_renderer("samples")
@samplesbp.route("/samples", methods=["GET"])
@require_login
def index():
    """
    Entry-point view for working with samples.

    Behaviour depends on the `species_id` query argument:

    * absent/empty: render the "samples/index.html" page with all species.
    * "CREATE-SPECIES": redirect to the species-creation view.
    * anything else: look the species up; redirect to the population
      selection view if it exists, otherwise flash an error and redirect
      back to this view.
    """
    chosen = request.args.get("species_id")
    with database_connection(app.config["SQL_URI"]) as conn:
        # Nothing selected yet: show the species selection page.
        if not chosen:
            return render_template("samples/index.html",
                                   species=all_species(conn),
                                   activelink="samples")

        # Sentinel value requesting creation of a new species.
        if chosen == "CREATE-SPECIES":
            create_species_uri = url_for(
                "species.create_species",
                return_to="species.populations.samples.select_population")
            return redirect(create_species_uri)

        found = species_by_id(conn, chosen)
        if found:
            return redirect(
                url_for("species.populations.samples.select_population",
                        species_id=found["SpeciesId"]))

        flash("No such species!", "alert-danger")
        return redirect(url_for("species.populations.samples.index"))
@samplesbp.route("<int:species_id>/samples/select-population", methods=["GET"])
@require_login
@with_species(redirect_uri="species.populations.samples.index")
def select_population(species: dict, **kwargs):# pylint: disable=[unused-argument]
    """
    Select the population whose samples are to be handled.

    The work is delegated to `generic_select_population`, which receives the
    species, the template to render, the `population_id` query argument (or
    an empty string when it is missing/empty), the endpoint names for this
    view and for the samples listing, and the "not found" message.
    """
    chosen_population_id = request.args.get("population_id") or ""
    return generic_select_population(
        species,
        "samples/select-population.html",
        chosen_population_id,
        "species.populations.samples.select_population",
        "species.populations.samples.list_samples",
        "samples",
        "Population not found!")
@samplesbp.route("<int:species_id>/populations/<int:population_id>/samples")
@require_login
@with_population(
    species_redirect_uri="species.populations.samples.index",
    redirect_uri="species.populations.samples.select_population")
def list_samples(species: dict, population: dict, **kwargs):# pylint: disable=[unused-argument]
    """
    List the samples in a particular population and give the ability to upload
    new ones.

    Paging is controlled by two optional query arguments:

    * `from`: zero-based offset of the first sample to show; negative values
      are clamped to 0.
    * `count`: number of samples to show; defaults to 20. Values that are not
      integers, or are less than 1, fall back to the default rather than
      failing the request or producing a wrongly-sliced page.
    """
    default_count = 20
    offset = max(safe_int(request.args.get("from") or 0), 0)
    try:
        count = int(request.args.get("count") or default_count)
    except ValueError:
        # Non-numeric `count` in the query string: use the default page size
        # instead of letting the request fail with a server error.
        count = default_count
    if count < 1:
        # A negative count would make the slice below count from the end of
        # the sequence, returning the wrong window of samples.
        count = default_count

    with database_connection(app.config["SQL_URI"]) as conn:
        all_samples = enumerate_sequence(samples_by_species_and_population(
            conn, species["SpeciesId"], population["Id"]))
        total_samples = len(all_samples)
        return render_template("samples/list-samples.html",
                               species=species,
                               population=population,
                               samples=all_samples[offset:offset+count],
                               offset=offset,
                               count=count,
                               total_samples=total_samples,
                               activelink="list-samples")
@samplesbp.route("<int:species_id>/populations/<int:population_id>/upload-samples",
                 methods=["GET", "POST"])
@require_login
def upload_samples(species_id: int, population_id: int):#pylint: disable=[too-many-return-statements]
    """
    Upload the samples.

    On GET (or on a POST without a `samples_file` upload) the upload form is
    rendered. On a POST with a file, the form options are validated, the file
    is saved into the configured upload folder and a background job running
    `scripts.insert_samples` is launched; the user is then redirected to the
    job's status page.

    Form fields read on POST:

    * `samples_file`: the uploaded file.
    * `first_line_heading`: "on" when the first line of the file is a heading.
    * `separator`: the field separator, or "other" to use `other_separator`.
    * `field_delimiter`: the quote character; defaults to `"`.

    Invalid species/population identifiers flash an error and redirect to the
    corresponding selection page.
    """
    samples_uploads_page = redirect(url_for(
        "species.populations.samples.upload_samples",
        species_id=species_id,
        population_id=population_id))

    if not is_integer_input(species_id):
        flash("You did not provide a valid species. Please select one to "
              "continue.",
              "alert-danger")
        # Use the same species-selection endpoint as the other views in this
        # file.
        return redirect(url_for("species.populations.samples.index"))
    species = with_db_connection(lambda conn: species_by_id(conn, species_id))
    if not bool(species):
        flash("Species with given ID was not found.", "alert-danger")
        return redirect(url_for("species.populations.samples.index"))

    # NOTE: The redirects to the population-selection page below use the
    # default redirect code. That page only accepts GET, so a
    # method-preserving 307 would replay a POST against it and fail.
    if not is_integer_input(population_id):
        flash("You did not provide a valid population. Please select one "
              "to continue.",
              "alert-danger")
        return redirect(url_for("species.populations.samples.select_population",
                                species_id=species_id))
    population = with_db_connection(
        lambda conn: population_by_id(conn, int(population_id)))
    if not bool(population):
        flash("Invalid grouping/population!", "alert-error")
        return redirect(url_for("species.populations.samples.select_population",
                                species_id=species_id))

    if request.method == "GET" or request.files.get("samples_file") is None:
        return render_template("samples/upload-samples.html",
                               species=species,
                               population=population)

    # Validate the form options before saving the upload, so that a rejected
    # form does not leave an unused file behind in the upload folder.
    firstlineheading = request.form.get("first_line_heading") == "on"
    separator = request.form.get("separator", ",")
    if separator == "other":
        separator = request.form.get("other_separator", ",")
    if not bool(separator):
        flash("You need to provide a separator character.", "alert-error")
        return samples_uploads_page
    quotechar = (request.form.get("field_delimiter", '"') or '"')

    upload_folder = Path(app.config["UPLOAD_FOLDER"])
    try:
        samples_file = save_file(request.files["samples_file"], upload_folder)
    except AssertionError:
        flash("You need to provide a file with the samples data.",
              "alert-error")
        return samples_uploads_page

    # Command line for the background job that inserts the samples.
    command = [
        sys.executable, "-m", "scripts.insert_samples",
        app.config["SQL_URI"],
        str(species["SpeciesId"]),
        str(population["InbredSetId"]),
        str(samples_file.absolute()),
        separator,
        f"--quotechar={quotechar}"
    ] + (["--firstlineheading"] if firstlineheading else [])

    _jobs_db = app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"]
    with sqlite3.connection(_jobs_db) as conn:
        job = jobs.launch_job(
            jobs.initialise_job(
                conn,
                str(uuid.uuid4()),
                command,
                "samples_upload",
                extra_meta={
                    "job_name": f"Samples Upload: {samples_file.name}"
                },
                external_id=session.logged_in_user_id()),
            _jobs_db,
            (upload_folder / "job_errors").absolute(),
            loglevel=logging.getLevelName(
                app.logger.getEffectiveLevel()).lower())
        return redirect(
            url_for("background-jobs.job_status", job_id=job["job_id"]))