From cdd4dc456e56bb4eb055e1cb7f2518d45fb3bfb9 Mon Sep 17 00:00:00 2001
From: Frederick Muriuki Muriithi
Date: Sat, 20 Jan 2024 09:57:23 +0300
Subject: Fetch sample/case names from database

Fetch the sample/case names from the database rather than from a static
file in the repository.

Issue:
https://issues.genenetwork.org/issues/quality-control/read-samples-from-database-by-species
---
 qc_app/entry.py                     |  7 ++--
 qc_app/jobs.py                      | 19 +++++++----
 qc_app/parse.py                     | 25 ++++++++++++--
 qc_app/static/js/upload_progress.js |  3 ++
 qc_app/templates/index.html         | 17 ++++++++--
 quality_control/parsing.py          | 27 +++++++--------
 scripts/qc.py                       | 45 ++++++++++---------------
 scripts/validate_file.py            | 65 +++++++++++++++++++------------------
 tests/conftest.py                   | 15 +++++++--
 9 files changed, 134 insertions(+), 89 deletions(-)

diff --git a/qc_app/entry.py b/qc_app/entry.py
index 0cd34c5..987fdcd 100644
--- a/qc_app/entry.py
+++ b/qc_app/entry.py
@@ -104,9 +104,10 @@ def upload_file():
         return render_template(
             "index.html", species=with_db_connection(species)), 400
 
-    return redirect(url_for(
-        "parse.parse", filename=filename,
-        filetype=request.form["filetype"]))
+    return redirect(url_for("parse.parse",
+                            speciesid=request.form["speciesid"],
+                            filename=filename,
+                            filetype=request.form["filetype"]))
 
 @entrybp.route("/data-review", methods=["GET"])
 def data_review():
diff --git a/qc_app/jobs.py b/qc_app/jobs.py
index a8257a3..f5e5173 100644
--- a/qc_app/jobs.py
+++ b/qc_app/jobs.py
@@ -32,17 +32,24 @@ def initialise_job(# pylint: disable=[too-many-arguments]
     redis_conn.expire(name=the_job["job_id"], time=timedelta(seconds=ttl_seconds))
     return the_job
 
-def build_file_verification_job(
-        redis_conn: Redis, filepath: str, filetype: str, redisurl: str,
+def build_file_verification_job(#pylint: disable=[too-many-arguments]
+        redis_conn: Redis,
+        dburi: str,
+        redisuri: str,
+        speciesid: int,
+        filepath: str,
+        filetype: str,
         ttl_seconds: int):
     "Build a file verification job"
-    job_id = str(uuid4())
+    jobid = str(uuid4())
     command = [
-        sys.executable, "-m", "scripts.validate_file", filetype, filepath, redisurl,
-        job_id
+        sys.executable, "-m", "scripts.validate_file",
+        dburi, redisuri, jobid,
+        "--redisexpiry", str(ttl_seconds),
+        str(speciesid), filetype, filepath,
    ]
    return initialise_job(
-        redis_conn, job_id, command, "file-verification", ttl_seconds, {
+        redis_conn, jobid, command, "file-verification", ttl_seconds, {
            "filetype": filetype,
            "filename": os.path.basename(filepath), "percent": 0
        })
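The `command` list above is the exact argv the launched worker runs under. As a
sketch, with placeholder values for the URIs, job id, TTL and file path, it
expands to:

    command = [
        sys.executable, "-m", "scripts.validate_file",
        "mysql://user:pass@localhost/gn_db",     # dburi (placeholder)
        "redis://localhost:6379/0",              # redisuri (placeholder)
        "e0b08b47-00f9-4d3c-8cd6-a8a37de1ea25",  # jobid (placeholder)
        "--redisexpiry", "86400",                # str(ttl_seconds)
        "1", "average", "/tmp/data.tsv",         # speciesid, filetype, filepath
    ]

The positional order (database URI, Redis URI, job id, then species id, file
type and file path) must match the parser that `init_cli_parser` and
`add_file_validation_arguments` assemble further down in this patch.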
diff --git a/qc_app/parse.py b/qc_app/parse.py
index ceb8fcf..40f7b44 100644
--- a/qc_app/parse.py
+++ b/qc_app/parse.py
@@ -7,7 +7,10 @@ from flask import flash, request, url_for, redirect, Blueprint, render_template
 from flask import current_app as app
 
 from quality_control.errors import InvalidValue, DuplicateHeading
-from . import jobs
+
+from qc_app import jobs
+from qc_app.dbinsert import species_by_id
+from qc_app.db_utils import with_db_connection
 
 parsebp = Blueprint("parse", __name__)
 
@@ -23,8 +26,25 @@ def isduplicateheading(item):
 def parse():
     """Trigger file parsing"""
     errors = False
+    speciesid = request.args.get("speciesid")
     filename = request.args.get("filename")
     filetype = request.args.get("filetype")
+    if speciesid is None:
+        flash("No species selected", "alert-error")
+        errors = True
+    else:
+        try:
+            speciesid = int(speciesid)
+            species = with_db_connection(
+                lambda con: species_by_id(con, speciesid))
+            if not bool(species):
+                flash("No such species.", "alert-error")
+                errors = True
+        except ValueError:
+            flash("Invalid speciesid provided. Expected an integer.",
+                  "alert-error")
+            errors = True
+
     if filename is None:
         flash("No file provided", "alert-error")
         errors = True
@@ -50,7 +70,8 @@ def parse():
     with Redis.from_url(redisurl, decode_responses=True) as rconn:
         job = jobs.launch_job(
             jobs.build_file_verification_job(
-                rconn, filepath, filetype, redisurl,
+                rconn, app.config["SQL_URI"], redisurl,
+                speciesid, filepath, filetype,
                 app.config["JOBS_TTL_SECONDS"]),
             redisurl,
             f"{app.config['UPLOAD_FOLDER']}/job_errors")
diff --git a/qc_app/static/js/upload_progress.js b/qc_app/static/js/upload_progress.js
index 98a503a..c98c33c 100644
--- a/qc_app/static/js/upload_progress.js
+++ b/qc_app/static/js/upload_progress.js
@@ -65,6 +65,9 @@ function selected_filetype(radios) {
 
 function setup_formdata(form) {
     var formdata = new FormData();
+    formdata.append(
+        "speciesid",
+        form.querySelector("#select_species01").value)
     formdata.append(
         "qc_text_file",
         form.querySelector("input[type='file']").files[0]);
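On submit, the script above posts the selected species' id alongside the file,
and `upload_file()` forwards it to the parse endpoint as a query argument.
Since `speciesid` is not a path variable of the route, Flask's `url_for`
renders it into the query string, giving a redirect target roughly like
`/parse/parse?speciesid=1&filename=data.tsv&filetype=average` (illustrative
values; the actual path depends on how the blueprint is mounted). `parse()`
then re-validates all three arguments before queueing the verification job.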
diff --git a/qc_app/templates/index.html b/qc_app/templates/index.html
index a454dd2..358b521 100644
--- a/qc_app/templates/index.html
+++ b/qc_app/templates/index.html
@@ -1,4 +1,5 @@
 {%extends "base.html"%}
+{%from "flash_messages.html" import flash_all_messages%}
 
 {%block title%}Data Upload{%endblock%}
 
@@ -54,6 +55,18 @@
       {%endif%}
     {%endwith%}
 
+    [twelve added lines whose markup was lost in extraction: a "Species"
+    dropdown that upload_progress.js reads via "#select_species01" and
+    submits as "speciesid"]
 
     [context markup lost in extraction: the "file type" fieldset]
@@ -111,8 +124,8 @@
     [context markup lost in extraction: the "upload samples" section]
 
-    [removed line: markup lost in extraction]
-    [removed line: markup lost in extraction]
+    [replacement markup lost in extraction]
     {%for spec in species%}
diff --git a/quality_control/parsing.py b/quality_control/parsing.py
index c545937..f7a664f 100644
--- a/quality_control/parsing.py
+++ b/quality_control/parsing.py
@@ -4,6 +4,9 @@ from enum import Enum
 from functools import partial
 from typing import Tuple, Union, Generator, Callable, Optional
 
+import MySQLdb as mdb
+from MySQLdb.cursors import DictCursor
+
 import quality_control.average as avg
 from quality_control.file_utils import open_file
 import quality_control.standard_error as se
@@ -17,21 +20,15 @@ class FileType(Enum):
     AVERAGE = 1
     STANDARD_ERROR = 2
 
-def strain_names(filepath):
-    """Retrieve the strains names from given file"""
-    strains = set()
-    with open(filepath, encoding="utf8") as strains_file:
-        for idx, line in enumerate(strains_file.readlines()):
-            if idx > 0:
-                parts = line.split()
-                for name in (parts[1], parts[2]):
-                    strains.add(name.strip())
-                if len(parts) >= 6:
-                    alias = parts[5].strip()
-                    if alias != "" and alias not in ("P", "\\N"):
-                        strains.add(alias)
-
-    return strains
+def strain_names(dbconn: mdb.Connection, speciesid: int) -> tuple[str, ...]:
+    """Retrieve samples/cases from database."""
+    with dbconn.cursor(cursorclass=DictCursor) as cursor:
+        cursor.execute("SELECT * FROM Strain WHERE SpeciesId=%s",
+                       (speciesid,))
+        samplenames = ((row["Name"], row["Name2"]) for row in cursor.fetchall())
+        return tuple(set(filter(
+            lambda item: bool(item.strip() if item is not None else item),
+            (name for names in samplenames for name in names))))
 
 def header_errors(line_number, fields, strains):
     """Gather all header row errors."""
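The flatten-and-filter expression in the new `strain_names()` is dense. Here
is the same logic run against made-up in-memory rows instead of a database
cursor, to show what it produces:

    rows = ({"Name": "BXD1", "Name2": "BXD1"},
            {"Name": "BXD2", "Name2": ""},
            {"Name": "B6D2F1", "Name2": None})
    samplenames = ((row["Name"], row["Name2"]) for row in rows)
    names = tuple(set(filter(
        lambda item: bool(item.strip() if item is not None else item),
        (name for names in samplenames for name in names))))
    # names == ('BXD1', 'BXD2', 'B6D2F1') in some order: the (Name, Name2)
    # pairs are flattened, duplicates collapse in the set, and empty or
    # NULL (None) names are filtered out.

Whitespace-only names are dropped too, since `item.strip()` yields an empty,
falsey string for them.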
" - f"[default '{default_strains_file}']"), - default=default_strains_file) parser.add_argument( "-c", "--count", type=int, @@ -54,6 +47,14 @@ def cli_argument_parser(): default=False, action="store_true") return parser +def cli_argument_parser(): + """Create the parser for the CLI arguments""" + theparser = init_cli_parser( + "qc", + "Command-Line Interface program for quality control of data files") + theparser.add_argument("speciesid", type=int, help="ID of the species.") + return add_file_validation_arguments(theparser) + def make_progress_indicator( verbose: bool, progress_calc_fn: Callable) -> Union[Callable, None]: """Utility to display the progress""" @@ -106,26 +107,14 @@ def main(): print(f"The file '{args.filepath}' does not exist.", file=sys.stderr) return 1 - if not os.path.exists(args.strainsfile): - print(f"The file '{args.strainsfile}' does not exist.", file=sys.stderr) - return 2 - if not is_file_mime(args.filepath, "text/tab-separated-values"): print( f"The file '{args.filepath}' MUST be a tab-separated file.", file=sys.stderr) return 3 - if not is_file_mime(args.strainsfile, "text/csv"): - print( - f"The file '{args.strainsfile}' MUST be a tab-separated file.", - file=sys.stderr) - return 4 - - if args.verbose: - print(f"Parsing the strain names from '{args.strainsfile}'") - - strains = strain_names(os.path.realpath(args.strainsfile)) + with database_connection(args.databaseuri) as dbconn: + strains = strain_names(dbconn, args.speciesid) filepath = os.path.realpath(args.filepath) if args.verbose: diff --git a/scripts/validate_file.py b/scripts/validate_file.py index 9f0a561..4b4fc0c 100644 --- a/scripts/validate_file.py +++ b/scripts/validate_file.py @@ -11,16 +11,20 @@ from redis.exceptions import ConnectionError # pylint: disable=[redefined-builti from quality_control.utils import make_progress_calculator from quality_control.parsing import FileType, strain_names, collect_errors -from .qc import cli_argument_parser +from qc_app.db_utils import database_connection -def make_progress_indicator( - redis_connection: Redis, job_id: str, - progress_calc_fn: Callable) -> Callable: +from .cli_parser import init_cli_parser +from .qc import add_file_validation_arguments + + +def make_progress_indicator(redis_connection: Redis, + jobid: str, + progress_calc_fn: Callable) -> Callable: """Make function that will compute the progress and update redis""" def __indicator__(linenumber, linetext): progress = progress_calc_fn(linenumber, linetext) - redis_connection.hset(name=job_id, mapping=progress._asdict()) + redis_connection.hset(name=str(jobid), mapping=progress._asdict()) return progress @@ -32,13 +36,9 @@ def cli_args_valid(args): print(f"The file '{args.filepath}' does not exist.", file=sys.stderr) return None - if not os.path.exists(args.strainsfile): - print(f"The file '{args.strainsfile}' does not exist.", file=sys.stderr) - return None - try: - conn = Redis.from_url(args.redisurl) # pylint: disable=[unused-variable] - except ConnectionError as conn_err: # pylint: disable=[unused-variable] + _conn = Redis.from_url(args.redisuri) + except ConnectionError as _conn_err: print(traceback.format_exc(), file=sys.stderr) return None @@ -46,28 +46,31 @@ def cli_args_valid(args): def process_cli_arguments(): """Setup command-line parser""" - parser = cli_argument_parser() - parser.prog = "worker" - parser.add_argument( - "redisurl", default="redis:///", help="URL to the redis server") - parser.add_argument("job_id", help="The id of the job being processed") + parser = init_cli_parser( 
+ "validate-file", + ("Verify that the file with the expression data conforms to " + "expectations.")) + parser.add_argument("speciesid", + type=int, + help="Species for which the data is to be processed.") + parser = add_file_validation_arguments(parser) return cli_args_valid(parser.parse_args()) -def stream_error(redis_conn, job_id, error): +def stream_error(redis_conn, jobid, error): """Update redis with the most current error(s) found""" errors = jsonpickle.decode( - redis_conn.hget(job_id, key="errors") or jsonpickle.encode(tuple())) + redis_conn.hget(str(jobid), key="errors") or jsonpickle.encode(tuple())) redis_conn.hset( - job_id, key="errors", value=jsonpickle.encode(errors + (error,))) + str(jobid), key="errors", value=jsonpickle.encode(errors + (error,))) -def make_user_aborted(redis_conn, job_id): +def make_user_aborted(redis_conn, jobid): """Mkae function that checks whether the user aborted the process""" def __aborted__(): user_aborted = bool(int( - redis_conn.hget(name=job_id, key="user_aborted") or "0")) + redis_conn.hget(name=str(jobid), key="user_aborted") or "0")) if user_aborted: - redis_conn.hset(name=job_id, key="status", value="aborted") + redis_conn.hset(name=str(jobid), key="status", value="aborted") return user_aborted return __aborted__ @@ -84,36 +87,36 @@ def main(): print("Quiting due to errors!", file=sys.stderr) return 1 - with Redis.from_url(args.redisurl) as redis_conn: + with (Redis.from_url(args.redisuri) as redis_conn, + database_connection(args.databaseuri) as dbconn): progress_calculator = make_progress_calculator( get_zipfile_size(args.filepath) if is_zipfile(args.filepath) else os.stat(args.filepath).st_size) progress_indicator = make_progress_indicator( - redis_conn, args.job_id, progress_calculator) + redis_conn, args.jobid, progress_calculator) count = args.count filepath = args.filepath filetype = ( FileType.AVERAGE if args.filetype == "average" else FileType.STANDARD_ERROR) - strains = strain_names(args.strainsfile) + strains = strain_names(dbconn, args.speciesid) + redis_conn.hset(name=str(args.jobid), key="status", value="Processing") redis_conn.hset( - name=args.job_id, key="status", value="Processing") - redis_conn.hset( - name=args.job_id, key="message", value="Collecting errors") + name=str(args.jobid), key="message", value="Collecting errors") error_count = 0 for error in collect_errors( filepath, filetype, strains, progress_indicator, - make_user_aborted(redis_conn, args.job_id)): - stream_error(redis_conn, args.job_id, error) + make_user_aborted(redis_conn, args.jobid)): + stream_error(redis_conn, args.jobid, error) if count > 0: error_count = error_count + 1 if error_count >= count: break - redis_conn.hset(name=args.job_id, key="status", value="success") + redis_conn.hset(name=str(args.jobid), key="status", value="success") return 0 diff --git a/tests/conftest.py b/tests/conftest.py index 41dcac1..d441cb9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -11,13 +11,24 @@ import pytest import jsonpickle from qc_app import create_app -from quality_control.parsing import strain_names from quality_control.errors import InvalidValue, DuplicateHeading @pytest.fixture(scope="session") def strains(): """Parse the strains once every test session""" - return strain_names("etc/strains.csv") + stainnames = set() + with open("etc/strains.csv", encoding="utf8") as strains_file: + for idx, line in enumerate(strains_file.readlines()): + if idx > 0: + parts = line.split() + for name in (parts[1], parts[2]): + stainnames.add(name.strip()) + if 
diff --git a/tests/conftest.py b/tests/conftest.py
index 41dcac1..d441cb9 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -11,13 +11,24 @@
 import pytest
 import jsonpickle
 
 from qc_app import create_app
-from quality_control.parsing import strain_names
 from quality_control.errors import InvalidValue, DuplicateHeading
 
 @pytest.fixture(scope="session")
 def strains():
     """Parse the strains once every test session"""
-    return strain_names("etc/strains.csv")
+    strainnames = set()
+    with open("etc/strains.csv", encoding="utf8") as strains_file:
+        for idx, line in enumerate(strains_file.readlines()):
+            if idx > 0:
+                parts = line.split()
+                for name in (parts[1], parts[2]):
+                    strainnames.add(name.strip())
+                if len(parts) >= 6:
+                    alias = parts[5].strip()
+                    if alias != "" and alias not in ("P", "\\N"):
+                        strainnames.add(alias)
+
+    return tuple(strainnames)
 
 def is_port_in_use(port: int) -> bool:
     "Check whether `port` is in use"
--
cgit v1.2.3