Diffstat (limited to 'scripts')
 -rw-r--r--  scripts/cli_parser.py                7
 -rw-r--r--  scripts/insert_data.py               8
 -rw-r--r--  scripts/insert_samples.py           30
 -rw-r--r--  scripts/load_phenotypes_to_db.py   526
 -rw-r--r--  scripts/process_rqtl2_bundle.py     29
 -rw-r--r--  scripts/qc.py                        4
 -rw-r--r--  scripts/qc_on_rqtl2_bundle.py        6
 -rw-r--r--  scripts/redis_logger.py             23
 -rw-r--r--  scripts/rqtl2/bundleutils.py        44
 -rw-r--r--  scripts/rqtl2/cli_parser.py         20
 -rw-r--r--  scripts/rqtl2/entry.py              60
 -rw-r--r--  scripts/rqtl2/install_genotypes.py  30
 -rw-r--r--  scripts/rqtl2/install_phenos.py     31
 -rw-r--r--  scripts/rqtl2/phenotypes_qc.py     519
 -rw-r--r--  scripts/validate_file.py             2
 -rw-r--r--  scripts/worker.py                    2
 16 files changed, 1245 insertions(+), 96 deletions(-)
diff --git a/scripts/cli_parser.py b/scripts/cli_parser.py
index 308ee4b..0c91c5e 100644
--- a/scripts/cli_parser.py
+++ b/scripts/cli_parser.py
@@ -19,6 +19,13 @@ def init_cli_parser(program: str, description: Optional[str] = None) -> Argument
         type=int,
         default=86400,
         help="How long to keep any redis keys around.")
+    parser.add_argument(
+        "--loglevel",
+        type=str,
+        default="INFO",
+        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL",
+                 "debug", "info", "warning", "error", "critical"],
+        help="The severity of events to track with the logger.")
     return parser
 
 def add_global_data_arguments(parser: ArgumentParser) -> ArgumentParser:
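The new `--loglevel` option accepts either case in its choices; callers are expected to normalise with `.upper()` before handing the value to `logging` (see `scripts/rqtl2/entry.py` below). A minimal sketch of consuming it, assuming the parser needs no other required arguments:

    import logging
    from scripts.cli_parser import init_cli_parser

    args = init_cli_parser("my-script").parse_args()
    logging.getLogger("my-script").setLevel(args.loglevel.upper())
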
diff --git a/scripts/insert_data.py b/scripts/insert_data.py
index 4b2e5f3..aec0251 100644
--- a/scripts/insert_data.py
+++ b/scripts/insert_data.py
@@ -10,11 +10,11 @@ from typing import Tuple, Iterator
 import MySQLdb as mdb
 from redis import Redis
 from MySQLdb.cursors import DictCursor
+from gn_libs.mysqldb import database_connection
 
 from functional_tools import take
 
 from quality_control.file_utils import open_file
-from uploader.db_utils import database_connection
 from uploader.check_connections import check_db, check_redis
 
 # Set up logging
@@ -197,7 +197,7 @@ def probeset_ids(dbconn: mdb.Connection,
                 break
             yield row
 
-def insert_means(# pylint: disable=[too-many-locals, too-many-arguments]
+def insert_means(# pylint: disable=[too-many-locals, too-many-arguments, too-many-positional-arguments]
         filepath: str, speciesid: int, platform_id: int, datasetid: int,
         dbconn: mdb.Connection, rconn: Redis) -> int: # pylint: disable=[unused-argument]
     "Insert the means/averages data into the database"
@@ -232,7 +232,7 @@ def insert_means(# pylint: disable=[too-many-locals, too-many-arguments]
         item for sublist in
         read_datavalues(filepath, headings, strains).values()
         for item in sublist),
-                                     start=(last_data_id(dbconn)+1)))
+                                     start=last_data_id(dbconn)+1))
     with dbconn.cursor(cursorclass=DictCursor) as cursor:
         while True:
             means = tuple(take(the_means, 10000))
@@ -245,7 +245,7 @@ def insert_means(# pylint: disable=[too-many-locals, too-many-arguments]
             cursor.executemany(xref_query, means)
     return 0
 
-def insert_se(# pylint: disable = [too-many-arguments,too-many-locals]
+def insert_se(# pylint: disable = [too-many-arguments,too-many-locals, too-many-positional-arguments]
         filepath: str, speciesid: int, platformid: int, datasetid: int,
         dbconn: mdb.Connection, rconn: Redis) -> int: # pylint: disable=[unused-argument]
     "Insert the standard-error data into the database"
diff --git a/scripts/insert_samples.py b/scripts/insert_samples.py
index e3577b6..fc029f9 100644
--- a/scripts/insert_samples.py
+++ b/scripts/insert_samples.py
@@ -3,11 +3,12 @@ import sys
 import logging
 import pathlib
 import argparse
+import traceback
 
 import MySQLdb as mdb
 from redis import Redis
+from gn_libs.mysqldb import database_connection
 
-from uploader.db_utils import database_connection
 from uploader.check_connections import check_db, check_redis
 from uploader.species.models import species_by_id
 from uploader.population.models import population_by_id
@@ -33,7 +34,7 @@ class SeparatorAction(argparse.Action):
         """Process the value passed in."""
         setattr(namespace, self.dest, (chr(9) if values == "\\t" else values))
 
-def insert_samples(conn: mdb.Connection,# pylint: disable=[too-many-arguments]
+def insert_samples(conn: mdb.Connection,# pylint: disable=[too-many-arguments, too-many-positional-arguments]
                    rconn: Redis,# pylint: disable=[unused-argument]
                    speciesid: int,
                    populationid: int,
@@ -73,6 +74,7 @@ def insert_samples(conn: mdb.Connection,# pylint: disable=[too-many-arguments]
     print("Samples upload successfully completed.")
     return 0
 
+
 if __name__ == "__main__":
 
     def cli_args():
@@ -127,7 +129,7 @@ if __name__ == "__main__":
 
     def main():
         """Run script to insert samples into the database."""
-
+        status_code = 1  # Default to failure; overwritten on success.
         args = cli_args()
         check_db(args.databaseuri)
         check_redis(args.redisuri)
@@ -137,13 +139,19 @@ if __name__ == "__main__":
 
         with (Redis.from_url(args.redisuri, decode_responses=True) as rconn,
               database_connection(args.databaseuri) as dbconn):
-            return insert_samples(dbconn,
-                                  rconn,
-                                  args.speciesid,
-                                  args.populationid,
-                                  args.samplesfile,
-                                  args.separator,
-                                  args.firstlineheading,
-                                  args.quotechar)
+
+            try:
+                status_code = insert_samples(dbconn,
+                                             rconn,
+                                             args.speciesid,
+                                             args.populationid,
+                                             args.samplesfile,
+                                             args.separator,
+                                             args.firstlineheading,
+                                             args.quotechar)
+            except Exception as _exc:# pylint: disable=[broad-exception-caught]
+                print(traceback.format_exc(), file=sys.stderr)
+
+        return status_code
 
     sys.exit(main())
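`main` now initialises `status_code` to `1` so that any exception still yields a failing exit code, with the traceback printed to stderr rather than crashing the wrapper. The same pattern in isolation, with a hypothetical `do_work` standing in for `insert_samples`:

    import sys
    import traceback

    def do_work() -> int:
        return 0  # placeholder for the real work

    def main() -> int:
        status_code = 1  # assume failure until do_work() says otherwise
        try:
            status_code = do_work()
        except Exception:  # pylint: disable=[broad-exception-caught]
            print(traceback.format_exc(), file=sys.stderr)
        return status_code

    sys.exit(main())
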
diff --git a/scripts/load_phenotypes_to_db.py b/scripts/load_phenotypes_to_db.py
new file mode 100644
index 0000000..9158307
--- /dev/null
+++ b/scripts/load_phenotypes_to_db.py
@@ -0,0 +1,526 @@
+"""Load phenotypes and their data provided in files into the database."""
+import sys
+import uuid
+import json
+import time
+import logging
+import argparse
+import datetime
+from typing import Any
+from pathlib import Path
+from zipfile import ZipFile
+from urllib.parse import urljoin
+from functools import reduce, partial
+
+from MySQLdb.cursors import DictCursor
+
+from gn_libs import jobs, mysqldb, sqlite3, monadic_requests as mrequests
+
+from r_qtl import r_qtl2 as rqtl2
+from uploader.species.models import species_by_id
+from uploader.population.models import population_by_species_and_id
+from uploader.samples.models import samples_by_species_and_population
+from uploader.phenotypes.models import (
+    dataset_by_id,
+    save_phenotypes_data,
+    create_new_phenotypes,
+    quick_save_phenotypes_data)
+from uploader.publications.models import fetch_publication_by_id
+
+from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter
+
+from functional_tools import take
+
+logging.basicConfig(
+    format="%(asctime)s — %(filename)s:%(lineno)s — %(levelname)s: %(message)s")
+logger = logging.getLogger(__name__)
+
+
+
+def __replace_na_strings__(line, na_strings):
+    return ((None if value in na_strings else value) for value in line)
+
+
+def save_phenotypes(
+        conn: mysqldb.Connection,
+        control_data: dict[str, Any],
+        population_id,
+        publication_id,
+        filesdir: Path
+) -> tuple[dict, ...]:
+    """Read `phenofiles` and save the phenotypes therein."""
+    phenofiles = tuple(filesdir.joinpath(_file) for _file in control_data["phenocovar"])
+    if len(phenofiles) <= 0:
+        return tuple()
+
+    if control_data["phenocovar_transposed"]:
+        logger.info("Undoing transposition of the files rows and columns.")
+        phenofiles = (
+            rqtl2.transpose_csv_with_rename(
+                _file,
+                build_line_splitter(control_data),
+                build_line_joiner(control_data))
+            for _file in phenofiles)
+
+    _headers = rqtl2.read_csv_file_headers(phenofiles[0],
+                                           control_data["phenocovar_transposed"],
+                                           control_data["sep"],
+                                           control_data["comment.char"])
+    return create_new_phenotypes(
+        conn,
+        population_id,
+        publication_id,
+        (dict(zip(_headers,
+                  __replace_na_strings__(line, control_data["na.strings"])))
+         for filecontent
+         in (rqtl2.read_csv_file(path,
+                                 separator=control_data["sep"],
+                                 comment_char=control_data["comment.char"])
+             for path in phenofiles)
+         for idx, line in enumerate(filecontent)
+         if idx != 0))
+
+
+def __row_to_dataitems__(
+        sample_row: dict,
+        dataidmap: dict,
+        pheno_name2id: dict[str, int],
+        samples: dict
+) -> tuple[dict, ...]:
+    samplename = sample_row["id"]
+
+    return ({
+        "phenotype_id": dataidmap[pheno_name2id[phenoname]]["phenotype_id"],
+        "data_id": dataidmap[pheno_name2id[phenoname]]["data_id"],
+        "sample_name": samplename,
+        "sample_id": samples[samplename]["Id"],
+        "value": phenovalue
+    } for phenoname, phenovalue in sample_row.items() if phenoname != "id")
+
+
+def __build_dataitems__(
+        phenofiles,
+        control_data,
+        samples,
+        dataidmap,
+        pheno_name2id
+):
+    _headers = rqtl2.read_csv_file_headers(
+        phenofiles[0],
+        False, # Any transposed files have been un-transposed by this point
+        control_data["sep"],
+        control_data["comment.char"])
+    _filescontents = (
+        rqtl2.read_csv_file(path,
+                            separator=control_data["sep"],
+                            comment_char=control_data["comment.char"])
+        for path in phenofiles)
+    _linescontents = (
+        __row_to_dataitems__(
+            dict(zip(("id",) + _headers[1:],
+                     __replace_na_strings__(line, control_data["na.strings"]))),
+            dataidmap,
+            pheno_name2id,
+            samples)
+        for linenum, line in (enumline for filecontent in _filescontents
+                              for enumline in enumerate(filecontent))
+        if linenum > 0)
+    return (item for items in _linescontents
+            for item in items
+            if item["value"] is not None)
+
+
+def save_numeric_data(# pylint: disable=[too-many-positional-arguments,too-many-arguments]
+        conn: mysqldb.Connection,
+        dataidmap: dict,
+        pheno_name2id: dict[str, int],
+        samples: tuple[dict, ...],
+        control_data: dict,
+        filesdir: Path,
+        filetype: str,
+        table: str
+):
+    """Read data from files and save to the database."""
+    phenofiles = tuple(
+        filesdir.joinpath(_file) for _file in control_data[filetype])
+    if len(phenofiles) <= 0:
+        return 0
+
+    if control_data[f"{filetype}_transposed"]:
+        logger.info("Undoing transposition of the files rows and columns.")
+        phenofiles = tuple(
+            rqtl2.transpose_csv_with_rename(
+                _file,
+                build_line_splitter(control_data),
+                build_line_joiner(control_data))
+            for _file in phenofiles)
+
+    try:
+        logger.debug("Attempt quick save with `LOAD … INFILE`.")
+        return quick_save_phenotypes_data(
+            conn,
+            table,
+            __build_dataitems__(
+                phenofiles,
+                control_data,
+                samples,
+                dataidmap,
+                pheno_name2id),
+            filesdir)
+    except Exception as _exc:# pylint: disable=[broad-exception-caught]
+        logger.debug("Could not use `LOAD … INFILE`, using raw query",
+                     exc_info=True)
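+        # Wait before retrying through the slower, plain-query fallback.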
+        time.sleep(60)
+        return save_phenotypes_data(
+            conn,
+            table,
+            __build_dataitems__(
+                phenofiles,
+                control_data,
+                samples,
+                dataidmap,
+                pheno_name2id))
+
+
+save_pheno_data = partial(save_numeric_data,
+                          filetype="pheno",
+                          table="PublishData")
+
+
+save_phenotypes_se = partial(save_numeric_data,
+                             filetype="phenose",
+                             table="PublishSE")
+
+
+save_phenotypes_n = partial(save_numeric_data,
+                            filetype="phenonum",
+                            table="NStrain")
+
+
+def update_auth(# pylint: disable=[too-many-locals,too-many-positional-arguments,too-many-arguments]
+        authserver,
+        token,
+        species,
+        population,
+        dataset,
+        xrefdata):
+    """Grant the user access to their data."""
+    _tries = 0
+    _delay = 1
+    headers = {
+        "Authorization": f"Bearer {token}",
+        "Content-Type": "application/json"
+    }
+    def authserveruri(endpoint):
+        return urljoin(authserver, endpoint)
+
+    def __fetch_user_details__():
+        logger.debug("… Fetching user details")
+        return mrequests.get(
+            authserveruri("/auth/user/"),
+            headers=headers
+        )
+
+    def __link_data__(user):
+        logger.debug("… linking uploaded data to user's group")
+        return mrequests.post(
+            authserveruri("/auth/data/link/phenotype"),
+            headers=headers,
+            json={
+                "species_name": species["Name"],
+                "group_id": user["group"]["group_id"],
+                "selected": [
+                    {
+                        "SpeciesId": species["SpeciesId"],
+                        "InbredSetId": population["Id"],
+                        "PublishFreezeId": dataset["Id"],
+                        "dataset_name": dataset["Name"],
+                        "dataset_fullname": dataset["FullName"],
+                        "dataset_shortname": dataset["ShortName"],
+                        "PublishXRefId": item["xref_id"]
+                    }
+                    for item in xrefdata
+                ],
+                "using-raw-ids": "on"
+            }).then(lambda ld_results: (user, ld_results))
+
+    def __fetch_phenotype_category_details__(user, linkeddata):
+        logger.debug("… fetching phenotype category details")
+        return mrequests.get(
+            authserveruri("/auth/resource/categories"),
+            headers=headers
+        ).then(
+            lambda categories: (
+                user,
+                linkeddata,
+                next(category for category in categories
+                     if category["resource_category_key"] == "phenotype"))
+        )
+
+    def __create_resource__(user, linkeddata, category):
+        logger.debug("… creating authorisation resource object")
+        now = datetime.datetime.now().isoformat()
+        return mrequests.post(
+            authserveruri("/auth/resource/create"),
+            headers=headers,
+            json={
+                "resource_category": category["resource_category_id"],
+                "resource_name": (f"{user['email']}—{dataset['Name']}—{now}—"
+                                  f"{len(xrefdata)} phenotypes"),
+                "public": "off"
+            }).then(lambda cr_results: (user, linkeddata, cr_results))
+
+    def __attach_data_to_resource__(user, linkeddata, resource):
+        logger.debug("… attaching data to authorisation resource object")
+        return mrequests.post(
+            authserveruri("/auth/resource/data/link"),
+            headers=headers,
+            json={
+                "dataset_type": "phenotype",
+                "resource_id": resource["resource_id"],
+                "data_link_ids": [
+                    item["data_link_id"] for item in linkeddata["traits"]]
+            }).then(lambda attc: (user, linkeddata, resource, attc))
+
+    def __handle_error__(resp):
+        error = resp.json()
+        if error.get("error") == "IntegrityError":
+            # This is hacky. If the auth already exists, something went wrong
+            # somewhere.
+            # This needs investigation to recover correctly.
+            logger.info(
+                "The authorisation for the data was already set up.")
+            return 0
+        logger.error("ERROR: Updating the authorisation for the data failed.")
+        logger.debug(
+            "ERROR: The response from the authorisation server was:\n\t%s",
+            error)
+        return 1
+
+    def __handle_success__(_val):
+        logger.info(
+            "The authorisation for the data has been updated successfully.")
+        return 0
+
+    return __fetch_user_details__().then(__link_data__).then(
+        lambda result: __fetch_phenotype_category_details__(*result)
+    ).then(
+        lambda result: __create_resource__(*result)
+    ).then(
+        lambda result: __attach_data_to_resource__(*result)
+    ).either(__handle_error__, __handle_success__)
+
+
+def load_data(conn: mysqldb.Connection, job: dict) -> tuple:#pylint: disable=[too-many-locals]
+    """Load the data attached in the given job."""
+    _job_metadata = job["metadata"]
+    # Steps
+    # 0. Read data from the files: can be multiple files per type
+    #
+    _species = species_by_id(conn, int(_job_metadata["species_id"]))
+    _population = population_by_species_and_id(
+        conn,
+        _species["SpeciesId"],
+        int(_job_metadata["population_id"]))
+    _dataset = dataset_by_id(
+        conn,
+        _species["SpeciesId"],
+        _population["Id"],
+        int(_job_metadata["dataset_id"]))
+    # 1. Just retrieve the publication: don't create publications for now.
+    _publication = fetch_publication_by_id(
+        conn, int(_job_metadata.get("publication_id", "0"))) or {"Id": 0}
+    # 2. Save all new phenotypes:
+    #     -> return phenotype IDs
+    bundle = Path(_job_metadata["bundle_file"])
+    _control_data = rqtl2.control_data(bundle)
+    logger.info("Extracting the zipped bundle of files.")
+    _outdir = Path(bundle.parent, f"bundle_{bundle.stem}")
+    with ZipFile(str(bundle), "r") as zfile:
+        _files = rqtl2.extract(zfile, _outdir)
+    logger.info("Saving new phenotypes.")
+    _phenos = save_phenotypes(conn,
+                              _control_data,
+                              _population["Id"],
+                              _publication["Id"],
+                              _outdir)
+
+    def __build_phenos_maps__(accumulator, row):
+        return ({
+            **accumulator[0],
+            row["phenotype_id"]: {
+                "population_id": _population["Id"],
+                "phenotype_id": row["phenotype_id"],
+                "data_id": row["data_id"],
+                "publication_id": row["publication_id"],
+            }
+        }, {
+            **accumulator[1],
+            row["pre_publication_abbreviation"]: row["phenotype_id"]
+        }, (
+            accumulator[2] + ({
+                "xref_id": row["xref_id"],
+                "population_id": row["population_id"],
+                "phenotype_id": row["phenotype_id"],
+                "publication_id": row["publication_id"],
+                "data_id": row["data_id"]
+            },)))
+    dataidmap, pheno_name2id, _xrefs = reduce(__build_phenos_maps__,
+                                              _phenos,
+                                              ({}, {}, tuple()))
+    # 3. a. Fetch the strain names and IDS: create name->ID map
+    samples = {
+        row["Name"]: row
+        for row in samples_by_species_and_population(
+                conn, _species["SpeciesId"], _population["Id"])}
+    #    b. Save all the data items (DataIds are assigned sequentially, not auto-generated), return new IDs
+    logger.info("Saving new phenotypes data.")
+    _num_data_rows = save_pheno_data(conn=conn,
+                                     dataidmap=dataidmap,
+                                     pheno_name2id=pheno_name2id,
+                                     samples=samples,
+                                     control_data=_control_data,
+                                     filesdir=_outdir)
+    logger.info("Saved %s new phenotype data rows.", _num_data_rows)
+
+    # 4. If standard errors and N exist, save them too
+    #    (use IDs returned in `3. b.` above).
+    if _control_data.get("phenose"):
+        logger.info("Saving new phenotypes standard errors.")
+        _num_se_rows = save_phenotypes_se(conn=conn,
+                                          dataidmap=dataidmap,
+                                          pheno_name2id=pheno_name2id,
+                                          samples=samples,
+                                          control_data=_control_data,
+                                          filesdir=_outdir)
+        logger.info("Saved %s new phenotype standard error rows.", _num_se_rows)
+
+    if _control_data.get("phenonum"):
+        logger.info("Saving new phenotypes sample counts.")
+        _num_n_rows = save_phenotypes_n(conn=conn,
+                                        dataidmap=dataidmap,
+                                        pheno_name2id=pheno_name2id,
+                                        samples=samples,
+                                        control_data=_control_data,
+                                        filesdir=_outdir)
+        logger.info("Saved %s new phenotype sample counts rows.", _num_n_rows)
+
+    return (_species, _population, _dataset, _xrefs)
+
+
+def update_means(
+        conn: mysqldb.Connection,
+        population_id: int,
+        xref_ids: tuple[int, ...]
+):
+    """Compute the means from the data and update them in the database."""
+    query = (
+        "UPDATE PublishXRef SET mean = "
+        "(SELECT AVG(value) FROM PublishData"
+        " WHERE PublishData.Id=PublishXRef.DataId) "
+        "WHERE PublishXRef.Id=%(xref_id)s "
+        "AND PublishXRef.InbredSetId=%(population_id)s")
+    _xref_iterator = iter(xref_ids)
+    with conn.cursor(cursorclass=DictCursor) as cursor:
+        while True:
+            batch = take(_xref_iterator, 10000)
+            if len(batch) == 0:
+                break
+            cursor.executemany(
+                query,
+                tuple({
+                    "population_id": population_id,
+                    "xref_id": _xref_id
+                } for _xref_id in batch))
+
+
+if __name__ == "__main__":
+    def parse_args():
+        """Setup command-line arguments."""
+        parser = argparse.ArgumentParser(
+            prog="load_phenotypes_to_db",
+            description="Process the phenotypes' data and load it into the database.")
+        parser.add_argument("db_uri", type=str, help="MariaDB/MySQL connection URL")
+        parser.add_argument(
+            "jobs_db_path", type=Path, help="Path to jobs' SQLite database.")
+        parser.add_argument("job_id", type=uuid.UUID, help="ID of the running job")
+        parser.add_argument(
+            "--log-level",
+            type=str,
+            help="Determines what is logged out.",
+            choices=("debug", "info", "warning", "error", "critical"),
+            default="info")
+        return parser.parse_args()
+
+    def setup_logging(log_level: str):
+        """Setup logging for the script."""
+        logger.setLevel(log_level)
+        logging.getLogger("uploader.phenotypes.models").setLevel(log_level)
+
+
+    def main():
+        """Entry-point for this script."""
+        args = parse_args()
+        setup_logging(args.log_level.upper())
+
+        with (mysqldb.database_connection(args.db_uri) as conn,
+              conn.cursor(cursorclass=DictCursor) as cursor,
+              sqlite3.connection(args.jobs_db_path) as jobs_conn):
+            job = jobs.job(jobs_conn, args.job_id)
+
+            # Lock the PublishXRef/PublishData/PublishSE/NStrain here: Why?
+            #     The `DataId` values are sequential, but not auto-increment
+            #     Can't convert `PublishXRef`.`DataId` to AUTO_INCREMENT.
+            #     `SELECT MAX(DataId) FROM PublishXRef;`
+            #     How do you check for a table lock?
+            #     https://oracle-base.com/articles/mysql/mysql-identify-locked-tables
+            #     `SHOW OPEN TABLES LIKE 'Publish%';`
+            _db_tables_ = (
+                "Species",
+                "InbredSet",
+                "Strain",
+                "StrainXRef",
+                "Publication",
+                "Phenotype",
+                "PublishXRef",
+                "PublishFreeze",
+                "PublishData",
+                "PublishSE",
+                "NStrain")
+
+            logger.debug(
+                ("Locking database tables for the connection:" +
+                 "".join("\n\t- %s" for _ in _db_tables_) + "\n"),
+                *_db_tables_)
+            cursor.execute(# Lock the tables to avoid race conditions
+                "LOCK TABLES " + ", ".join(
+                    f"{_table} WRITE" for _table in _db_tables_))
+
+            db_results = load_data(conn, job)
+            _xref_ids = tuple(xref["xref_id"] for xref in db_results[3])
+            jobs.update_metadata(
+                jobs_conn,
+                args.job_id,
+                "xref_ids",
+                json.dumps(_xref_ids))
+
+            logger.info("Unlocking all database tables.")
+            cursor.execute("UNLOCK TABLES")
+
+            logger.info("Updating means.")
+            update_means(conn, db_results[1]["Id"], _xref_ids)
+
+        # Update authorisations (break this down) — maybe loop until it works?
+        logger.info("Updating authorisation.")
+        _job_metadata = job["metadata"]
+        return update_auth(_job_metadata["authserver"],
+                           _job_metadata["token"],
+                           *db_results)
+
+
+    try:
+        sys.exit(main())
+    except Exception as _exc:# pylint: disable=[broad-exception-caught]
+        logger.debug("Data loading failed… Halting!",
+                     exc_info=True)
+        sys.exit(1)
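`update_auth` above chains its requests with the promise-style API from `gn_libs.monadic_requests`: each `.then(…)` step runs only if the previous request succeeded, and `.either(error_fn, success_fn)` collapses the whole chain into a single exit code. A toy chain of the same shape, with placeholder endpoints and payloads rather than the real auth API:

    from gn_libs import monadic_requests as mrequests

    def link_and_report(authserver: str, headers: dict) -> int:
        """Sketch: fetch a user, link some data, resolve to an exit code."""
        return mrequests.get(
            f"{authserver}/auth/user/", headers=headers
        ).then(
            # Runs only on success; its return value feeds .either() below.
            lambda user: mrequests.post(
                f"{authserver}/auth/data/link/phenotype",
                headers=headers,
                json={"user_id": user["user_id"]})  # placeholder payload
        ).either(
            lambda _error: 1,   # any failure along the chain lands here
            lambda _result: 0)  # success path
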
diff --git a/scripts/process_rqtl2_bundle.py b/scripts/process_rqtl2_bundle.py
index 20cfd3b..e2ce420 100644
--- a/scripts/process_rqtl2_bundle.py
+++ b/scripts/process_rqtl2_bundle.py
@@ -2,6 +2,7 @@
 import sys
 import uuid
 import json
+import argparse
 import traceback
 from typing import Any
 from pathlib import Path
@@ -10,6 +11,7 @@ from logging import Logger, getLogger, StreamHandler
 
 import MySQLdb as mdb
 from redis import Redis
+from gn_libs.mysqldb import database_connection
 
 from functional_tools import take
 
@@ -18,7 +20,6 @@ import r_qtl.r_qtl2_qc as rqc
 import r_qtl.exceptions as rqe
 
 from uploader import jobs
-from uploader.db_utils import database_connection
 from uploader.check_connections import check_db, check_redis
 
 from scripts.cli_parser import init_cli_parser
@@ -93,14 +94,17 @@ def process_bundle(dbconn: mdb.Connection,
         if has_geno_file(thejob):
             logger.info("Processing geno files.")
             genoexit = install_genotypes(
+                rconn,
                 dbconn,
-                meta["speciesid"],
-                meta["populationid"],
-                meta["geno-dataset-id"],
-                Path(meta["rqtl2-bundle-file"]),
+                f"{rprefix}:{jobid}",
+                argparse.Namespace(
+                    speciesid=meta["speciesid"],
+                    populationid=meta["populationid"],
+                    datasetid=meta["geno-dataset-id"],
+                    rqtl2bundle=Path(meta["rqtl2-bundle-file"])),
                 logger)
             if genoexit != 0:
-                raise Exception("Processing 'geno' file failed.")
+                raise Exception("Processing 'geno' file failed.")# pylint: disable=[broad-exception-raised]
             logger.debug(
                 "geno file processing completed successfully. (ExitCode: %s)",
                 genoexit)
@@ -108,14 +112,17 @@ def process_bundle(dbconn: mdb.Connection,
 
         if has_pheno_file(thejob):
             phenoexit = install_pheno_files(
+                rconn,
                 dbconn,
-                meta["speciesid"],
-                meta["platformid"],
-                meta["probe-dataset-id"],
-                Path(meta["rqtl2-bundle-file"]),
+                f"{rprefix}:{jobid}",
+                argparse.Namespace(
+                    speciesid=meta["speciesid"],
+                    platformid=meta["platformid"],
+                    dataset_id=meta["probe-dataset-id"],
+                    rqtl2bundle=Path(meta["rqtl2-bundle-file"])),
                 logger)
             if phenoexit != 0:
-                raise Exception("Processing 'pheno' file failed.")
+                raise Exception("Processing 'pheno' file failed.")# pylint: disable=[broad-exception-raised]
             logger.debug(
                 "pheno file processing completed successfully. (ExitCode: %s)",
                 phenoexit)
diff --git a/scripts/qc.py b/scripts/qc.py
index 6de051f..b00f4c1 100644
--- a/scripts/qc.py
+++ b/scripts/qc.py
@@ -5,14 +5,14 @@ import mimetypes
 from typing import Union, Callable
 from argparse import ArgumentParser
 
+from gn_libs.mysqldb import database_connection
+
 from functional_tools import take
 
 from quality_control.utils import make_progress_calculator
 from quality_control.errors import InvalidValue, DuplicateHeading
 from quality_control.parsing import FileType, strain_names, collect_errors
 
-from uploader.db_utils import database_connection
-
 from .cli_parser import init_cli_parser
 
 
diff --git a/scripts/qc_on_rqtl2_bundle.py b/scripts/qc_on_rqtl2_bundle.py
index fc95d13..0207938 100644
--- a/scripts/qc_on_rqtl2_bundle.py
+++ b/scripts/qc_on_rqtl2_bundle.py
@@ -12,12 +12,12 @@ from typing import Union, Sequence, Callable, Iterator
 
 import MySQLdb as mdb
 from redis import Redis
+from gn_libs.mysqldb import database_connection
 
 from quality_control.errors import InvalidValue
 from quality_control.checks import decimal_points_error
 
 from uploader import jobs
-from uploader.db_utils import database_connection
 from uploader.check_connections import check_db, check_redis
 
 from r_qtl import r_qtl2 as rqtl2
@@ -191,7 +191,7 @@ def check_pheno_samples(
     return allerrors
 
 
-def qc_pheno_errors(# pylint: disable=[too-many-arguments]
+def qc_pheno_errors(# pylint: disable=[too-many-arguments, too-many-positional-arguments]
         rconn, fqjobid, dburi, speciesid, zfile, logger) -> bool:
     """Check for errors in `pheno` file(s)."""
     cdata = rqtl2.control_data(zfile)
@@ -260,7 +260,7 @@ def run_qc(rconn: Redis,
         if qc_missing_files(rconn, fqjobid, zfile, logger):
             return 1
 
-    def with_zipfile(# pylint: disable=[too-many-arguments]
+    def with_zipfile(# pylint: disable=[too-many-arguments,too-many-positional-arguments]
             rconn, fqjobid, dbconn, speciesid, filename, logger, func
     ):
         with ZipFile(filename, "r") as zfile:
diff --git a/scripts/redis_logger.py b/scripts/redis_logger.py
index 2ae682b..a74e5e4 100644
--- a/scripts/redis_logger.py
+++ b/scripts/redis_logger.py
@@ -1,11 +1,12 @@
 """Utilities to log to redis for our worker scripts."""
 import logging
+from typing import Optional
 
 from redis import Redis
 
 class RedisLogger(logging.Handler):
     """Log out to redis for our worker scripts"""
-    def __init__(self,#pylint: disable=[too-many-arguments]
+    def __init__(self,#pylint: disable=[too-many-arguments, too-many-positional-arguments]
                  rconn: Redis,
                  fullyqualifiedjobid: str,
                  messageslistname: str,
@@ -26,6 +27,26 @@ class RedisLogger(logging.Handler):
         self.redisconnection.rpush(self.messageslistname, self.format(record))
         self.redisconnection.expire(self.messageslistname, self.expiry)
 
+class RedisMessageListHandler(logging.Handler):
+    """Send messages to specified redis list."""
+    def __init__(self,
+                 rconn: Redis,
+                 fullyqualifiedkey: str,
+                 loglevel: int = logging.NOTSET,
+                 expiry: Optional[int] = 86400):
+        super().__init__(loglevel)
+        self.redisconnection = rconn
+        self.fullyqualifiedkey = fullyqualifiedkey
+        self.expiry = expiry
+
+    def emit(self, record):
+        """Log out to specified `fullyqualifiedkey`."""
+        self.redisconnection.rpush(self.fullyqualifiedkey, self.format(record))
+        if bool(self.expiry):
+            self.redisconnection.expire(self.fullyqualifiedkey, self.expiry)
+        else:
+            self.redisconnection.persist(self.fullyqualifiedkey)
+
 def setup_redis_logger(rconn: Redis,
                        fullyqualifiedjobid: str,
                        job_messagelist: str,
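Unlike `RedisLogger`, the new `RedisMessageListHandler` treats a falsy `expiry` as "keep forever", calling `persist` instead of `expire`. A minimal wiring sketch (the Redis URL and key are illustrative):

    import logging
    from redis import Redis
    from scripts.redis_logger import RedisMessageListHandler

    rconn = Redis.from_url("redis://localhost:6379/0", decode_responses=True)
    handler = RedisMessageListHandler(rconn, "jobs:1234:log-messages", expiry=None)
    handler.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
    logging.getLogger("worker").addHandler(handler)
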
diff --git a/scripts/rqtl2/bundleutils.py b/scripts/rqtl2/bundleutils.py
new file mode 100644
index 0000000..17faa7c
--- /dev/null
+++ b/scripts/rqtl2/bundleutils.py
@@ -0,0 +1,44 @@
+"""Common utilities to operate in R/qtl2 bundles."""
+from typing import Union, Callable
+
+def build_line_splitter(cdata: dict) -> Callable[[str], tuple[Union[str, None], ...]]:
+    """Build and return a function to use to split data in the files.
+
+    Parameters
+    ----------
+    cdata: A dict holding the control information included with the R/qtl2
+        bundle.
+
+    Returns
+    -------
+    A function that takes a string and returns a tuple of strings.
+    """
+    separator = cdata["sep"]
+    na_strings = cdata["na.strings"]
+    def __splitter__(line: str) -> tuple[Union[str, None], ...]:
+        return tuple(
+            item if item not in na_strings else None
+            for item in
+            (field.strip() for field in line.strip().split(separator)))
+    return __splitter__
+
+
+def build_line_joiner(cdata: dict) -> Callable[[tuple[Union[str, None], ...]], str]:
+    """Build and return a function to use to split data in the files.
+
+    Parameters
+    ----------
+    cdata: A dict holding the control information included with the R/qtl2
+        bundle.
+
+    Returns
+    -------
+    A function that takes a tuple of (possibly None) strings and returns a string.
+    """
+    separator = cdata["sep"]
+    na_strings = cdata["na.strings"]
+    def __joiner__(row: tuple[Union[str, None], ...]) -> str:
+        return separator.join(
+            (na_strings[0] if item is None else item)
+            for item in row)
+    return __joiner__
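The splitter and joiner are near-inverses under one control configuration (leading and trailing whitespace is dropped on the way in), which is what lets `transpose_csv_with_rename` rewrite files without corrupting NA markers. A round-trip sketch with an assumed control dict:

    from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter

    cdata = {"sep": ",", "na.strings": ["NA", "-"]}  # assumed control values
    split_line = build_line_splitter(cdata)
    join_line = build_line_joiner(cdata)

    fields = split_line("BXD1, 12.5, NA\n")  # -> ('BXD1', '12.5', None)
    line = join_line(fields)                 # -> 'BXD1,12.5,NA'
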
diff --git a/scripts/rqtl2/cli_parser.py b/scripts/rqtl2/cli_parser.py
index bcc7a4f..9bb60a3 100644
--- a/scripts/rqtl2/cli_parser.py
+++ b/scripts/rqtl2/cli_parser.py
@@ -2,12 +2,22 @@
 from pathlib import Path
 from argparse import ArgumentParser
 
-def add_common_arguments(parser: ArgumentParser) -> ArgumentParser:
-    """Add common arguments to the CLI parser."""
-    parser.add_argument("datasetid",
-                        type=int,
-                        help="The dataset to which the data belongs.")
+def add_bundle_argument(parser: ArgumentParser) -> ArgumentParser:
+    """Add the `rqtl2bundle` argument."""
     parser.add_argument("rqtl2bundle",
                         type=Path,
                         help="Path to R/qtl2 bundle zip file.")
     return parser
+
+
+def add_datasetid_argument(parser: ArgumentParser) -> ArgumentParser:
+    """Add the `datasetid` argument."""
+    parser.add_argument("datasetid",
+                        type=int,
+                        help="The dataset to which the data belongs.")
+    return parser
+
+
+def add_common_arguments(parser: ArgumentParser) -> ArgumentParser:
+    """Add common arguments to the CLI parser."""
+    return add_bundle_argument(add_datasetid_argument(parser))
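Splitting `add_common_arguments` in two lets scripts that operate on a bundle without a target dataset (for example `phenotypes_qc.py` below, which imports only `add_bundle_argument`) opt out of the `datasetid` positional:

    from argparse import ArgumentParser
    from scripts.rqtl2.cli_parser import add_bundle_argument

    parser = add_bundle_argument(ArgumentParser(prog="phenotypes_qc"))
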
diff --git a/scripts/rqtl2/entry.py b/scripts/rqtl2/entry.py
index b7fb68e..e0e00e7 100644
--- a/scripts/rqtl2/entry.py
+++ b/scripts/rqtl2/entry.py
@@ -1,38 +1,58 @@
 """Build common script-entry structure."""
-from logging import Logger
+import sys
+import logging
 from typing import Callable
 from argparse import Namespace
+from logging import StreamHandler
 
 from redis import Redis
 from MySQLdb import Connection
+from gn_libs.mysqldb import database_connection
 
 from uploader import jobs
-from uploader.db_utils import database_connection
 from uploader.check_connections import check_db, check_redis
 
 from scripts.redis_logger import setup_redis_logger
 
-def build_main(args: Namespace,
-               run_fn: Callable[[Connection, Namespace], int],
-               logger: Logger,
-               loglevel: str = "INFO") -> Callable[[],int]:
+def build_main(
+        args: Namespace,
+        run_fn: Callable[[Redis, Connection, str, Namespace], int],
+        logger: logging.Logger
+) -> Callable[[],int]:
     """Build a function to be used as an entry-point for scripts."""
     def main():
-        check_db(args.databaseuri)
-        check_redis(args.redisuri)
-        if not args.rqtl2bundle.exists():
-            logger.error("File not found: '%s'.", args.rqtl2bundle)
-            return 2
-
         with (Redis.from_url(args.redisuri, decode_responses=True) as rconn,
               database_connection(args.databaseuri) as dbconn):
-            fqjobid = jobs.job_key(jobs.jobsnamespace(), args.jobid)
-            logger.addHandler(setup_redis_logger(
-                rconn,
-                fqjobid,
-                f"{fqjobid}:log-messages",
-                args.redisexpiry))
-            logger.setLevel(loglevel)
-            return run_fn(dbconn, args)
+            logger.setLevel(args.loglevel.upper())
+            fqjobid = jobs.job_key(args.redisprefix, args.jobid)
+
+            try:
+                rconn.hset(fqjobid, "status", "started")
+                logger.addHandler(setup_redis_logger(
+                    rconn,
+                    fqjobid,
+                    f"{fqjobid}:log-messages",
+                    args.redisexpiry))
+                logger.addHandler(StreamHandler(stream=sys.stderr))
+
+                check_db(args.databaseuri)
+                check_redis(args.redisuri)
+                if not args.rqtl2bundle.exists():
+                    logger.error("File not found: '%s'.", args.rqtl2bundle)
+                    return 2
+
+                returncode = run_fn(rconn, dbconn, fqjobid, args)
+                if returncode == 0:
+                    rconn.hset(fqjobid, "status", "completed:success")
+                    return returncode
+                rconn.hset(fqjobid, "status", "completed:error")
+                return returncode
+            except Exception as _exc:# pylint: disable=[broad-except]
+                logger.error("The process failed!", exc_info=True)
+                rconn.hset(fqjobid, "status", "completed:error")
+                return 4
 
     return main
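`build_main` now owns connection setup, Redis status flags and error trapping, so scripts only supply a `run_fn` matching the new four-argument call. A skeletal conforming function (the body is a placeholder):

    import argparse
    import MySQLdb as mdb
    from redis import Redis

    def run_fn(rconn: Redis,
               dbconn: mdb.Connection,
               fullyqualifiedjobid: str,
               args: argparse.Namespace) -> int:
        """Do the actual work; returning 0 marks the job 'completed:success'."""
        return 0
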
diff --git a/scripts/rqtl2/install_genotypes.py b/scripts/rqtl2/install_genotypes.py
index 6b89142..8762655 100644
--- a/scripts/rqtl2/install_genotypes.py
+++ b/scripts/rqtl2/install_genotypes.py
@@ -1,12 +1,13 @@
 """Load genotypes from R/qtl2 bundle into the database."""
 import sys
+import argparse
 import traceback
-from pathlib import Path
 from zipfile import ZipFile
 from functools import reduce
 from typing import Iterator, Optional
-from logging import Logger, getLogger, StreamHandler
+from logging import Logger, getLogger
 
+from redis import Redis
 import MySQLdb as mdb
 from MySQLdb.cursors import DictCursor
 
@@ -19,6 +20,8 @@ from scripts.rqtl2.entry import build_main
 from scripts.rqtl2.cli_parser import add_common_arguments
 from scripts.cli_parser import init_cli_parser, add_global_data_arguments
 
+__MODULE__ = "scripts.rqtl2.install_genotypes"
+
 def insert_markers(
         dbconn: mdb.Connection,
         speciesid: int,
@@ -183,15 +186,16 @@ def cross_reference_genotypes(
         cursor.executemany(insertquery, insertparams)
         return cursor.rowcount
 
-def install_genotypes(#pylint: disable=[too-many-arguments, too-many-locals]
+def install_genotypes(#pylint: disable=[too-many-locals]
+        rconn: Redis,#pylint: disable=[unused-argument]
         dbconn: mdb.Connection,
-        speciesid: int,
-        populationid: int,
-        datasetid: int,
-        rqtl2bundle: Path,
+        fullyqualifiedjobid: str,#pylint: disable=[unused-argument]
+        args: argparse.Namespace,
         logger: Logger = getLogger(__name__)
 ) -> int:
     """Load any existing genotypes into the database."""
+    (speciesid, populationid, datasetid, rqtl2bundle) = (
+        args.speciesid, args.populationid, args.datasetid, args.rqtl2bundle)
     count = 0
     with ZipFile(str(rqtl2bundle.absolute()), "r") as zfile:
         try:
@@ -253,15 +257,5 @@ if __name__ == "__main__":
 
         return parser.parse_args()
 
-    thelogger = getLogger("install_genotypes")
-    thelogger.addHandler(StreamHandler(stream=sys.stderr))
-    main = build_main(
-        cli_args(),
-        lambda dbconn, args: install_genotypes(dbconn,
-                                               args.speciesid,
-                                               args.populationid,
-                                               args.datasetid,
-                                               args.rqtl2bundle),
-        thelogger,
-        "INFO")
+    main = build_main(cli_args(), install_genotypes, getLogger(__MODULE__))
     sys.exit(main())
diff --git a/scripts/rqtl2/install_phenos.py b/scripts/rqtl2/install_phenos.py
index b5cab8e..9059cd6 100644
--- a/scripts/rqtl2/install_phenos.py
+++ b/scripts/rqtl2/install_phenos.py
@@ -1,11 +1,12 @@
 """Load pheno from R/qtl2 bundle into the database."""
 import sys
+import argparse
 import traceback
-from pathlib import Path
 from zipfile import ZipFile
 from functools import reduce
-from logging import Logger, getLogger, StreamHandler
+from logging import Logger, getLogger
 
+from redis import Redis
 import MySQLdb as mdb
 from MySQLdb.cursors import DictCursor
 
@@ -18,6 +19,8 @@ from r_qtl import r_qtl2_qc as rqc
 
 from functional_tools import take
 
+__MODULE__ = "scripts.rqtl2.install_phenos"
+
 def insert_probesets(dbconn: mdb.Connection,
                      platformid: int,
                      phenos: tuple[str, ...]) -> int:
@@ -93,14 +96,15 @@ def cross_reference_probeset_data(dbconn: mdb.Connection,
             } for row in dataids))
         return cursor.rowcount
 
-def install_pheno_files(#pylint: disable=[too-many-arguments, too-many-locals]
+def install_pheno_files(#pylint: disable=[too-many-locals]
+        rconn: Redis,#pylint: disable=[unused-argument]
         dbconn: mdb.Connection,
-        speciesid: int,
-        platformid: int,
-        datasetid: int,
-        rqtl2bundle: Path,
+        fullyqualifiedjobid: str,#pylint: disable=[unused-argument]
+        args: argparse.Namespace,
         logger: Logger = getLogger()) -> int:
     """Load data in `pheno` files and other related files into the database."""
+    (speciesid, platformid, datasetid, rqtl2bundle) = (
+        args.speciesid, args.platformid, args.datasetid, args.rqtl2bundle)
     with ZipFile(str(rqtl2bundle), "r") as zfile:
         try:
             rqc.validate_bundle(zfile)
@@ -155,16 +159,5 @@ if __name__ == "__main__":
 
         return parser.parse_args()
 
-    thelogger = getLogger("install_phenos")
-    thelogger.addHandler(StreamHandler(stream=sys.stderr))
-    main = build_main(
-        cli_args(),
-        lambda dbconn, args: install_pheno_files(dbconn,
-                                                 args.speciesid,
-                                                 args.platformid,
-                                                 args.datasetid,
-                                                 args.rqtl2bundle,
-                                                 thelogger),
-        thelogger,
-        "DEBUG")
+    main = build_main(cli_args(), install_pheno_files, getLogger(__MODULE__))
     sys.exit(main())
diff --git a/scripts/rqtl2/phenotypes_qc.py b/scripts/rqtl2/phenotypes_qc.py
new file mode 100644
index 0000000..9f11f57
--- /dev/null
+++ b/scripts/rqtl2/phenotypes_qc.py
@@ -0,0 +1,519 @@
+"""Run quality control on phenotypes-specific files in the bundle."""
+import sys
+import uuid
+import json
+import shutil
+import logging
+import tempfile
+import contextlib
+from pathlib import Path
+from logging import Logger
+from zipfile import ZipFile
+from argparse import Namespace
+import multiprocessing as mproc
+from functools import reduce, partial
+from typing import Union, Iterator, Callable, Optional, Sequence
+
+import MySQLdb as mdb
+from redis import Redis
+
+from r_qtl import r_qtl2 as rqtl2
+from r_qtl import r_qtl2_qc as rqc
+from r_qtl import exceptions as rqe
+from r_qtl.fileerrors import InvalidValue
+
+from functional_tools import chain
+
+from quality_control.checks import decimal_places_pattern
+
+from uploader.files import sha256_digest_over_file
+from uploader.samples.models import samples_by_species_and_population
+
+from scripts.rqtl2.entry import build_main
+from scripts.redis_logger import RedisMessageListHandler
+from scripts.rqtl2.cli_parser import add_bundle_argument
+from scripts.cli_parser import init_cli_parser, add_global_data_arguments
+from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter
+
+__MODULE__ = "scripts.rqtl2.phenotypes_qc"
+logging.basicConfig(
+    format=("%(asctime)s - %(levelname)s %(name)s: "
+            "(%(pathname)s: %(lineno)d) %(message)s"))
+logger = logging.getLogger(__MODULE__)
+
+def validate(
+        phenobundle: Path,
+        logger: Logger# pylint: disable=[redefined-outer-name]
+) -> dict:
+    """Check that the bundle is generally valid"""
+    try:
+        rqc.validate_bundle(phenobundle)
+    except rqe.RQTLError as rqtlerr:
+        # logger.error("Bundle file validation failed!", exc_info=True)
+        return {
+            "skip": True,
+            "logger": logger,
+            "phenobundle": phenobundle,
+            "errors": (" ".join(rqtlerr.args),)
+        }
+    return {
+        "errors": tuple(),
+        "skip": False,
+        "phenobundle": phenobundle,
+        "logger": logger
+    }
+
+
+def check_for_mandatory_pheno_keys(
+        phenobundle: Path,
+        logger: Logger,# pylint: disable=[redefined-outer-name]
+        **kwargs
+) -> dict:
+    """Check that the mandatory keys exist for phenotypes."""
+    if kwargs.get("skip", False):
+        return {
+            **kwargs,
+            "logger": logger,
+            "phenobundle": phenobundle
+        }
+
+    _mandatory_keys = ("pheno", "phenocovar")
+    _cdata = rqtl2.read_control_file(phenobundle)
+    _errors = kwargs.get("errors", tuple()) + tuple(
+        f"Expected '{key}' file(s) are not declared in the bundle."
+        for key in _mandatory_keys if key not in _cdata.keys())
+    return {
+        **kwargs,
+        "logger": logger,
+        "phenobundle": phenobundle,
+        "errors": _errors,
+        "skip": len(_errors) > 0
+    }
+
+
+def check_for_averages_files(
+        phenobundle: Path,
+        logger: Logger,# pylint: disable=[redefined-outer-name]
+        **kwargs
+) -> dict:
+    """Check that averages files appear together"""
+    if kwargs.get("skip", False):
+        return {
+            **kwargs,
+            "logger": logger,
+            "phenobundle": phenobundle
+        }
+
+    _together = (("phenose", "phenonum"), ("phenonum", "phenose"))
+    _cdata = rqtl2.read_control_file(phenobundle)
+    _errors = kwargs.get("errors", tuple()) + tuple(
+        f"'{first}' is defined in the control file but there is no "
+        f"corresponding '{second}'"
+        for first, second in _together
+        if ((first in _cdata.keys()) and (second not in _cdata.keys())))
+    return {
+        **kwargs,
+        "logger": logger,
+        "phenobundle": phenobundle,
+        "errors": _errors,
+        "skip": len(_errors) > 0
+    }
+
+
+def extract_bundle(
+        bundle: Path, workdir: Path, jobid: uuid.UUID
+) -> tuple[Path, tuple[Path, ...]]:
+    """Extract the bundle."""
+    with ZipFile(bundle) as zfile:
+        extractiondir = workdir.joinpath(
+            f"{str(jobid)}-{sha256_digest_over_file(bundle)}-{bundle.name}")
+        return extractiondir, rqtl2.extract(zfile, extractiondir)
+
+
+def undo_transpose(filetype: str, cdata: dict, extractiondir):
+    """Undo transposition of all files of type `filetype` in thebundle."""
+    if len(cdata.get(filetype, [])) > 0 and cdata.get(f"{filetype}_transposed", False):
+        files = (extractiondir.joinpath(_file) for _file in cdata[filetype])
+        for _file in files:
+            rqtl2.transpose_csv_with_rename(
+                _file,
+                build_line_splitter(cdata),
+                build_line_joiner(cdata))
+
+
+@contextlib.contextmanager
+def redis_logger(
+        redisuri: str, loggername: str, filename: str, fqkey: str
+) -> Iterator[logging.Logger]:
+    """Build a Redis message-list logger."""
+    rconn = Redis.from_url(redisuri, decode_responses=True)
+    _logger = logging.getLogger(loggername)
+    _logger.propagate = False
+    handler = RedisMessageListHandler(
+        rconn,
+        fullyqualifiedkey(fqkey, filename))#type: ignore[arg-type]
+    handler.setFormatter(logging.getLogger().handlers[0].formatter)
+    _logger.addHandler(handler)
+    try:
+        yield _logger
+    finally:
+        rconn.close()
+
+
+def push_error(rconn: Redis, fqkey: str, error: InvalidValue) -> InvalidValue:
+    """Persist the error in redis."""
+    rconn.rpush(fqkey, json.dumps(error._asdict()))
+    return error
+
+
+def file_fqkey(prefix: str, section: str, filepath: Path) -> str:
+    """Build a files fully-qualified key in a consistent manner"""
+    return f"{prefix}:{section}:{filepath.name}"
+
+
+def qc_phenocovar_file(
+        filepath: Path,
+        redisuri,
+        fqkey: str,
+        separator: str,
+        comment_char: str):
+    """Check that `phenocovar` files are structured correctly."""
+    with (redis_logger(
+            redisuri,
+            f"{__MODULE__}.qc_phenocovar_file",
+            filepath.name,
+            f"{fqkey}:logs") as _logger,
+          Redis.from_url(redisuri, decode_responses=True) as rconn):
+        print("Running QC on file: ", filepath.name)
+        _csvfile = rqtl2.read_csv_file(filepath, separator, comment_char)
+        _headings = tuple(heading.lower() for heading in next(_csvfile))
+        _errors: tuple[InvalidValue, ...] = tuple()
+        save_error = partial(
+            push_error, rconn, file_fqkey(fqkey, "errors", filepath))
+        for heading in ("description", "units"):
+            if heading not in _headings:
+                _errors = _errors + (save_error(InvalidValue(
+                    filepath.name,
+                    "header row",
+                    "-",
+                    "-",
+                    (f"File {filepath.name} is missing the {heading} heading "
+                     "in the header line."))),)
+
+        def collect_errors(errors_and_linecount, line):
+            _errs, _lc = errors_and_linecount
+            _logger.info("Testing record '%s'", line[0])
+            if len(line) != len(_headings):
+                _errs = _errs + (save_error(InvalidValue(
+                    filepath.name,
+                    line[0],
+                    "-",
+                    "-",
+                    (f"Record {_lc} in file {filepath.name} has a different "
+                     "number of columns than the number of headings"))),)
+            _line = dict(zip(_headings, line))
+            if not bool(_line.get("description")):
+                _errs = _errs + (
+                    save_error(InvalidValue(filepath.name,
+                                            _line[_headings[0]],
+                                            "description",
+                                            _line.get("description"),
+                                            "The description is not provided!")),)
+
+            rconn.hset(file_fqkey(fqkey, "metadata", filepath),
+                       mapping={
+                           "status": "checking",
+                           "linecount": _lc+1,
+                           "total-errors": len(_errs)
+                       })
+            return _errs, _lc+1
+
+        _errors, _linecount = reduce(collect_errors, _csvfile, (_errors, 1))
+        rconn.hset(file_fqkey(fqkey, "metadata", filepath),
+                   mapping={
+                       "status": "completed",
+                       "linecount": _linecount,
+                       "total-errors": len(_errors)
+                   })
+        return {filepath.name: {"errors": _errors, "linecount": _linecount}}
+
+
+def merge_dicts(*dicts):
+    """Merge multiple dicts into a single one."""
+    return reduce(lambda merged, dct: {**merged, **dct}, dicts, {})
+
+
+def decimal_points_error(# pylint: disable=[too-many-arguments,too-many-positional-arguments]
+        filename: str,
+        rowtitle: str,
+        coltitle: str,
+        cellvalue: str,
+        message: str,
+        decimal_places: int = 1
+) -> Optional[InvalidValue]:
+    """Returns an error if the value does not meet the checks."""
+    if not bool(decimal_places_pattern(decimal_places).match(cellvalue)):
+        return InvalidValue(filename, rowtitle, coltitle, cellvalue, message)
+    return None
+
+
+def integer_error(
+        filename: str,
+        rowtitle: str,
+        coltitle: str,
+        cellvalue: str,
+        message: str
+) -> Optional[InvalidValue]:
+    """Returns an error if the value does not meet the checks."""
+    try:
+        value = int(cellvalue)
+        if value <= 0:
+            raise ValueError("Must be a non-zero, positive number.")
+        return None
+    except ValueError as _verr:
+        return InvalidValue(filename, rowtitle, coltitle, cellvalue, message)
+
+
+def qc_pheno_file(# pylint: disable=[too-many-locals, too-many-arguments, too-many-positional-arguments]
+        filepath: Path,
+        redisuri: str,
+        fqkey: str,
+        samples: tuple[str, ...],
+        phenonames: tuple[str, ...],
+        separator: str,
+        comment_char: str,
+        na_strings: Sequence[str],
+        error_fn: Callable = decimal_points_error
+):
+    """Run QC/QA on a `pheno` file."""
+    with (redis_logger(
+            redisuri,
+            f"{__MODULE__}.qc_pheno_file",
+            filepath.name,
+            f"{fqkey}:logs") as _logger,
+          Redis.from_url(redisuri, decode_responses=True) as rconn):
+        print("Running QC on file: ", filepath.name)
+        save_error = partial(
+            push_error, rconn, file_fqkey(fqkey, "errors", filepath))
+        _csvfile = rqtl2.read_csv_file(filepath, separator, comment_char)
+        _headings: tuple[str, ...] = tuple(
+            # select lowercase for comparison purposes
+            heading.lower() for heading in next(_csvfile))
+        _errors: tuple[InvalidValue, ...] = tuple()
+
+        _absent = tuple(pheno for pheno in _headings[1:] if pheno
+                        not in tuple(
+                            # lower to have consistent case with headings for
+                            # comparison
+                            phe.lower() for phe in phenonames))
+        if len(_absent) > 0:
+            _errors = _errors + (save_error(InvalidValue(
+                filepath.name,
+                "header row",
+                "-",
+                ", ".join(_absent),
+                ("The following phenotype names do not exist in any of the "
+                 f"provided phenocovar files: ({', '.join(_absent)})"))),)
+
+        def collect_errors(errors_and_linecount, line):
+            _errs, _lc = errors_and_linecount
+            _logger.debug("Checking row %s", line[0])
+            if line[0] not in samples:
+                _errs = _errs + (save_error(InvalidValue(
+                    filepath.name,
+                    line[0],
+                    _headings[0],
+                    line[0],
+                    (f"The sample named '{line[0]}' does not exist in the "
+                     "database. You will need to upload that first."))),)
+
+            for field, value in zip(_headings[1:], line[1:]):
+                if value in na_strings:
+                    continue
+                _err = error_fn(
+                    filepath.name,
+                    line[0],
+                    field,
+                    value)
+                _errs = _errs + ((save_error(_err),) if bool(_err) else tuple())
+
+            rconn.hset(file_fqkey(fqkey, "metadata", filepath),
+                       mapping={
+                           "status": "checking",
+                           "linecount": _lc+1,
+                           "total-errors": len(_errs)
+                       })
+            return _errs, _lc+1
+
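+        # Start the line count at 1: the header row was consumed above when
+        # `_headings` was read.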
+        _errors, _linecount = reduce(collect_errors, _csvfile, (_errors, 1))
+        rconn.hset(file_fqkey(fqkey, "metadata", filepath),
+                   mapping={
+                       "status": "completed",
+                       "linecount": _linecount,
+                       "total-errors": len(_errors)
+                   })
+        return {filepath.name: {"errors": _errors, "linecount": _linecount}}
+
+
+def phenotype_names(filepath: Path,
+                    separator: str,
+                    comment_char: str) -> tuple[str, ...]:
+    """Read phenotype names from `phenocovar` file."""
+    return reduce(lambda tpl, line: tpl + (line[0],),  # type: ignore[arg-type, return-value]
+                  rqtl2.read_csv_file(filepath, separator, comment_char),
+                  tuple())[1:]
+
+
+def fullyqualifiedkey(
+        prefix: str,
+        rest: Optional[str] = None
+) -> Union[Callable[[str], str], str]:
+    """Compute fully qualified Redis key."""
+    if not bool(rest):
+        return lambda _rest: f"{prefix}:{_rest}"
+    return f"{prefix}:{rest}"
+
+
+def run_qc(# pylint: disable=[too-many-locals]
+        rconn: Redis,
+        dbconn: mdb.Connection,
+        fullyqualifiedjobid: str,
+        args: Namespace
+) -> int:
+    """Run quality control checks on the bundle."""
+    print("Beginning the quality assurance checks.")
+    results = check_for_averages_files(
+        **check_for_mandatory_pheno_keys(
+            **validate(args.rqtl2bundle, logger)))
+    errors = results.get("errors", tuple())
+    if len(errors) > 0:
+        logger.error("We found the following errors:\n%s",
+                     "\n".join(f" - {error}" for error in errors))
+        return 1
+    # Run QC on actual values
+    #       Steps:
+    #       - Extract file to specific directory
+    extractiondir, *_bundlefiles = extract_bundle(
+        args.rqtl2bundle, args.workingdir, args.jobid)
+
+    #       - For every pheno, phenocovar, phenose, phenonum file, undo
+    #         transposition where relevant
+    cdata = rqtl2.control_data(extractiondir)
+    with mproc.Pool(max(1, mproc.cpu_count() - 1)) as pool:
+        pool.starmap(
+            undo_transpose,
+            ((ftype, cdata, extractiondir)
+             for ftype in ("pheno", "phenocovar", "phenose", "phenonum")))
+
+    #       - Fetch samples/individuals from database.
+    print("Fetching samples/individuals from the database.")
+    samples = tuple(  # type: ignore[var-annotated]
+        item for item in set(reduce(
+            lambda acc, item: acc + (
+                item["Name"], item["Name2"], item["Symbol"], item["Alias"]),
+            samples_by_species_and_population(
+                dbconn, args.speciesid, args.populationid),
+            tuple()))
+        if bool(item))
+
+    #       - Check that `description` and `units` are present in phenocovar
+    #         for all phenotypes
+    rconn.hset(fullyqualifiedjobid,
+               "fully-qualified-keys:phenocovar",
+               json.dumps(tuple(f"{fullyqualifiedjobid}:phenocovar:{_file}"
+                                for _file in cdata.get("phenocovar", []))))
+    with mproc.Pool(max(1, mproc.cpu_count() - 1)) as pool:
+        print("Check for errors in 'phenocovar' file(s).")
+        _phenocovar_qc_res = merge_dicts(*pool.starmap(qc_phenocovar_file, tuple(
+            (extractiondir.joinpath(_file),
+             args.redisuri,
+             f"{fullyqualifiedjobid}:phenocovar",
+             cdata["sep"],
+             cdata["comment.char"])
+            for _file in cdata.get("phenocovar", []))))
+
+        #       - Check all samples in pheno files exist in database
+        #       - Check all phenotypes in pheno files exist in phenocovar files
+        #       - Check all numeric values in pheno files
+        phenonames = tuple(set(
+            name for names in pool.starmap(phenotype_names, tuple(
+                (extractiondir.joinpath(_file),
+                 cdata["sep"],
+                 cdata["comment.char"])
+                for _file in cdata.get("phenocovar", [])))
+            for name in names))
+
+        dec_err_fn = partial(decimal_points_error, message=(
+            "Expected a non-negative number with at least one decimal "
+            "place."))
+
+        print("Check for errors in 'pheno' file(s).")
+        _pheno_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
+            extractiondir.joinpath(_file),
+            args.redisuri,
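+            # `chain` threads "pheno" through the key builders, yielding a
+            # key like "<redisprefix>:<jobid>:pheno" (assuming left-to-right
+            # application of the functions).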
+            chain(
+                "pheno",
+                fullyqualifiedkey(args.jobid),
+                fullyqualifiedkey(args.redisprefix)),
+            samples,
+            phenonames,
+            cdata["sep"],
+            cdata["comment.char"],
+            cdata["na.strings"],
+            dec_err_fn
+        ) for _file in cdata.get("pheno", []))))
+
+        #       - Run the same three checks on the `phenose` and `phenonum`
+        #         files too
+        print("Check for errors in 'phenose' file(s).")
+        _phenose_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
+            extractiondir.joinpath(_file),
+            args.redisuri,
+            chain(
+                "phenose",
+                fullyqualifiedkey(args.jobid),
+                fullyqualifiedkey(args.redisprefix)),
+            samples,
+            phenonames,
+            cdata["sep"],
+            cdata["comment.char"],
+            cdata["na.strings"],
+            dec_err_fn
+        ) for _file in cdata.get("phenose", []))))
+
+        print("Check for errors in 'phenonum' file(s).")
+        _phenonum_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
+            extractiondir.joinpath(_file),
+            args.redisuri,
+            chain(
+                "phenonum",
+                fullyqualifiedkey(args.jobid),
+                fullyqualifiedkey(args.redisprefix)),
+            samples,
+            phenonames,
+            cdata["sep"],
+            cdata["comment.char"],
+            cdata["na.strings"],
+            partial(integer_error, message=(
+                "Expected a positive (non-zero) integer value."))
+        ) for _file in cdata.get("phenonum", []))))
+
+    #       - Delete all extracted files
+    shutil.rmtree(extractiondir)
+    return 0
+
+
+if __name__ == "__main__":
+    def cli_args():
+        """Process command-line arguments for `install_phenos`"""
+        parser = add_bundle_argument(add_global_data_arguments(init_cli_parser(
+            program="PhenotypesQC",
+            description=(
+                "Perform Quality Control checks on a phenotypes bundle file"))))
+        parser.add_argument(
+            "--workingdir",
+            default=f"{tempfile.gettempdir()}/phenotypes_qc",
+            help=("The directory where this script will put its intermediate "
+                  "files."),
+            type=Path)
+        return parser.parse_args()
+
+    main = build_main(cli_args(), run_qc, logger)
+    sys.exit(main())
diff --git a/scripts/validate_file.py b/scripts/validate_file.py
index a40d7e7..52e55ec 100644
--- a/scripts/validate_file.py
+++ b/scripts/validate_file.py
@@ -8,12 +8,12 @@ from zipfile import ZipFile, is_zipfile
 import jsonpickle
 from redis import Redis
 from redis.exceptions import ConnectionError # pylint: disable=[redefined-builtin]
+from gn_libs.mysqldb import database_connection
 
 from quality_control.utils import make_progress_calculator
 from quality_control.parsing import FileType, strain_names, collect_errors
 
 from uploader import jobs
-from uploader.db_utils import database_connection
 
 from .cli_parser import init_cli_parser
 from .qc import add_file_validation_arguments
diff --git a/scripts/worker.py b/scripts/worker.py
index 91b0332..3165fe7 100644
--- a/scripts/worker.py
+++ b/scripts/worker.py
@@ -79,7 +79,7 @@ def main():
         fqjobid = jobs.job_key(args.redisprefix, args.jobid)
         rconn.hset(fqjobid, "stderr", f"No such job. '{args.job_id}'.")
         rconn.expire(name=jobs.job_key(args.redisprefix, args.job_id),
-                     time=timedelta(seconds=(2 * 60 * 60)))
+                     time=timedelta(seconds=2 * 60 * 60))
         print(f"No such job. '{args.job_id}'.", file=sys.stderr)
         return 2
     return 3