about summary refs log tree commit diff
path: root/uploader/population/views.py
diff options
context:
space:
mode:
Diffstat (limited to 'uploader/population/views.py')
-rw-r--r--uploader/population/views.py245
1 files changed, 245 insertions, 0 deletions
diff --git a/uploader/population/views.py b/uploader/population/views.py
new file mode 100644
index 0000000..795ce81
--- /dev/null
+++ b/uploader/population/views.py
@@ -0,0 +1,245 @@
+"""Views dealing with populations/inbredsets"""
+import json
+import base64
+
+from markupsafe import escape
+from MySQLdb.cursors import DictCursor
+from gn_libs.mysqldb import database_connection
+from flask import (flash,
+                   request,
+                   redirect,
+                   Blueprint,
+                   current_app as app)
+
+from uploader.samples.views import samplesbp
+from uploader.flask_extensions import url_for
+from uploader.oauth2.client import oauth2_post
+from uploader.ui import make_template_renderer
+from uploader.authorisation import require_login
+from uploader.genotypes.views import genotypesbp
+from uploader.datautils import enumerate_sequence
+from uploader.phenotypes.views import phenotypesbp
+from uploader.expression_data.views import exprdatabp
+from uploader.species.models import all_species, species_by_id
+from uploader.monadic_requests import make_either_error_handler
+from uploader.input_validation import is_valid_representative_name
+from uploader.phenotypes.models import (dataset_phenotypes,
+                                        datasets_by_population)
+
+from .models import (save_population,
+                     population_families,
+                     populations_by_species,
+                     population_genetic_types,
+                     population_by_species_and_id)
+
+__active_link__ = "populations"
+popbp = Blueprint("populations", __name__)
+popbp.register_blueprint(samplesbp, url_prefix="/")
+popbp.register_blueprint(genotypesbp, url_prefix="/")
+popbp.register_blueprint(phenotypesbp, url_prefix="/")
+popbp.register_blueprint(exprdatabp, url_prefix="/")
+render_template = make_template_renderer("populations")
+
+
+@popbp.route("/populations", methods=["GET", "POST"])
+@require_login
+def index():
+    """Entry point for populations."""
+    with database_connection(app.config["SQL_URI"]) as conn:
+        if not bool(request.args.get("species_id")):
+            return render_template(
+                "populations/index.html",
+                species=all_species(conn),
+                activelink="populations")
+
+        species_id = request.args.get("species_id")
+        if species_id == "CREATE-SPECIES":
+            return redirect(url_for(
+                "species.create_species",
+                return_to="species.populations.list_species_populations"))
+
+        species = species_by_id(conn, request.args.get("species_id"))
+        if not bool(species):
+            flash("Invalid species identifier provided!", "alert-danger")
+            return redirect(url_for("species.populations.index"))
+        return redirect(url_for("species.populations.list_species_populations",
+                                species_id=species["SpeciesId"]))
+
+@popbp.route("/<int:species_id>/populations", methods=["GET"])
+@require_login
+def list_species_populations(species_id: int):
+    """List a particular species' populations."""
+    with database_connection(app.config["SQL_URI"]) as conn:
+        species = species_by_id(conn, species_id)
+        if not bool(species):
+            flash("No species was found for given ID.", "alert-danger")
+            return redirect(url_for("species.populations.index"))
+        return render_template(
+            "populations/list-populations.html",
+            species=species,
+            populations=enumerate_sequence(populations_by_species(
+                conn, species_id)),
+            activelink="list-populations")
+
+
+@popbp.route("/<int:species_id>/populations/create", methods=["GET", "POST"])
+@require_login
+def create_population(species_id: int):
+    """Create a new population."""
+    with (database_connection(app.config["SQL_URI"]) as conn,
+          conn.cursor(cursorclass=DictCursor) as cursor):
+        species = species_by_id(conn, species_id)
+
+        if request.method == "GET":
+            error_values = request.args.get("error_values")
+            if not bool(error_values):
+                error_values = base64.b64encode(
+                    '{"errors":{}, "error_values": {}}'.encode("utf8")
+                ).decode("utf8")
+
+            error_values = json.loads(base64.b64decode(
+                error_values.encode("utf8")).decode("utf8"))# type: ignore[union-attr]
+            return render_template(
+                "populations/create-population.html",
+                species=species,
+                families = population_families(conn, species["SpeciesId"]),
+                genetic_types = population_genetic_types(conn),
+                mapping_methods=(
+                    {"id": "0", "value": "No mapping support"},
+                    {"id": "1", "value": "GEMMA, QTLReaper, R/qtl"},
+                    {"id": "2", "value": "GEMMA"},
+                    {"id": "3", "value": "R/qtl"},
+                    {"id": "4", "value": "GEMMA, PLINK"}),
+                return_to=(request.args.get("return_to") or ""),
+                activelink="create-population",
+                **error_values)
+
+        if not bool(species):
+            flash("You must select a species.", "alert-danger")
+            return redirect(url_for("species.populations.index"))
+
+        errors: tuple[tuple[str, str], ...] = tuple()
+
+        population_name = (request.form.get(
+            "population_name") or "").strip()
+        if not bool(population_name):
+            errors = errors + (("population_name",
+                                "You must provide a name for the population!"),)
+
+        if not is_valid_representative_name(population_name):
+            errors = errors + ((
+                "population_name",
+                "The population name can only contain letters, numbers, "
+                "hyphens and underscores."),)
+
+        population_fullname = (request.form.get(
+            "population_fullname") or "").strip()
+        if not bool(population_fullname):
+            errors = errors + (
+                ("population_fullname", "Full Name MUST be provided."),)
+
+        if bool(errors):
+            values = base64.b64encode(
+                json.dumps({
+                    "errors": dict(errors),
+                    "error_values": dict(request.form)
+                }).encode("utf8"))
+            return redirect(url_for("species.populations.create_population",
+                                    species_id=species["SpeciesId"],
+                                    error_values=values))
+
+        new_population = save_population(cursor, {
+            "SpeciesId": species["SpeciesId"],
+            "Name": population_name,
+            "InbredSetName": population_fullname,
+            "FullName": population_fullname,
+            "InbredSetCode": request.form.get("population_code") or None,
+            "Description": request.form.get("population_description") or None,
+            "Family": request.form.get("population_family", "").strip() or None,
+            "MappingMethodId": request.form.get("population_mapping_method_id"),
+            "GeneticType": request.form.get("population_genetic_type") or None
+        })
+
+        def __flash_success__(_success):
+            flash("Successfully created population "
+                  f"{escape(new_population['FullName'])}.",
+                  "alert-success")
+            return_to = request.form.get("return_to") or ""
+            if return_to:
+                return redirect(url_for(
+                    return_to,
+                    species_id=species["SpeciesId"],
+                    population_id=new_population["InbredSetId"]))
+            return redirect(url_for(
+                "species.populations.view_population",
+                species_id=species["SpeciesId"],
+                population_id=new_population["InbredSetId"]))
+
+        app.logger.debug("We begin setting up the privileges here…")
+        return oauth2_post(
+            "auth/resource/populations/create",
+            json={
+                **dict(request.form),
+                "species_id": species_id,
+                "population_id": new_population["Id"],
+                "public": "on"
+            }
+        ).either(
+            make_either_error_handler(
+                "There was an error creating the population"),
+            __flash_success__)
+
+
+@popbp.route("/<int:species_id>/populations/<int:population_id>",
+             methods=["GET"])
+@require_login
+def view_population(species_id: int, population_id: int):
+    """View the details of a population."""
+    streamlined_ui =  request.args.get("streamlined_ui")
+    with database_connection(app.config["SQL_URI"]) as conn:
+        species = species_by_id(conn, species_id)
+        population = population_by_species_and_id(conn, species_id, population_id)
+        datasets = datasets_by_population(conn, species_id, population_id)
+        error = False
+        if len(datasets) > 1:
+            error = True
+            flash("Got more than one dataset for the population.", "alert alert-danger")
+
+        if not bool(species):
+            flash("You must select a species.", "alert-danger")
+            error = True
+
+        if not bool(population):
+            flash("You must select a population.", "alert-danger")
+            error = True
+
+        if error:
+            return redirect(url_for(("species.view_species"
+                                     if bool(streamlined_ui)
+                                     else "species.populations.index"),
+                                    species_id=species["SpeciesId"],
+                                    streamlined_ui=streamlined_ui))
+
+        _datasets = datasets_by_population(
+            conn, species["SpeciesId"], population["Id"])
+        assert len(datasets) == 0 or len(datasets) == 1, (
+            "We expect only one phenotypes dataset per population.")
+        _kwargs = {
+            "species": species,
+            "population": population,
+            "activelink": "view-population",
+            "streamlined_ui": streamlined_ui,
+            "view_under_construction": request.args.get(
+                "view_under_construction", False)
+        }
+
+        if len(_datasets) == 1:
+            _dataset = _datasets[0]
+            _kwargs = {
+                **_kwargs,
+                "dataset": _dataset,
+                "phenotypes": enumerate_sequence(
+                    dataset_phenotypes(conn, population["Id"], _dataset["Id"]))
+            }
+
+        return render_template("populations/view-population.html", **_kwargs)