Diffstat (limited to 'scripts')
 scripts/cli_parser.py              |   7
 scripts/insert_data.py             |   4
 scripts/insert_samples.py          |  35
 scripts/load_phenotypes_to_db.py   | 521
 scripts/phenotypes_bulk_edit.py    | 266
 scripts/process_rqtl2_bundle.py    |  31
 scripts/qc.py                      |   4
 scripts/qc_on_rqtl2_bundle.py      |  87
 scripts/qcapp_wsgi.py              |  52
 scripts/redis_logger.py            |  21
 scripts/rqtl2/bundleutils.py       |  44
 scripts/rqtl2/cli_parser.py        |  20
 scripts/rqtl2/entry.py             |  64
 scripts/rqtl2/install_genotypes.py | 150
 scripts/rqtl2/install_phenos.py    |  31
 scripts/rqtl2/phenotypes_qc.py     | 516
 scripts/validate_file.py           |   4
 scripts/worker.py                  |   4
 18 files changed, 1674 insertions(+), 187 deletions(-)
diff --git a/scripts/cli_parser.py b/scripts/cli_parser.py
index 308ee4b..0c91c5e 100644
--- a/scripts/cli_parser.py
+++ b/scripts/cli_parser.py
@@ -19,6 +19,13 @@ def init_cli_parser(program: str, description: Optional[str] = None) -> Argument
type=int,
default=86400,
help="How long to keep any redis keys around.")
+ parser.add_argument(
+ "--loglevel",
+ type=str,
+ default="INFO",
+ choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL",
+ "debug", "info", "warning", "error", "critical"],
+ help="The severity of events to track with the logger.")
return parser
def add_global_data_arguments(parser: ArgumentParser) -> ArgumentParser:
diff --git a/scripts/insert_data.py b/scripts/insert_data.py
index 1465348..67038f8 100644
--- a/scripts/insert_data.py
+++ b/scripts/insert_data.py
@@ -10,12 +10,12 @@ from typing import Tuple, Iterator
import MySQLdb as mdb
from redis import Redis
from MySQLdb.cursors import DictCursor
+from gn_libs.mysqldb import database_connection
from functional_tools import take
from quality_control.file_utils import open_file
-from qc_app.db_utils import database_connection
-from qc_app.check_connections import check_db, check_redis
+from uploader.check_connections import check_db, check_redis
# Set up logging
stderr_handler = logging.StreamHandler(stream=sys.stderr)
diff --git a/scripts/insert_samples.py b/scripts/insert_samples.py
index 8431462..742c4ae 100644
--- a/scripts/insert_samples.py
+++ b/scripts/insert_samples.py
@@ -3,14 +3,16 @@ import sys
import logging
import pathlib
import argparse
+import traceback
import MySQLdb as mdb
from redis import Redis
+from gn_libs.mysqldb import database_connection
-from qc_app.db_utils import database_connection
-from qc_app.check_connections import check_db, check_redis
-from qc_app.db import species_by_id, population_by_id
-from qc_app.samples import (
+from uploader.check_connections import check_db, check_redis
+from uploader.species.models import species_by_id
+from uploader.population.models import population_by_id
+from uploader.samples.models import (
save_samples_data,
read_samples_file,
cross_reference_samples)
@@ -72,6 +74,7 @@ def insert_samples(conn: mdb.Connection,# pylint: disable=[too-many-arguments]
print("Samples upload successfully completed.")
return 0
+
if __name__ == "__main__":
def cli_args():
@@ -126,7 +129,7 @@ if __name__ == "__main__":
def main():
"""Run script to insert samples into the database."""
-
+    status_code = 1 # Default: exit with an error (e.g. an unhandled exception)
args = cli_args()
check_db(args.databaseuri)
check_redis(args.redisuri)
@@ -136,13 +139,19 @@ if __name__ == "__main__":
with (Redis.from_url(args.redisuri, decode_responses=True) as rconn,
database_connection(args.databaseuri) as dbconn):
- return insert_samples(dbconn,
- rconn,
- args.speciesid,
- args.populationid,
- args.samplesfile,
- args.separator,
- args.firstlineheading,
- args.quotechar)
+
+ try:
+ status_code = insert_samples(dbconn,
+ rconn,
+ args.speciesid,
+ args.populationid,
+ args.samplesfile,
+ args.separator,
+ args.firstlineheading,
+ args.quotechar)
+ except Exception as _exc:
+ print(traceback.format_exc(), file=sys.stderr)
+
+ return status_code
sys.exit(main())
diff --git a/scripts/load_phenotypes_to_db.py b/scripts/load_phenotypes_to_db.py
new file mode 100644
index 0000000..08ee558
--- /dev/null
+++ b/scripts/load_phenotypes_to_db.py
@@ -0,0 +1,521 @@
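+"""Process the phenotypes' data and load it into the database."""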
+import sys
+import uuid
+import json
+import logging
+import argparse
+import datetime
+from pathlib import Path
+from zipfile import ZipFile
+from typing import Any, Union
+from urllib.parse import urljoin
+from functools import reduce, partial
+
+from MySQLdb.cursors import Cursor, DictCursor
+
+from gn_libs import jobs, mysqldb, sqlite3, monadic_requests as mrequests
+
+from r_qtl import r_qtl2 as rqtl2
+from uploader.species.models import species_by_id
+from uploader.population.models import population_by_species_and_id
+from uploader.samples.models import samples_by_species_and_population
+from uploader.phenotypes.models import (
+ dataset_by_id,
+ save_phenotypes_data,
+ create_new_phenotypes,
+ quick_save_phenotypes_data)
+from uploader.publications.models import (
+ create_new_publications,
+ fetch_publication_by_id)
+
+from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter
+
+logging.basicConfig(
+ format="%(asctime)s — %(filename)s:%(lineno)s — %(levelname)s: %(message)s")
+logger = logging.getLogger(__name__)
+
+
+
+def __replace_na_strings__(line, na_strings):
+ return ((None if value in na_strings else value) for value in line)
+
+
+def save_phenotypes(
+ cursor: mysqldb.Connection,
+ control_data: dict[str, Any],
+ filesdir: Path
+) -> tuple[dict, ...]:
+ """Read `phenofiles` and save the phenotypes therein."""
+ ## TODO: Replace with something like this: ##
+ # phenofiles = control_data["phenocovar"] + control_data.get(
+ # "gn-metadata", {}).get("pheno", [])
+ #
+ # This is meant to load (and merge) data from the "phenocovar" and
+ # "gn-metadata -> pheno" files into a single collection of phenotypes.
+ phenofiles = tuple(filesdir.joinpath(_file) for _file in control_data["phenocovar"])
+ if len(phenofiles) <= 0:
+ return tuple()
+
+ if control_data["phenocovar_transposed"]:
+ logger.info("Undoing transposition of the files rows and columns.")
+ phenofiles = (
+ rqtl2.transpose_csv_with_rename(
+ _file,
+ build_line_splitter(control_data),
+ build_line_joiner(control_data))
+ for _file in phenofiles)
+
+ _headers = rqtl2.read_csv_file_headers(phenofiles[0],
+ control_data["phenocovar_transposed"],
+ control_data["sep"],
+ control_data["comment.char"])
+ return create_new_phenotypes(
+ cursor,
+ (dict(zip(_headers,
+ __replace_na_strings__(line, control_data["na.strings"])))
+ for filecontent
+ in (rqtl2.read_csv_file(path,
+ separator=control_data["sep"],
+ comment_char=control_data["comment.char"])
+ for path in phenofiles)
+ for idx, line in enumerate(filecontent)
+ if idx != 0))
+
+
+def __fetch_next_dataid__(conn: mysqldb.Connection) -> int:
+ """Fetch the next available DataId value from the database."""
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ cursor.execute(
+ "SELECT MAX(DataId) AS CurrentMaxDataId FROM PublishXRef")
+ return int(cursor.fetchone()["CurrentMaxDataId"]) + 1
+
+
+def __row_to_dataitems__(
+ sample_row: dict,
+ dataidmap: dict,
+ pheno_name2id: dict[str, int],
+ samples: dict
+) -> tuple[dict, ...]:
+ samplename = sample_row["id"]
+
+ return ({
+ "phenotype_id": dataidmap[pheno_name2id[phenoname]]["phenotype_id"],
+ "data_id": dataidmap[pheno_name2id[phenoname]]["data_id"],
+ "sample_name": samplename,
+ "sample_id": samples[samplename]["Id"],
+ "value": phenovalue
+ } for phenoname, phenovalue in sample_row.items() if phenoname != "id")
+
+
+def __build_dataitems__(
+ filetype,
+ phenofiles,
+ control_data,
+ samples,
+ dataidmap,
+ pheno_name2id
+):
+ _headers = rqtl2.read_csv_file_headers(
+ phenofiles[0],
+ False, # Any transposed files have been un-transposed by this point
+ control_data["sep"],
+ control_data["comment.char"])
+ _filescontents = (
+ rqtl2.read_csv_file(path,
+ separator=control_data["sep"],
+ comment_char=control_data["comment.char"])
+ for path in phenofiles)
+ _linescontents = (
+ __row_to_dataitems__(
+ dict(zip(("id",) + _headers[1:],
+ __replace_na_strings__(line, control_data["na.strings"]))),
+ dataidmap,
+ pheno_name2id,
+ samples)
+ for linenum, line in (enumline for filecontent in _filescontents
+ for enumline in enumerate(filecontent))
+ if linenum > 0)
+ return (item for items in _linescontents
+ for item in items
+ if item["value"] is not None)
+
+
+def save_numeric_data(
+ conn: mysqldb.Connection,
+ dataidmap: dict,
+ pheno_name2id: dict[str, int],
+ samples: tuple[dict, ...],
+ control_data: dict,
+ filesdir: Path,
+ filetype: str,
+ table: str
+):
+ """Read data from files and save to the database."""
+ phenofiles = tuple(
+ filesdir.joinpath(_file) for _file in control_data[filetype])
+ if len(phenofiles) <= 0:
+ return tuple()
+
+ if control_data[f"{filetype}_transposed"]:
+ logger.info("Undoing transposition of the files rows and columns.")
+ phenofiles = tuple(
+ rqtl2.transpose_csv_with_rename(
+ _file,
+ build_line_splitter(control_data),
+ build_line_joiner(control_data))
+ for _file in phenofiles)
+
+ try:
+ logger.debug("Attempt quick save with `LOAD … INFILE`.")
+ return quick_save_phenotypes_data(
+ conn,
+ table,
+ __build_dataitems__(
+ filetype,
+ phenofiles,
+ control_data,
+ samples,
+ dataidmap,
+ pheno_name2id),
+ filesdir)
+ except Exception as _exc:
+ logger.debug("Could not use `LOAD … INFILE`, using raw query",
+ exc_info=True)
+ return save_phenotypes_data(
+ conn,
+ table,
+ __build_dataitems__(
+ filetype,
+ phenofiles,
+ control_data,
+ samples,
+ dataidmap,
+ pheno_name2id))
+
+
+save_pheno_data = partial(save_numeric_data,
+ filetype="pheno",
+ table="PublishData")
+
+
+save_phenotypes_se = partial(save_numeric_data,
+ filetype="phenose",
+ table="PublishSE")
+
+
+save_phenotypes_n = partial(save_numeric_data,
+ filetype="phenonum",
+ table="NStrain")
+
+
+def cross_reference_phenotypes_publications_and_data(
+ conn: mysqldb.Connection, xref_data: tuple[dict, ...]
+):
+ """Crossreference the phenotypes, publication and data."""
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ cursor.execute("SELECT MAX(Id) CurrentMaxId FROM PublishXRef")
+ _nextid = int(cursor.fetchone()["CurrentMaxId"]) + 1
+ _params = tuple({**row, "xref_id": _id}
+ for _id, row in enumerate(xref_data, start=_nextid))
+ cursor.executemany(
+ ("INSERT INTO PublishXRef("
+ "Id, InbredSetId, PhenotypeId, PublicationId, DataId, comments"
+ ") "
+ "VALUES ("
+ "%(xref_id)s, %(population_id)s, %(phenotype_id)s, "
+ "%(publication_id)s, %(data_id)s, 'Upload of new data.'"
+ ")"),
+ _params)
+ return _params
+ return tuple()
+
+
+def update_auth(authserver, token, species, population, dataset, xrefdata):
+ """Grant the user access to their data."""
+ # TODO Call into the auth server to:
+ # 1. Link the phenotypes with a user group
+ # - fetch group: http://localhost:8081/auth/user/group
+ # - link data to group: http://localhost:8081/auth/data/link/phenotype
+ # - *might need code update in gn-auth: remove restriction, perhaps*
+ # 2. Create resource (perhaps?)
+ # - Get resource categories: http://localhost:8081/auth/resource/categories
+    #    - Create a new resource: http://localhost:8081/auth/resource/create
+ # - single resource for all phenotypes
+ # - resource name from user, species, population, dataset, datetime?
+ # - User will have "ownership" of resource by default
+ # 3. Link data to the resource: http://localhost:8081/auth/resource/data/link
+ # - Update code to allow linking multiple items in a single request
+    _tries = 0 # TODO use this to limit how many tries before quitting and bailing
+ _delay = 1
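+    # A hedged sketch of how `_tries`/`_delay` could bound the retries once
+    # implemented; `MAX_TRIES` and `run_update()` are hypothetical names:
+    #
+    #     while _tries < MAX_TRIES:
+    #         if run_update() == 0:   # 0 == success (see __handle_success__)
+    #             break
+    #         time.sleep(_delay)
+    #         _delay, _tries = _delay * 2, _tries + 1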
+ headers = {
+ "Authorization": f"Bearer {token}",
+ "Content-Type": "application/json"
+ }
+ def authserveruri(endpoint):
+ return urljoin(authserver, endpoint)
+
+ def __fetch_user_details__():
+ logger.debug("… Fetching user details")
+ return mrequests.get(
+ authserveruri("/auth/user/"),
+ headers=headers
+ )
+
+ def __link_data__(user):
+ logger.debug("… linking uploaded data to user's group")
+ return mrequests.post(
+ authserveruri("/auth/data/link/phenotype"),
+ headers=headers,
+ json={
+ "species_name": species["Name"],
+ "group_id": user["group"]["group_id"],
+ "selected": [
+ {
+ "SpeciesId": species["SpeciesId"],
+ "InbredSetId": population["Id"],
+ "PublishFreezeId": dataset["Id"],
+ "dataset_name": dataset["Name"],
+ "dataset_fullname": dataset["FullName"],
+ "dataset_shortname": dataset["ShortName"],
+ "PublishXRefId": item["xref_id"]
+ }
+ for item in xrefdata
+ ],
+ "using-raw-ids": "on"
+ }).then(lambda ld_results: (user, ld_results))
+
+ def __fetch_phenotype_category_details__(user, linkeddata):
+ logger.debug("… fetching phenotype category details")
+ return mrequests.get(
+ authserveruri("/auth/resource/categories"),
+ headers=headers
+ ).then(
+ lambda categories: (
+ user,
+ linkeddata,
+ next(category for category in categories
+ if category["resource_category_key"] == "phenotype"))
+ )
+
+ def __create_resource__(user, linkeddata, category):
+ logger.debug("… creating authorisation resource object")
+ now = datetime.datetime.now().isoformat()
+ return mrequests.post(
+ authserveruri("/auth/resource/create"),
+ headers=headers,
+ json={
+ "resource_category": category["resource_category_id"],
+ "resource_name": (f"{user['email']}—{dataset['Name']}—{now}—"
+ f"{len(xrefdata)} phenotypes"),
+ "public": "off"
+ }).then(lambda cr_results: (user, linkeddata, cr_results))
+
+ def __attach_data_to_resource__(user, linkeddata, resource):
+ logger.debug("… attaching data to authorisation resource object")
+ return mrequests.post(
+ authserveruri("/auth/resource/data/link"),
+ headers=headers,
+ json={
+ "dataset_type": "phenotype",
+ "resource_id": resource["resource_id"],
+ "data_link_ids": [
+ item["data_link_id"] for item in linkeddata["traits"]]
+ }).then(lambda attc: (user, linkeddata, resource, attc))
+
+ def __handle_error__(resp):
+ logger.error("ERROR: Updating the authorisation for the data failed.")
+ logger.debug(
+ "ERROR: The response from the authorisation server was:\n\t%s",
+ resp.json())
+ return 1
+
+ def __handle_success__(val):
+ logger.info(
+ "The authorisation for the data has been updated successfully.")
+ return 0
+
+ return __fetch_user_details__().then(__link_data__).then(
+ lambda result: __fetch_phenotype_category_details__(*result)
+ ).then(
+ lambda result: __create_resource__(*result)
+ ).then(
+ lambda result: __attach_data_to_resource__(*result)
+ ).either(__handle_error__, __handle_success__)
+
+
+def load_data(conn: mysqldb.Connection, job: dict) -> tuple:
+ """Load the data attached in the given job."""
+ _job_metadata = job["metadata"]
+ # Steps
+ # 0. Read data from the files: can be multiple files per type
+ #
+ _species = species_by_id(conn, int(_job_metadata["species_id"]))
+ _population = population_by_species_and_id(
+ conn,
+ _species["SpeciesId"],
+ int(_job_metadata["population_id"]))
+ _dataset = dataset_by_id(
+ conn,
+ _species["SpeciesId"],
+ _population["Id"],
+ int(_job_metadata["dataset_id"]))
+    # 1. Just retrieve the publication: Don't create publications for now.
+ _publication = fetch_publication_by_id(
+ conn, int(_job_metadata.get("publication_id", "0"))) or {"Id": 0}
+ # 2. Save all new phenotypes:
+ # -> return phenotype IDs
+ bundle = Path(_job_metadata["bundle_file"])
+ _control_data = rqtl2.control_data(bundle)
+ logger.info("Extracting the zipped bundle of files.")
+ _outdir = Path(bundle.parent, f"bundle_{bundle.stem}")
+ with ZipFile(str(bundle), "r") as zfile:
+ _files = rqtl2.extract(zfile, _outdir)
+ logger.info("Saving new phenotypes.")
+ _phenos = save_phenotypes(conn, _control_data, _outdir)
+ def __build_phenos_maps__(accumulator, current):
+ dataid, row = current
+ return ({
+ **accumulator[0],
+ row["phenotype_id"]: {
+ "population_id": _population["Id"],
+ "phenotype_id": row["phenotype_id"],
+ "data_id": dataid,
+ "publication_id": _publication["Id"],
+ }
+ }, {
+ **accumulator[1],
+ row["id"]: row["phenotype_id"]
+ })
+ dataidmap, pheno_name2id = reduce(
+ __build_phenos_maps__,
+ enumerate(_phenos, start=__fetch_next_dataid__(conn)),
+ ({},{}))
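+    # Illustrative shapes of the two maps built above:
+    # - `dataidmap` maps each phenotype_id to its {"population_id",
+    #   "phenotype_id", "data_id", "publication_id"} record;
+    # - `pheno_name2id` maps each phenotype's "id" (its name in the files) to
+    #   its phenotype_id.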
+    # 3. a. Fetch the strain names and IDs: create name->ID map
+ samples = {
+ row["Name"]: row
+ for row in samples_by_species_and_population(
+ conn, _species["SpeciesId"], _population["Id"])}
+    # b. Save all the data items (DataIds are assigned manually), return new IDs
+ logger.info("Saving new phenotypes data.")
+ _num_data_rows = save_pheno_data(conn=conn,
+ dataidmap=dataidmap,
+ pheno_name2id=pheno_name2id,
+ samples=samples,
+ control_data=_control_data,
+ filesdir=_outdir)
+ logger.info("Saved %s new phenotype data rows.", _num_data_rows)
+ # 4. Cross-reference Phenotype, Publication, and PublishData in PublishXRef
+ logger.info("Cross-referencing new phenotypes to their data and publications.")
+ _xrefs = cross_reference_phenotypes_publications_and_data(
+ conn, tuple(dataidmap.values()))
+ # 5. If standard errors and N exist, save them too
+ # (use IDs returned in `3. b.` above).
+ if _control_data.get("phenose"):
+ logger.info("Saving new phenotypes standard errors.")
+ _num_se_rows = save_phenotypes_se(conn=conn,
+ dataidmap=dataidmap,
+ pheno_name2id=pheno_name2id,
+ samples=samples,
+ control_data=_control_data,
+ filesdir=_outdir)
+ logger.info("Saved %s new phenotype standard error rows.", _num_se_rows)
+
+ if _control_data.get("phenonum"):
+ logger.info("Saving new phenotypes sample counts.")
+ _num_n_rows = save_phenotypes_n(conn=conn,
+ dataidmap=dataidmap,
+ pheno_name2id=pheno_name2id,
+ samples=samples,
+ control_data=_control_data,
+ filesdir=_outdir)
+ logger.info("Saved %s new phenotype sample counts rows.", _num_n_rows)
+
+ return (_species, _population, _dataset, _xrefs)
+
+
+if __name__ == "__main__":
+ def parse_args():
+ """Setup command-line arguments."""
+ parser = argparse.ArgumentParser(
+ prog="load_phenotypes_to_db",
+ description="Process the phenotypes' data and load it into the database.")
+ parser.add_argument("db_uri", type=str, help="MariaDB/MySQL connection URL")
+ parser.add_argument(
+ "jobs_db_path", type=Path, help="Path to jobs' SQLite database.")
+ parser.add_argument("job_id", type=uuid.UUID, help="ID of the running job")
+ parser.add_argument(
+ "--log-level",
+ type=str,
+ help="Determines what is logged out.",
+ choices=("debug", "info", "warning", "error", "critical"),
+ default="info")
+ return parser.parse_args()
+
+ def setup_logging(log_level: str):
+ """Setup logging for the script."""
+ logger.setLevel(log_level)
+ logging.getLogger("uploader.phenotypes.models").setLevel(log_level)
+
+
+ def main():
+ """Entry-point for this script."""
+ args = parse_args()
+ setup_logging(args.log_level.upper())
+
+ with (mysqldb.database_connection(args.db_uri) as conn,
+ conn.cursor(cursorclass=DictCursor) as cursor,
+ sqlite3.connection(args.jobs_db_path) as jobs_conn):
+ job = jobs.job(jobs_conn, args.job_id)
+
+            # Why lock the PublishXRef/PublishData/PublishSE/NStrain tables here?
+            # The `DataId` values are sequential but not auto-increment, and
+            # `PublishXRef`.`DataId` cannot be converted to AUTO_INCREMENT, so
+            # new IDs are computed with `SELECT MAX(DataId) FROM PublishXRef;`
+            # and concurrent writers must be locked out.
+            # How do you check for a table lock?
+            # https://oracle-base.com/articles/mysql/mysql-identify-locked-tables
+            # `SHOW OPEN TABLES LIKE 'Publish%';`
+ _db_tables_ = (
+ "Species",
+ "InbredSet",
+ "Strain",
+ "StrainXRef",
+ "Publication",
+ "Phenotype",
+ "PublishXRef",
+ "PublishFreeze",
+ "PublishData",
+ "PublishSE",
+ "NStrain")
+
+ logger.debug(
+ ("Locking database tables for the connection:" +
+ "".join("\n\t- %s" for _ in _db_tables_) + "\n"),
+ *_db_tables_)
+ cursor.execute(# Lock the tables to avoid race conditions
+ "LOCK TABLES " + ", ".join(
+ f"{_table} WRITE" for _table in _db_tables_))
+
+ db_results = load_data(conn, job)
+ jobs.update_metadata(
+ jobs_conn,
+ args.job_id,
+ "xref_ids",
+ json.dumps([xref["xref_id"] for xref in db_results[3]]))
+
+ logger.info("Unlocking all database tables.")
+ cursor.execute("UNLOCK TABLES")
+
+ # Update authorisations (break this down) — maybe loop until it works?
+ logger.info("Updating authorisation.")
+ _job_metadata = job["metadata"]
+ return update_auth(_job_metadata["authserver"],
+ _job_metadata["token"],
+ *db_results)
+
+
+ try:
+ sys.exit(main())
+ except Exception as _exc:
+ logger.debug("Data loading failed… Halting!",
+ exc_info=True)
+ sys.exit(1)
diff --git a/scripts/phenotypes_bulk_edit.py b/scripts/phenotypes_bulk_edit.py
new file mode 100644
index 0000000..cee5f4e
--- /dev/null
+++ b/scripts/phenotypes_bulk_edit.py
@@ -0,0 +1,266 @@
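+"""Process the bulk-edits to phenotype data and descriptions."""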
+import sys
+import uuid
+import logging
+import argparse
+from pathlib import Path
+from typing import Iterator
+from functools import reduce
+
+from MySQLdb.cursors import DictCursor
+
+from gn_libs import jobs, mysqldb, sqlite3
+
+from uploader.phenotypes.models import phenotypes_data_by_ids
+from uploader.phenotypes.misc import phenotypes_data_differences
+from uploader.phenotypes.views import BULK_EDIT_COMMON_FIELDNAMES
+
+import uploader.publications.pubmed as pmed
+from uploader.publications.misc import publications_differences
+from uploader.publications.models import (
+ update_publications, fetch_phenotype_publications)
+
+logging.basicConfig(
+ format="%(asctime)s — %(filename)s:%(lineno)s — %(levelname)s: %(message)s")
+logger = logging.getLogger(__name__)
+
+
+def check_ids(conn, ids: tuple[tuple[int, int], ...]) -> bool:
+ """Verify that all the `UniqueIdentifier` values are valid."""
+ logger.info("Checking the 'UniqueIdentifier' values.")
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ paramstr = ",".join(["(%s, %s)"] * len(ids))
+ cursor.execute(
+ "SELECT PhenotypeId AS phenotype_id, Id AS xref_id "
+ "FROM PublishXRef "
+ f"WHERE (PhenotypeId, Id) IN ({paramstr})",
+ tuple(item for row in ids for item in row))
+ mysqldb.debug_query(cursor, logger)
+ found = tuple((row["phenotype_id"], row["xref_id"])
+ for row in cursor.fetchall())
+
+ not_found = tuple(item for item in ids if item not in found)
+ if len(not_found) == 0:
+ logger.info("All 'UniqueIdentifier' are valid.")
+ return True
+
+ for item in not_found:
+ logger.error(f"Invalid 'UniqueIdentifier' value: phId:%s::xrId:%s", item[0], item[1])
+
+ return False
+
+
+def check_for_mandatory_fields():
+ """Verify that mandatory fields have values."""
+ pass
+
+
+def __fetch_phenotypes__(conn, ids: tuple[int, ...]) -> tuple[dict, ...]:
+ """Fetch basic (non-numeric) phenotypes data from the database."""
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ paramstr = ",".join(["%s"] * len(ids))
+ cursor.execute(f"SELECT * FROM Phenotype WHERE Id IN ({paramstr}) "
+ "ORDER BY Id ASC",
+ ids)
+ return tuple(dict(row) for row in cursor.fetchall())
+
+
+def descriptions_differences(file_data, db_data) -> dict[str, str]:
+ """Compute differences in the descriptions."""
+ logger.info("Computing differences in phenotype descriptions.")
+ assert len(file_data) == len(db_data), "The counts of phenotypes differ!"
+ description_columns = ("Pre_publication_description",
+ "Post_publication_description",
+ "Original_description",
+ "Pre_publication_abbreviation",
+ "Post_publication_abbreviation")
+ diff = tuple()
+ for file_row, db_row in zip(file_data, db_data):
+ assert file_row["phenotype_id"] == db_row["Id"]
+ inner_diff = {
+ key: file_row[key]
+ for key in description_columns
+ if not file_row[key] == db_row[key]
+ }
+ if bool(inner_diff):
+ diff = diff + ({
+ "phenotype_id": file_row["phenotype_id"],
+ **inner_diff
+ },)
+
+ return diff
+
+
+def update_descriptions():
+ """Update descriptions in the database"""
+ logger.info("Updating descriptions")
+ # Compute differences between db data and uploaded file
+ # Only run query for changed descriptions
+ pass
+
+
+def link_publications():
+ """Link phenotypes to relevant publications."""
+ logger.info("Linking phenotypes to publications.")
+ # Create publication if PubMed_ID doesn't exist in db
+ pass
+
+
+def update_values():
+ """Update the phenotype values."""
+ logger.info("Updating phenotypes values.")
+ # Compute differences between db data and uploaded file
+ # Only run query for changed data
+ pass
+
+
+def parse_args():
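+    """Set up and parse the command-line arguments."""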
+ parser = argparse.ArgumentParser(
+ prog="Phenotypes Bulk-Edit Processor",
+ description="Process the bulk-edits to phenotype data and descriptions.")
+ parser.add_argument("db_uri", type=str, help="MariaDB/MySQL connection URL")
+ parser.add_argument(
+ "jobs_db_path", type=Path, help="Path to jobs' SQLite database.")
+ parser.add_argument("job_id", type=uuid.UUID, help="ID of the running job")
+ parser.add_argument(
+ "--log-level",
+ type=str,
+ help="Determines what is logged out.",
+ choices=("debug", "info", "warning", "error", "critical"),
+ default="info")
+ return parser.parse_args()
+
+
+def read_file(filepath: Path) -> Iterator[dict]:
+ """Read the file, one line at a time."""
+ with filepath.open(mode="r", encoding="utf-8") as infile:
+ count = 0
+ headers = None
+ for line in infile:
+ if line.startswith("#"): # ignore comments
+                continue
+
+ fields = line.strip().split("\t")
+ if count == 0:
+ headers = fields
+ count = count + 1
+ continue
+
+ _dict = dict(zip(
+ headers,
+ ((None if item.strip() == "" else item.strip())
+ for item in fields)))
+ _pheno, _xref = _dict.pop("UniqueIdentifier").split("::")
+ _dict = {
+ key: ((float(val) if bool(val) else val)
+ if key not in BULK_EDIT_COMMON_FIELDNAMES
+ else val)
+ for key, val in _dict.items()
+ }
+ _dict["phenotype_id"] = int(_pheno.split(":")[1])
+ _dict["xref_id"] = int(_xref.split(":")[1])
+ if _dict["PubMed_ID"] is not None:
+ _dict["PubMed_ID"] = int(_dict["PubMed_ID"])
+
+ yield _dict
+ count = count + 1
+
+
+def run(conn, job):
+ """Process the data and update it."""
+ file_contents = tuple(sorted(read_file(Path(job["metadata"]["edit-file"])),
+ key=lambda item: item["phenotype_id"]))
+ pheno_ids, pheno_xref_ids, pubmed_ids = reduce(
+ lambda coll, curr: (
+ coll[0] + (curr["phenotype_id"],),
+ coll[1] + ((curr["phenotype_id"], curr["xref_id"]),),
+ coll[2].union(set([curr["PubMed_ID"]]))),
+ file_contents,
+ (tuple(), tuple(), set([None])))
+ check_ids(conn, pheno_xref_ids)
+ check_for_mandatory_fields()
+ # stop running here if any errors are found.
+
+ ### Compute differences
+ logger.info("Computing differences.")
+ # 1. Basic Phenotype data differences
+ # a. Descriptions differences
+ _desc_diff = descriptions_differences(
+ file_contents, __fetch_phenotypes__(conn, pheno_ids))
+ logger.debug("DESCRIPTIONS DIFFERENCES: %s", _desc_diff)
+
+ # b. Publications differences
+ _db_publications = fetch_phenotype_publications(conn, pheno_xref_ids)
+ logger.debug("DB PUBLICATIONS: %s", _db_publications)
+
+ _pubmed_map = {
+ (int(row["PubMed_ID"]) if bool(row["PubMed_ID"]) else None): f"{row['phenotype_id']}::{row['xref_id']}"
+ for row in file_contents
+ }
+ _pub_id_map = {
+ f"{pub['PhenotypeId']}::{pub['xref_id']}": pub["PublicationId"]
+ for pub in _db_publications
+ }
+
+ _new_publications = update_publications(
+ conn, tuple({
+ **pub, "publication_id": _pub_id_map[_pubmed_map[pub["pubmed_id"]]]
+ } for pub in pmed.fetch_publications(tuple(
+ pubmed_id for pubmed_id in pubmed_ids
+ if pubmed_id not in
+ tuple(row["PubMed_ID"] for row in _db_publications)))))
+ _pub_diff = publications_differences(
+ file_contents, _db_publications, {
+ row["PubMed_ID" if "PubMed_ID" in row else "pubmed_id"]: row[
+ "PublicationId" if "PublicationId" in row else "publication_id"]
+ for row in _db_publications + _new_publications})
+ logger.debug("Publications diff: %s", _pub_diff)
+ # 2. Data differences
+ _db_pheno_data = phenotypes_data_by_ids(conn, tuple({
+ "population_id": job["metadata"]["population-id"],
+ "phenoid": row[0],
+ "xref_id": row[1]
+ } for row in pheno_xref_ids))
+
+ data_diff = phenotypes_data_differences(
+ ({
+ "phenotype_id": row["phenotype_id"],
+ "xref_id": row["xref_id"],
+ "data": {
+ key:val for key,val in row.items()
+ if key not in BULK_EDIT_COMMON_FIELDNAMES + [
+ "phenotype_id", "xref_id"]
+ }
+ } for row in file_contents),
+ ({
+ **row,
+ "PhenotypeId": row["Id"],
+ "data": {
+ dataitem["StrainName"]: dataitem
+ for dataitem in row["data"].values()
+ }
+ } for row in _db_pheno_data))
+ logger.debug("Data differences: %s", data_diff)
+ ### END: Compute differences
+ update_descriptions()
+ link_publications()
+ update_values()
+ return 0
+
+
+def main():
+ """Entry-point for this script."""
+ args = parse_args()
+ logger.setLevel(args.log_level.upper())
+ logger.debug("Arguments: %s", args)
+
+ logging.getLogger("uploader.phenotypes.misc").setLevel(args.log_level.upper())
+ logging.getLogger("uploader.phenotypes.models").setLevel(args.log_level.upper())
+ logging.getLogger("uploader.publications.models").setLevel(args.log_level.upper())
+
+ with (mysqldb.database_connection(args.db_uri) as conn,
+ sqlite3.connection(args.jobs_db_path) as jobs_conn):
+ return run(conn, jobs.job(jobs_conn, args.job_id))
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/scripts/process_rqtl2_bundle.py b/scripts/process_rqtl2_bundle.py
index 4da3936..8b7a0fb 100644
--- a/scripts/process_rqtl2_bundle.py
+++ b/scripts/process_rqtl2_bundle.py
@@ -2,6 +2,7 @@
import sys
import uuid
import json
+import argparse
import traceback
from typing import Any
from pathlib import Path
@@ -10,16 +11,16 @@ from logging import Logger, getLogger, StreamHandler
import MySQLdb as mdb
from redis import Redis
+from gn_libs.mysqldb import database_connection
from functional_tools import take
-import r_qtl.errors as rqe
import r_qtl.r_qtl2 as rqtl2
import r_qtl.r_qtl2_qc as rqc
+import r_qtl.exceptions as rqe
-from qc_app import jobs
-from qc_app.db_utils import database_connection
-from qc_app.check_connections import check_db, check_redis
+from uploader import jobs
+from uploader.check_connections import check_db, check_redis
from scripts.cli_parser import init_cli_parser
from scripts.redis_logger import setup_redis_logger
@@ -93,11 +94,14 @@ def process_bundle(dbconn: mdb.Connection,
if has_geno_file(thejob):
logger.info("Processing geno files.")
genoexit = install_genotypes(
+ rconn,
dbconn,
- meta["speciesid"],
- meta["populationid"],
- meta["geno-dataset-id"],
- Path(meta["rqtl2-bundle-file"]),
+ f"{rprefix}:{jobid}",
+ argparse.Namespace(
+ speciesid=meta["speciesid"],
+ populationid=meta["populationid"],
+ datasetid=meta["geno-dataset-id"],
+ rqtl2bundle=Path(meta["rqtl2-bundle-file"])),
logger)
if genoexit != 0:
raise Exception("Processing 'geno' file failed.")
@@ -108,11 +112,14 @@ def process_bundle(dbconn: mdb.Connection,
if has_pheno_file(thejob):
phenoexit = install_pheno_files(
+ rconn,
dbconn,
- meta["speciesid"],
- meta["platformid"],
- meta["probe-dataset-id"],
- Path(meta["rqtl2-bundle-file"]),
+ f"{rprefix}:{jobid}",
+ argparse.Namespace(
+ speciesid=meta["speciesid"],
+ platformid=meta["platformid"],
+ dataset_id=meta["probe-dataset-id"],
+ rqtl2bundle=Path(meta["rqtl2-bundle-file"])),
logger)
if phenoexit != 0:
raise Exception("Processing 'pheno' file failed.")
diff --git a/scripts/qc.py b/scripts/qc.py
index e8573a9..b00f4c1 100644
--- a/scripts/qc.py
+++ b/scripts/qc.py
@@ -5,14 +5,14 @@ import mimetypes
from typing import Union, Callable
from argparse import ArgumentParser
+from gn_libs.mysqldb import database_connection
+
from functional_tools import take
from quality_control.utils import make_progress_calculator
from quality_control.errors import InvalidValue, DuplicateHeading
from quality_control.parsing import FileType, strain_names, collect_errors
-from qc_app.db_utils import database_connection
-
from .cli_parser import init_cli_parser
diff --git a/scripts/qc_on_rqtl2_bundle.py b/scripts/qc_on_rqtl2_bundle.py
index deef8fe..9f9248c 100644
--- a/scripts/qc_on_rqtl2_bundle.py
+++ b/scripts/qc_on_rqtl2_bundle.py
@@ -10,22 +10,24 @@ import multiprocessing as mproc
from logging import Logger, getLogger, StreamHandler
from typing import Union, Sequence, Callable, Iterator
+import MySQLdb as mdb
from redis import Redis
+from gn_libs.mysqldb import database_connection
from quality_control.errors import InvalidValue
from quality_control.checks import decimal_points_error
-from qc_app import jobs
-from qc_app.check_connections import check_db, check_redis
+from uploader import jobs
+from uploader.check_connections import check_db, check_redis
-from r_qtl import errors as rqe
from r_qtl import r_qtl2 as rqtl2
from r_qtl import r_qtl2_qc as rqc
+from r_qtl import exceptions as rqe
from r_qtl import fileerrors as rqfe
-from scripts.cli_parser import init_cli_parser
from scripts.process_rqtl2_bundle import parse_job
from scripts.redis_logger import setup_redis_logger
+from scripts.cli_parser import init_cli_parser, add_global_data_arguments
def dict2tuple(dct: dict) -> tuple:
"""Utility to convert items in dicts to pairs of tuples."""
@@ -103,13 +105,13 @@ def retrieve_errors_with_progress(rconn: Redis,#pylint: disable=[too-many-locals
__update_processed__(value)
rconn.hset(fqjobid, f"{filetype}-linecount", count)
- except rqe.MissingFileError:
+ except rqe.MissingFileException:
fname = cdata.get(filetype)
yield rqfe.MissingFile(filetype, fname, (
f"The file '{fname}' does not exist in the bundle despite it being "
f"listed under '{filetype}' in the control file."))
-def qc_geno_errors(rconn, fqjobid, zfile, logger) -> bool:
+def qc_geno_errors(rconn, fqjobid, _dburi, _speciesid, zfile, logger) -> bool:
"""Check for errors in `geno` file(s)."""
cdata = rqtl2.control_data(zfile)
if "geno" in cdata:
@@ -129,15 +131,32 @@ def qc_geno_errors(rconn, fqjobid, zfile, logger) -> bool:
return False
-def check_pheno_samples(zipfilepath: Union[str, Path], logger: Logger) -> tuple[
- Union[InvalidValue, rqfe.MissingFile], ...]:
+def fetch_db_geno_samples(conn: mdb.Connection, speciesid: int) -> tuple[str, ...]:
+ """Fetch samples/cases/individuals from the database."""
+ samples = set()# type: ignore[var-annotated]
+ with conn.cursor() as cursor:
+ cursor.execute("SELECT Name, Name2 from Strain WHERE SpeciesId=%s",
+ (speciesid,))
+ rows = cursor.fetchall() or tuple()
+ for row in rows:
+ samples.update(tuple(row))
+
+ return tuple(item.strip() for item in samples if bool(item))
+
+
+def check_pheno_samples(
+ conn: mdb.Connection,
+ speciesid: int,
+ zipfilepath: Union[str, Path],
+ logger: Logger
+) -> tuple[Union[InvalidValue, rqfe.MissingFile], ...]:
"""Check that samples in 'pheno' file exist in geno file."""
cdata = rqtl2.read_control_file(zipfilepath)
genosamples = tuple(
sample for perfilesamples in (
rqtl2.load_samples(zipfilepath, member, cdata["geno_transposed"])
for member in cdata["geno"])
- for sample in perfilesamples)
+ for sample in perfilesamples) + fetch_db_geno_samples(conn, speciesid)
def __check_file__(member) -> tuple[InvalidValue, ...]:
logger.info("Checking samples/cases in member file '%s' …", member)
@@ -149,7 +168,9 @@ def check_pheno_samples(zipfilepath: Union[str, Path], logger: Logger) -> tuple[
errors = errors + (InvalidValue(
member, "-", "-", sample,
f"The individual/case/sample '{sample}' in file "
- f"{member} does not exist in any of the 'geno' files."),)
+ f"{member} does not exist in either, any of the 'geno' "
+ "files provided in the bundle or the GeneNetwork database."
+ ),)
logger.info("Found %s missing samples in member file '%s'.",
len(errors),
@@ -170,15 +191,21 @@ def check_pheno_samples(zipfilepath: Union[str, Path], logger: Logger) -> tuple[
return allerrors
-def qc_pheno_errors(rconn, fqjobid, zfile, logger) -> bool:
+def qc_pheno_errors(# pylint: disable=[too-many-arguments]
+ rconn, fqjobid, dburi, speciesid, zfile, logger) -> bool:
"""Check for errors in `pheno` file(s)."""
cdata = rqtl2.control_data(zfile)
if "pheno" in cdata:
logger.info("Checking for errors in the 'pheno' file…")
- perrs = check_pheno_samples(zfile.filename, logger) + tuple(
- retrieve_errors_with_progress(
- rconn,fqjobid, zfile, "pheno",
- (partial(decimal_points_error, filename="pheno", mini=3),)))
+ perrs = tuple()# type: ignore[var-annotated]
+ with database_connection(dburi) as dbconn:
+ perrs = check_pheno_samples(
+ dbconn, speciesid, zfile.filename, logger) + tuple(
+ retrieve_errors_with_progress(
+ rconn,fqjobid, zfile, "pheno",
+ (partial(decimal_points_error,
+ filename="pheno",
+ mini=3),)))
add_to_errors(rconn, fqjobid, "errors-generic", tuple(
err for err in perrs if isinstance(err, rqfe.MissingFile)))
add_to_errors(rconn, fqjobid, "errors-pheno", tuple(
@@ -190,7 +217,8 @@ def qc_pheno_errors(rconn, fqjobid, zfile, logger) -> bool:
return False
-def qc_phenose_errors(rconn, fqjobid, zfile, logger) -> bool:
+def qc_phenose_errors(# pylint: disable=[too-many-arguments]
+ rconn, fqjobid, _dburi, _speciesid, zfile, logger) -> bool:
"""Check for errors in `phenose` file(s)."""
cdata = rqtl2.control_data(zfile)
if "phenose" in cdata:
@@ -209,7 +237,14 @@ def qc_phenose_errors(rconn, fqjobid, zfile, logger) -> bool:
return False
-def qc_phenocovar_errors(_rconn, _fqjobid, _zfile, _logger) -> bool:
+def qc_phenocovar_errors(
+ _rconn,
+ _fqjobid,
+ _dburi,
+ _speciesid,
+ _zfile,
+ _logger
+) -> bool:
"""Check for errors in `phenocovar` file(s)."""
return False
@@ -225,12 +260,20 @@ def run_qc(rconn: Redis,
if qc_missing_files(rconn, fqjobid, zfile, logger):
return 1
- def with_zipfile(rconn, fqjobid, filename, logger, func):
+ def with_zipfile(# pylint: disable=[too-many-arguments]
+ rconn, fqjobid, dbconn, speciesid, filename, logger, func
+ ):
with ZipFile(filename, "r") as zfile:
- return func(rconn, fqjobid, zfile, logger)
+ return func(rconn, fqjobid, dbconn, speciesid, zfile, logger)
def buildargs(func):
- return (rconn, fqjobid, jobmeta["rqtl2-bundle-file"], logger, func)
+ return (rconn,
+ fqjobid,
+ args.databaseuri,
+ args.speciesid,
+ jobmeta["rqtl2-bundle-file"],
+ logger,
+ func)
processes = [
mproc.Process(target=with_zipfile, args=buildargs(qc_geno_errors,)),
mproc.Process(target=with_zipfile, args=buildargs(qc_pheno_errors,)),
@@ -263,8 +306,8 @@ def run_qc(rconn: Redis,
if __name__ == "__main__":
def main():
"""Enter R/qtl2 bundle QC runner."""
- args = init_cli_parser(
- "qc-on-rqtl2-bundle", "Run QC on R/qtl2 bundle.").parse_args()
+ args = add_global_data_arguments(init_cli_parser(
+ "qc-on-rqtl2-bundle", "Run QC on R/qtl2 bundle.")).parse_args()
check_redis(args.redisuri)
check_db(args.databaseuri)
diff --git a/scripts/qcapp_wsgi.py b/scripts/qcapp_wsgi.py
index 9fb63cb..fe77031 100644
--- a/scripts/qcapp_wsgi.py
+++ b/scripts/qcapp_wsgi.py
@@ -1,11 +1,36 @@
"""Run the application"""
+import os
import sys
from logging import getLogger, StreamHandler
from flask import Flask
-from qc_app import create_app
-from qc_app.check_connections import check_db, check_redis
+from uploader import create_app
+from uploader.check_connections import check_db, check_redis
+
+def setup_logging(appl: Flask) -> Flask:
+ """Setup appropriate logging paradigm depending on environment."""
+ # https://datatracker.ietf.org/doc/html/draft-coar-cgi-v11-03#section-4.1.17
+ # https://wsgi.readthedocs.io/en/latest/proposals-2.0.html#making-some-keys-required
+ # https://peps.python.org/pep-3333/#id4
+ software, *_version_and_comments = os.environ.get(
+ "SERVER_SOFTWARE", "").split('/')
+ if bool(software):
+ gunicorn_logger = getLogger("gunicorn.error")
+ appl.logger.handlers = gunicorn_logger.handlers
+ appl.logger.setLevel(gunicorn_logger.level)#pylint: disable=[no-member]
+ else:
+ loglevel = appl.config["LOG_LEVEL"].upper()
+ # Maybe call `logging.dictConfig(…)` here instead of all this stuff below
+ handler_stderr = StreamHandler(stream=sys.stderr)
+ appl.logger.addHandler(handler_stderr)
+ rootlogger = getLogger()
+ rootlogger.addHandler(handler_stderr)
+ rootlogger.setLevel(loglevel)
+ appl.logger.setLevel(loglevel)
+
+ return appl
+
def check_and_build_app() -> Flask:
"""Setup the application for running."""
@@ -15,30 +40,11 @@ def check_and_build_app() -> Flask:
# Check connections
check_db(appl.config["SQL_URI"])
check_redis(appl.config["REDIS_URL"])
- return appl
+ return setup_logging(appl)
-def setup_logging(appl: Flask):
- """Setup application logging"""
- loglevel = appl.config["LOG_LEVEL"].upper()
-
- # Maybe call `logging.dictConfig(…)` here instead of all this stuff below
- handler_stderr = StreamHandler(stream=sys.stderr)
- appl.logger.addHandler(handler_stderr)
-
- rootlogger = getLogger()
- rootlogger.addHandler(handler_stderr)
- rootlogger.setLevel(loglevel)
-
- appl.logger.setLevel(loglevel)
app = check_and_build_app()
-if __name__ != "__main__":# Running via gunicorn
- gunicorn_logger = getLogger("gunicorn.error")
- app.logger.handlers = gunicorn_logger.handlers
- app.logger.setLevel(gunicorn_logger.level)#pylint: disable=[no-member]
-
-if __name__ == "__main__":# Running via flask
+if __name__ == "__main__":
# Run the app
- setup_logging(app)
app.run()
diff --git a/scripts/redis_logger.py b/scripts/redis_logger.py
index 2ae682b..d3fde5f 100644
--- a/scripts/redis_logger.py
+++ b/scripts/redis_logger.py
@@ -1,5 +1,6 @@
"""Utilities to log to redis for our worker scripts."""
import logging
+from typing import Optional
from redis import Redis
@@ -26,6 +27,26 @@ class RedisLogger(logging.Handler):
self.redisconnection.rpush(self.messageslistname, self.format(record))
self.redisconnection.expire(self.messageslistname, self.expiry)
+class RedisMessageListHandler(logging.Handler):
+ """Send messages to specified redis list."""
+ def __init__(self,
+ rconn: Redis,
+ fullyqualifiedkey: str,
+ loglevel: int = logging.NOTSET,
+ expiry: Optional[int] = 86400):
+ super().__init__(loglevel)
+ self.redisconnection = rconn
+ self.fullyqualifiedkey = fullyqualifiedkey
+ self.expiry = expiry
+
+ def emit(self, record):
+ """Log out to specified `fullyqualifiedkey`."""
+ self.redisconnection.rpush(self.fullyqualifiedkey, self.format(record))
+ if bool(self.expiry):
+ self.redisconnection.expire(self.fullyqualifiedkey, self.expiry)
+ else:
+ self.redisconnection.persist(self.fullyqualifiedkey)
+
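+# A hedged usage sketch for the handler above (the key name is illustrative):
+#
+#     handler = RedisMessageListHandler(rconn, "jobs:<job-id>:log-messages")
+#     handler.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
+#     logging.getLogger(__name__).addHandler(handler)
+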
def setup_redis_logger(rconn: Redis,
fullyqualifiedjobid: str,
job_messagelist: str,
diff --git a/scripts/rqtl2/bundleutils.py b/scripts/rqtl2/bundleutils.py
new file mode 100644
index 0000000..17faa7c
--- /dev/null
+++ b/scripts/rqtl2/bundleutils.py
@@ -0,0 +1,44 @@
+"""Common utilities to operate in R/qtl2 bundles."""
+from typing import Union, Callable
+
+def build_line_splitter(cdata: dict) -> Callable[[str], tuple[Union[str, None], ...]]:
+ """Build and return a function to use to split data in the files.
+
+ Parameters
+ ----------
+ cdata: A dict holding the control information included with the R/qtl2
+ bundle.
+
+ Returns
+ -------
+    A function that takes a line (a string) and returns a tuple of values.
+ """
+ separator = cdata["sep"]
+ na_strings = cdata["na.strings"]
+ def __splitter__(line: str) -> tuple[Union[str, None], ...]:
+ return tuple(
+ item if item not in na_strings else None
+ for item in
+ (field.strip() for field in line.strip().split(separator)))
+ return __splitter__
+
+
+def build_line_joiner(cdata: dict) -> Callable[[tuple[Union[str, None], ...]], str]:
+ """Build and return a function to use to split data in the files.
+
+ Parameters
+ ----------
+ cdata: A dict holding the control information included with the R/qtl2
+ bundle.
+
+ Returns
+ -------
+    A function that takes a tuple of values and returns a string.
+ """
+ separator = cdata["sep"]
+ na_strings = cdata["na.strings"]
+ def __joiner__(row: tuple[Union[str, None], ...]) -> str:
+ return separator.join(
+ (na_strings[0] if item is None else item)
+ for item in row)
+ return __joiner__
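+
+
+# An illustrative round-trip (a hedged sketch; the control values are examples):
+#
+#     cdata = {"sep": ",", "na.strings": ["NA"]}
+#     splitter = build_line_splitter(cdata)
+#     joiner = build_line_joiner(cdata)
+#     joiner(splitter("id,12.3,NA\n"))   # => "id,12.3,NA"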
diff --git a/scripts/rqtl2/cli_parser.py b/scripts/rqtl2/cli_parser.py
index bcc7a4f..9bb60a3 100644
--- a/scripts/rqtl2/cli_parser.py
+++ b/scripts/rqtl2/cli_parser.py
@@ -2,12 +2,22 @@
from pathlib import Path
from argparse import ArgumentParser
-def add_common_arguments(parser: ArgumentParser) -> ArgumentParser:
- """Add common arguments to the CLI parser."""
- parser.add_argument("datasetid",
- type=int,
- help="The dataset to which the data belongs.")
+def add_bundle_argument(parser: ArgumentParser) -> ArgumentParser:
+ """Add the `rqtl2bundle` argument."""
parser.add_argument("rqtl2bundle",
type=Path,
help="Path to R/qtl2 bundle zip file.")
return parser
+
+
+def add_datasetid_argument(parser: ArgumentParser) -> ArgumentParser:
+ """Add the `datasetid` argument."""
+ parser.add_argument("datasetid",
+ type=int,
+ help="The dataset to which the data belongs.")
+ return parser
+
+
+def add_common_arguments(parser: ArgumentParser) -> ArgumentParser:
+ """Add common arguments to the CLI parser."""
+ return add_bundle_argument(add_datasetid_argument(parser))
diff --git a/scripts/rqtl2/entry.py b/scripts/rqtl2/entry.py
index 93fc130..e0e00e7 100644
--- a/scripts/rqtl2/entry.py
+++ b/scripts/rqtl2/entry.py
@@ -1,38 +1,58 @@
"""Build common script-entry structure."""
-from logging import Logger
+import sys
+import logging
from typing import Callable
from argparse import Namespace
+from logging import StreamHandler
from redis import Redis
from MySQLdb import Connection
+from gn_libs.mysqldb import database_connection
-from qc_app import jobs
-from qc_app.db_utils import database_connection
-from qc_app.check_connections import check_db, check_redis
+from uploader import jobs
+from uploader.check_connections import check_db, check_redis
from scripts.redis_logger import setup_redis_logger
-def build_main(args: Namespace,
- run_fn: Callable[[Connection, Namespace], int],
- logger: Logger,
- loglevel: str = "INFO") -> Callable[[],int]:
+def build_main(
+ args: Namespace,
+ run_fn: Callable[
+        [Redis, Connection, str, Namespace],
+ int
+ ],
+ logger: logging.Logger
+) -> Callable[[],int]:
"""Build a function to be used as an entry-point for scripts."""
def main():
- check_db(args.databaseuri)
- check_redis(args.redisuri)
- if not args.rqtl2bundle.exists():
- logger.error("File not found: '%s'.", args.rqtl2bundle)
- return 2
-
with (Redis.from_url(args.redisuri, decode_responses=True) as rconn,
database_connection(args.databaseuri) as dbconn):
- fqjobid = jobs.job_key(jobs.jobsnamespace(), args.jobid)
- logger.addHandler(setup_redis_logger(
- rconn,
- fqjobid,
- f"{fqjobid}:log-messages",
- args.redisexpiry))
- logger.setLevel(loglevel)
- return run_fn(dbconn, args)
+ logger.setLevel(args.loglevel.upper())
+ fqjobid = jobs.job_key(args.redisprefix, args.jobid)
+
+ try:
+ rconn.hset(fqjobid, "status", "started")
+ logger.addHandler(setup_redis_logger(
+ rconn,
+ fqjobid,
+ f"{fqjobid}:log-messages",
+ args.redisexpiry))
+ logger.addHandler(StreamHandler(stream=sys.stderr))
+
+ check_db(args.databaseuri)
+ check_redis(args.redisuri)
+ if not args.rqtl2bundle.exists():
+ logger.error("File not found: '%s'.", args.rqtl2bundle)
+ return 2
+
+ returncode = run_fn(rconn, dbconn, fqjobid, args)
+ if returncode == 0:
+ rconn.hset(fqjobid, "status", "completed:success")
+ return returncode
+ rconn.hset(fqjobid, "status", "completed:error")
+ return returncode
+ except Exception as _exc:# pylint: disable=[broad-except]
+ logger.error("The process failed!", exc_info=True)
+ rconn.hset(fqjobid, "status", "completed:error")
+ return 4
return main
diff --git a/scripts/rqtl2/install_genotypes.py b/scripts/rqtl2/install_genotypes.py
index d0731a2..8762655 100644
--- a/scripts/rqtl2/install_genotypes.py
+++ b/scripts/rqtl2/install_genotypes.py
@@ -1,12 +1,13 @@
"""Load genotypes from R/qtl2 bundle into the database."""
import sys
+import argparse
import traceback
-from pathlib import Path
from zipfile import ZipFile
from functools import reduce
from typing import Iterator, Optional
-from logging import Logger, getLogger, StreamHandler
+from logging import Logger, getLogger
+from redis import Redis
import MySQLdb as mdb
from MySQLdb.cursors import DictCursor
@@ -19,10 +20,15 @@ from scripts.rqtl2.entry import build_main
from scripts.rqtl2.cli_parser import add_common_arguments
from scripts.cli_parser import init_cli_parser, add_global_data_arguments
-def insert_markers(dbconn: mdb.Connection,
- speciesid: int,
- markers: tuple[str, ...],
- pmapdata: Optional[Iterator[dict]]) -> int:
+__MODULE__ = "scripts.rqtl2.install_genotypes"
+
+def insert_markers(
+ dbconn: mdb.Connection,
+ speciesid: int,
+ markers: tuple[str, ...],
+ pmapdata: Optional[Iterator[dict]],
+ _logger: Logger
+) -> int:
"""Insert genotype and genotype values into the database."""
mdata = reduce(#type: ignore[var-annotated]
lambda acc, row: ({#type: ignore[arg-type, return-value]
@@ -40,16 +46,20 @@ def insert_markers(dbconn: mdb.Connection,
"VALUES (%(speciesid)s, %(marker)s, %(marker)s, %(chr)s, %(pos)s) "
"ON DUPLICATE KEY UPDATE SpeciesId=SpeciesId",
tuple({
- "speciesid": speciesid,
- "marker": marker,
- "chr": mdata.get(marker, {}).get("chr"),
- "pos": mdata.get(marker, {}).get("pos")
- } for marker in markers))
+ (speciesid, marker): {
+ "speciesid": speciesid,
+ "marker": marker,
+ "chr": mdata.get(marker, {}).get("chr"),
+ "pos": mdata.get(marker, {}).get("pos")
+ } for marker in markers}.values()))
return cursor.rowcount
-def insert_individuals(dbconn: mdb.Connection,
- speciesid: int,
- individuals: tuple[str, ...]) -> int:
+def insert_individuals(
+ dbconn: mdb.Connection,
+ speciesid: int,
+ individuals: tuple[str, ...],
+ _logger: Logger
+) -> int:
"""Insert individuals/samples into the database."""
with dbconn.cursor() as cursor:
cursor.executemany(
@@ -60,10 +70,13 @@ def insert_individuals(dbconn: mdb.Connection,
for individual in individuals))
return cursor.rowcount
-def cross_reference_individuals(dbconn: mdb.Connection,
- speciesid: int,
- populationid: int,
- individuals: tuple[str, ...]) -> int:
+def cross_reference_individuals(
+ dbconn: mdb.Connection,
+ speciesid: int,
+ populationid: int,
+ individuals: tuple[str, ...],
+ _logger: Logger
+) -> int:
"""Cross reference any inserted individuals."""
with dbconn.cursor(cursorclass=DictCursor) as cursor:
paramstr = ", ".join(["%s"] * len(individuals))
@@ -79,11 +92,13 @@ def cross_reference_individuals(dbconn: mdb.Connection,
tuple(ids))
return cursor.rowcount
-def insert_genotype_data(dbconn: mdb.Connection,
- speciesid: int,
- genotypes: tuple[dict, ...],
- individuals: tuple[str, ...]) -> tuple[
- int, tuple[dict, ...]]:
+def insert_genotype_data(
+ dbconn: mdb.Connection,
+ speciesid: int,
+ genotypes: tuple[dict, ...],
+ individuals: tuple[str, ...],
+ _logger: Logger
+) -> tuple[int, tuple[dict, ...]]:
"""Insert the genotype data values into the database."""
with dbconn.cursor(cursorclass=DictCursor) as cursor:
paramstr = ", ".join(["%s"] * len(individuals))
@@ -119,11 +134,14 @@ def insert_genotype_data(dbconn: mdb.Connection,
"markerid": row["markerid"]
} for row in data)
-def cross_reference_genotypes(dbconn: mdb.Connection,
- speciesid: int,
- datasetid: int,
- dataids: tuple[dict, ...],
- gmapdata: Optional[Iterator[dict]]) -> int:
+def cross_reference_genotypes(
+ dbconn: mdb.Connection,
+ speciesid: int,
+ datasetid: int,
+ dataids: tuple[dict, ...],
+ gmapdata: Optional[Iterator[dict]],
+ _logger: Logger
+) -> int:
"""Cross-reference the data to the relevant dataset."""
_rows, markers, mdata = reduce(#type: ignore[var-annotated]
lambda acc, row: (#type: ignore[return-value,arg-type]
@@ -139,31 +157,45 @@ def cross_reference_genotypes(dbconn: mdb.Connection,
(tuple(), tuple(), {}))
with dbconn.cursor(cursorclass=DictCursor) as cursor:
- paramstr = ", ".join(["%s"] * len(markers))
- cursor.execute("SELECT Id, Name FROM Geno "
- f"WHERE SpeciesId=%s AND Name IN ({paramstr})",
- (speciesid,) + markers)
- markersdict = {row["Id"]: row["Name"] for row in cursor.fetchall()}
- cursor.executemany(
+ markersdict = {}
+ if len(markers) > 0:
+ paramstr = ", ".join(["%s"] * len(markers))
+ insertparams = (speciesid,) + markers
+ selectquery = ("SELECT Id, Name FROM Geno "
+ f"WHERE SpeciesId=%s AND Name IN ({paramstr})")
+ _logger.debug(
+ "The select query was\n\t%s\n\nwith the parameters\n\t%s",
+ selectquery,
+ (speciesid,) + markers)
+ cursor.execute(selectquery, insertparams)
+ markersdict = {row["Id"]: row["Name"] for row in cursor.fetchall()}
+
+ insertquery = (
"INSERT INTO GenoXRef(GenoFreezeId, GenoId, DataId, cM) "
"VALUES(%(datasetid)s, %(markerid)s, %(dataid)s, %(pos)s) "
- "ON DUPLICATE KEY UPDATE GenoFreezeId=GenoFreezeId",
- tuple({
- **row,
- "datasetid": datasetid,
- "pos": mdata.get(markersdict.get(
- row.get("markerid"), {}), {}).get("pos")
- } for row in dataids))
+ "ON DUPLICATE KEY UPDATE GenoFreezeId=GenoFreezeId")
+ insertparams = tuple({
+ **row,
+ "datasetid": datasetid,
+ "pos": mdata.get(markersdict.get(
+ row.get("markerid"), "nosuchkey"), {}).get("pos")
+ } for row in dataids)
+ _logger.debug(
+ "The insert query was\n\t%s\n\nwith the parameters\n\t%s",
+ insertquery, insertparams)
+ cursor.executemany(insertquery, insertparams)
return cursor.rowcount
-def install_genotypes(#pylint: disable=[too-many-arguments, too-many-locals]
+def install_genotypes(#pylint: disable=[too-many-locals]
+ rconn: Redis,#pylint: disable=[unused-argument]
dbconn: mdb.Connection,
- speciesid: int,
- populationid: int,
- datasetid: int,
- rqtl2bundle: Path,
- logger: Logger = getLogger()) -> int:
+ fullyqualifiedjobid: str,#pylint: disable=[unused-argument]
+ args: argparse.Namespace,
+ logger: Logger = getLogger(__name__)
+) -> int:
"""Load any existing genotypes into the database."""
+ (speciesid, populationid, datasetid, rqtl2bundle) = (
+ args.speciesid, args.populationid, args.datasetid, args.rqtl2bundle)
count = 0
with ZipFile(str(rqtl2bundle.absolute()), "r") as zfile:
try:
@@ -188,20 +220,22 @@ def install_genotypes(#pylint: disable=[too-many-arguments, too-many-locals]
speciesid,
tuple(key for key in batch[0].keys() if key != "id"),
(rqtl2.file_data(zfile, "pmap", cdata) if "pmap" in cdata
- else None))
+ else None),
+ logger)
individuals = tuple(row["id"] for row in batch)
- insert_individuals(dbconn, speciesid, individuals)
+ insert_individuals(dbconn, speciesid, individuals, logger)
cross_reference_individuals(
- dbconn, speciesid, populationid, individuals)
+ dbconn, speciesid, populationid, individuals, logger)
_num_rows, dataids = insert_genotype_data(
- dbconn, speciesid, batch, individuals)
+ dbconn, speciesid, batch, individuals, logger)
cross_reference_genotypes(
dbconn,
speciesid,
datasetid,
dataids,
(rqtl2.file_data(zfile, "gmap", cdata)
- if "gmap" in cdata else None))
+ if "gmap" in cdata else None),
+ logger)
count = count + len(batch)
except rqtl2.InvalidFormat as exc:
logger.error(str(exc))
@@ -223,15 +257,5 @@ if __name__ == "__main__":
return parser.parse_args()
- thelogger = getLogger("install_genotypes")
- thelogger.addHandler(StreamHandler(stream=sys.stderr))
- main = build_main(
- cli_args(),
- lambda dbconn, args: install_genotypes(dbconn,
- args.speciesid,
- args.populationid,
- args.datasetid,
- args.rqtl2bundle),
- thelogger,
- "INFO")
+    main = build_main(cli_args(), install_genotypes, getLogger(__MODULE__))
sys.exit(main())
diff --git a/scripts/rqtl2/install_phenos.py b/scripts/rqtl2/install_phenos.py
index b5cab8e..9059cd6 100644
--- a/scripts/rqtl2/install_phenos.py
+++ b/scripts/rqtl2/install_phenos.py
@@ -1,11 +1,12 @@
"""Load pheno from R/qtl2 bundle into the database."""
import sys
+import argparse
import traceback
-from pathlib import Path
from zipfile import ZipFile
from functools import reduce
-from logging import Logger, getLogger, StreamHandler
+from logging import Logger, getLogger
+from redis import Redis
import MySQLdb as mdb
from MySQLdb.cursors import DictCursor
@@ -18,6 +19,8 @@ from r_qtl import r_qtl2_qc as rqc
from functional_tools import take
+__MODULE__ = "scripts.rqtl2.install_phenos"
+
def insert_probesets(dbconn: mdb.Connection,
platformid: int,
phenos: tuple[str, ...]) -> int:
@@ -93,14 +96,15 @@ def cross_reference_probeset_data(dbconn: mdb.Connection,
} for row in dataids))
return cursor.rowcount
-def install_pheno_files(#pylint: disable=[too-many-arguments, too-many-locals]
+def install_pheno_files(#pylint: disable=[too-many-locals]
+ rconn: Redis,#pylint: disable=[unused-argument]
dbconn: mdb.Connection,
- speciesid: int,
- platformid: int,
- datasetid: int,
- rqtl2bundle: Path,
+ fullyqualifiedjobid: str,#pylint: disable=[unused-argument]
+ args: argparse.Namespace,
logger: Logger = getLogger()) -> int:
"""Load data in `pheno` files and other related files into the database."""
+ (speciesid, platformid, datasetid, rqtl2bundle) = (
+ args.speciesid, args.platformid, args.datasetid, args.rqtl2bundle)
with ZipFile(str(rqtl2bundle), "r") as zfile:
try:
rqc.validate_bundle(zfile)
@@ -155,16 +159,5 @@ if __name__ == "__main__":
return parser.parse_args()
- thelogger = getLogger("install_phenos")
- thelogger.addHandler(StreamHandler(stream=sys.stderr))
- main = build_main(
- cli_args(),
- lambda dbconn, args: install_pheno_files(dbconn,
- args.speciesid,
- args.platformid,
- args.datasetid,
- args.rqtl2bundle,
- thelogger),
- thelogger,
- "DEBUG")
+ main = build_main(cli_args(), install_pheno_files, __MODULE__)
sys.exit(main())
diff --git a/scripts/rqtl2/phenotypes_qc.py b/scripts/rqtl2/phenotypes_qc.py
new file mode 100644
index 0000000..5c89ca0
--- /dev/null
+++ b/scripts/rqtl2/phenotypes_qc.py
@@ -0,0 +1,516 @@
+"""Run quality control on phenotypes-specific files in the bundle."""
+import sys
+import uuid
+import json
+import shutil
+import logging
+import tempfile
+import contextlib
+from pathlib import Path
+from logging import Logger
+from zipfile import ZipFile
+from argparse import Namespace
+import multiprocessing as mproc
+from functools import reduce, partial
+from typing import Union, Iterator, Callable, Optional, Sequence
+
+import MySQLdb as mdb
+from redis import Redis
+
+from r_qtl import r_qtl2 as rqtl2
+from r_qtl import r_qtl2_qc as rqc
+from r_qtl import exceptions as rqe
+from r_qtl.fileerrors import InvalidValue
+
+from functional_tools import chain
+
+from quality_control.checks import decimal_places_pattern
+
+from uploader.files import sha256_digest_over_file
+from uploader.samples.models import samples_by_species_and_population
+
+from scripts.rqtl2.entry import build_main
+from scripts.redis_logger import RedisMessageListHandler
+from scripts.rqtl2.cli_parser import add_bundle_argument
+from scripts.cli_parser import init_cli_parser, add_global_data_arguments
+from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter
+
+__MODULE__ = "scripts.rqtl2.phenotypes_qc"
+logging.basicConfig(
+ format=("%(asctime)s - %(levelname)s %(name)s: "
+ "(%(pathname)s: %(lineno)d) %(message)s"))
+logger = logging.getLogger(__MODULE__)
+
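+# NOTE: The bundle-level checks below (validate, check_for_mandatory_pheno_keys,
+# check_for_averages_files) all accept and return a dict carrying "errors",
+# "skip", "phenobundle" and "logger" keys so they can be chained; once "skip"
+# is True, later checks pass their input through without re-reading the bundle.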
+def validate(phenobundle: Path, logger: Logger) -> dict:
+ """Check that the bundle is generally valid"""
+ try:
+ rqc.validate_bundle(phenobundle)
+ except rqe.RQTLError as rqtlerr:
+ # logger.error("Bundle file validation failed!", exc_info=True)
+ return {
+ "skip": True,
+ "logger": logger,
+ "phenobundle": phenobundle,
+ "errors": (" ".join(rqtlerr.args),)
+ }
+ return {
+ "errors": tuple(),
+ "skip": False,
+ "phenobundle": phenobundle,
+ "logger": logger
+ }
+
+
+def check_for_mandatory_pheno_keys(
+ phenobundle: Path,
+ logger: Logger,
+ **kwargs
+) -> dict:
+ """Check that the mandatory keys exist for phenotypes."""
+ if kwargs.get("skip", False):
+ return {
+ **kwargs,
+ "logger": logger,
+ "phenobundle": phenobundle
+ }
+
+ _mandatory_keys = ("pheno", "phenocovar")
+ _cdata = rqtl2.read_control_file(phenobundle)
+ _errors = kwargs.get("errors", tuple()) + tuple(
+ f"Expected '{key}' file(s) are not declared in the bundle."
+ for key in _mandatory_keys if key not in _cdata.keys())
+ return {
+ **kwargs,
+ "logger": logger,
+ "phenobundle": phenobundle,
+ "errors": _errors,
+ "skip": len(_errors) > 0
+ }
+
+
+def check_for_averages_files(
+ phenobundle: Path,
+ logger: Logger,
+ **kwargs
+) -> dict:
+    """Check that the 'phenose' and 'phenonum' (averages) files are declared together."""
+ if kwargs.get("skip", False):
+ return {
+ **kwargs,
+ "logger": logger,
+ "phenobundle": phenobundle
+ }
+
+ _together = (("phenose", "phenonum"), ("phenonum", "phenose"))
+ _cdata = rqtl2.read_control_file(phenobundle)
+ _errors = kwargs.get("errors", tuple()) + tuple(
+ f"'{first}' is defined in the control file but there is no "
+ f"corresponding '{second}'"
+ for first, second in _together
+ if ((first in _cdata.keys()) and (second not in _cdata.keys())))
+ return {
+ **kwargs,
+ "logger": logger,
+ "phenobundle": phenobundle,
+ "errors": _errors,
+ "skip": len(_errors) > 0
+ }
+
+
+def extract_bundle(
+ bundle: Path, workdir: Path, jobid: uuid.UUID
+) -> tuple[Path, tuple[Path, ...]]:
+ """Extract the bundle."""
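+    # The extraction directory name combines the job id, the bundle's SHA-256
+    # digest and the bundle's file name.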
+ with ZipFile(bundle) as zfile:
+ extractiondir = workdir.joinpath(
+ f"{str(jobid)}-{sha256_digest_over_file(bundle)}-{bundle.name}")
+ return extractiondir, rqtl2.extract(zfile, extractiondir)
+
+
+def undo_transpose(filetype: str, cdata: dict, extractiondir):
+    """Undo transposition of all files of type `filetype` in the bundle."""
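+    # R/qtl2 control files may mark member files as transposed (e.g.
+    # "pheno_transposed"); such files are rewritten here so the later checks
+    # can assume a single orientation.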
+ if len(cdata.get(filetype, [])) > 0 and cdata.get(f"{filetype}_transposed", False):
+ files = (extractiondir.joinpath(_file) for _file in cdata[filetype])
+ for _file in files:
+ rqtl2.transpose_csv_with_rename(
+ _file,
+ build_line_splitter(cdata),
+ build_line_joiner(cdata))
+
+
+@contextlib.contextmanager
+def redis_logger(
+ redisuri: str, loggername: str, filename: str, fqkey: str
+) -> Iterator[logging.Logger]:
+ """Build a Redis message-list logger."""
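+    # Usage sketch (the URI and key prefix below are hypothetical):
+    #   with redis_logger("redis://localhost:6379/0", "some.logger",
+    #                     "pheno.csv", "jobs:<jobid>") as log:
+    #       log.info("message goes to the Redis list for that file")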
+ rconn = Redis.from_url(redisuri, decode_responses=True)
+ logger = logging.getLogger(loggername)
+ logger.propagate = False
+ handler = RedisMessageListHandler(
+ rconn,
+ fullyqualifiedkey(fqkey, filename))#type: ignore[arg-type]
+ handler.setFormatter(logging.getLogger().handlers[0].formatter)
+ logger.addHandler(handler)
+ try:
+ yield logger
+ finally:
+ rconn.close()
+
+
+def push_error(rconn: Redis, fqkey: str, error: InvalidValue) -> InvalidValue:
+ """Persist the error in redis."""
+ rconn.rpush(fqkey, json.dumps(error._asdict()))
+ return error
+
+
+def file_fqkey(prefix: str, section: str, filepath: Path) -> str:
+ """Build a files fully-qualified key in a consistent manner"""
+    """Build a file's fully-qualified key in a consistent manner."""
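+    # e.g. file_fqkey("jobs:<jobid>:phenocovar", "errors", Path("pheno.csv"))
+    #   => "jobs:<jobid>:phenocovar:errors:pheno.csv" (the prefix shown is hypothetical)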
+
+
+def qc_phenocovar_file(
+ filepath: Path,
+ redisuri,
+ fqkey: str,
+ separator: str,
+ comment_char: str):
+ """Check that `phenocovar` files are structured correctly."""
+ with (redis_logger(
+ redisuri,
+ f"{__MODULE__}.qc_phenocovar_file",
+ filepath.name,
+ f"{fqkey}:logs") as logger,
+ Redis.from_url(redisuri, decode_responses=True) as rconn):
+ print("Running QC on file: ", filepath.name)
+ _csvfile = rqtl2.read_csv_file(filepath, separator, comment_char)
+ _headings = tuple(heading.lower() for heading in next(_csvfile))
+ _errors: tuple[InvalidValue, ...] = tuple()
+ save_error = partial(
+ push_error, rconn, file_fqkey(fqkey, "errors", filepath))
+ for heading in ("description", "units"):
+ if heading not in _headings:
+                _errors = _errors + (save_error(InvalidValue(
+ filepath.name,
+ "header row",
+ "-",
+ "-",
+ (f"File {filepath.name} is missing the {heading} heading "
+ "in the header line."))),)
+
+ def collect_errors(errors_and_linecount, line):
+ _errs, _lc = errors_and_linecount
+ logger.info("Testing record '%s'", line[0])
+ if len(line) != len(_headings):
+ _errs = _errs + (save_error(InvalidValue(
+ filepath.name,
+ line[0],
+ "-",
+ "-",
+ (f"Record {_lc} in file {filepath.name} has a different "
+ "number of columns than the number of headings"))),)
+ _line = dict(zip(_headings, line))
+ if not bool(_line.get("description")):
+ _errs = _errs + (
+ save_error(InvalidValue(filepath.name,
+ _line[_headings[0]],
+ "description",
+ _line.get("description"),
+ "The description is not provided!")),)
+
+ rconn.hset(file_fqkey(fqkey, "metadata", filepath),
+ mapping={
+ "status": "checking",
+ "linecount": _lc+1,
+ "total-errors": len(_errs)
+ })
+ return _errs, _lc+1
+
+ _errors, _linecount = reduce(collect_errors, _csvfile, (_errors, 1))
+ rconn.hset(file_fqkey(fqkey, "metadata", filepath),
+ mapping={
+ "status": "completed",
+ "linecount": _linecount,
+ "total-errors": len(_errors)
+ })
+ return {filepath.name: {"errors": _errors, "linecount": _linecount}}
+
+
+def merge_dicts(*dicts):
+ """Merge multiple dicts into a single one."""
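+    # e.g. merge_dicts({"a": 1}, {"b": 2}, {"a": 3}) == {"a": 3, "b": 2};
+    # dictionaries given later take precedence on duplicate keys.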
+ return reduce(lambda merged, dct: {**merged, **dct}, dicts, {})
+
+
+def decimal_points_error(# pylint: disable=[too-many-arguments]
+ filename: str,
+ rowtitle: str,
+ coltitle: str,
+ cellvalue: str,
+ message: str,
+ decimal_places: int = 1
+) -> Optional[InvalidValue]:
+ """Returns an error if the value does not meet the checks."""
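+    # With the default decimal_places=1, a value such as "12.3" passes while
+    # "12" is flagged (the exact pattern comes from decimal_places_pattern).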
+ if not bool(decimal_places_pattern(decimal_places).match(cellvalue)):
+ return InvalidValue(filename, rowtitle, coltitle, cellvalue, message)
+ return None
+
+
+def integer_error(
+ filename: str,
+ rowtitle: str,
+ coltitle: str,
+ cellvalue: str,
+ message: str
+) -> Optional[InvalidValue]:
+ """Returns an error if the value does not meet the checks."""
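+    # e.g. "17" passes; "0", "-3" and "4.2" all produce an InvalidValue.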
+ try:
+ value = int(cellvalue)
+ if value <= 0:
+ raise ValueError("Must be a non-zero, positive number.")
+ return None
+ except ValueError as _verr:
+ return InvalidValue(filename, rowtitle, coltitle, cellvalue, message)
+
+
+def qc_pheno_file(# pylint: disable=[too-many-locals, too-many-arguments]
+ filepath: Path,
+ redisuri: str,
+ fqkey: str,
+ samples: tuple[str, ...],
+ phenonames: tuple[str, ...],
+ separator: str,
+ comment_char: str,
+ na_strings: Sequence[str],
+ error_fn: Callable = decimal_points_error
+):
+ """Run QC/QA on a `pheno` file."""
+ with (redis_logger(
+ redisuri,
+ f"{__MODULE__}.qc_pheno_file",
+ filepath.name,
+ f"{fqkey}:logs") as logger,
+ Redis.from_url(redisuri, decode_responses=True) as rconn):
+ print("Running QC on file: ", filepath.name)
+ save_error = partial(
+ push_error, rconn, file_fqkey(fqkey, "errors", filepath))
+ _csvfile = rqtl2.read_csv_file(filepath, separator, comment_char)
+ _headings: tuple[str, ...] = tuple(
+ # select lowercase for comparison purposes
+ heading.lower() for heading in next(_csvfile))
+ _errors: tuple[InvalidValue, ...] = tuple()
+
+ _absent = tuple(pheno for pheno in _headings[1:] if pheno
+ not in tuple(
+ # lower to have consistent case with headings for
+ # comparison
+ phe.lower() for phe in phenonames))
+ if len(_absent) > 0:
+ _errors = _errors + (save_error(InvalidValue(
+ filepath.name,
+ "header row",
+ "-",
+ ", ".join(_absent),
+ ("The following phenotype names do not exist in any of the "
+ f"provided phenocovar files: ({', '.join(_absent)})"))),)
+
+ def collect_errors(errors_and_linecount, line):
+ _errs, _lc = errors_and_linecount
+ logger.debug("Checking row %s", line[0])
+ if line[0] not in samples:
+ _errs = _errs + (save_error(InvalidValue(
+ filepath.name,
+ line[0],
+ _headings[0],
+ line[0],
+ (f"The sample named '{line[0]}' does not exist in the database. "
+ "You will need to upload that first."))),)
+
+ for field, value in zip(_headings[1:], line[1:]):
+ if value in na_strings:
+ continue
+ _err = error_fn(
+ filepath.name,
+ line[0],
+ field,
+ value)
+ _errs = _errs + ((save_error(_err),) if bool(_err) else tuple())
+
+ rconn.hset(file_fqkey(fqkey, "metadata", filepath),
+ mapping={
+ "status": "checking",
+ "linecount": _lc+1,
+ "total-errors": len(_errs)
+ })
+ return _errs, _lc+1
+
+ _errors, _linecount = reduce(collect_errors, _csvfile, (_errors, 1))
+ rconn.hset(file_fqkey(fqkey, "metadata", filepath),
+ mapping={
+ "status": "completed",
+ "linecount": _linecount,
+ "total-errors": len(_errors)
+ })
+ return {filepath.name: {"errors": _errors, "linecount": _linecount}}
+
+
+def phenotype_names(filepath: Path,
+ separator: str,
+ comment_char: str) -> tuple[str, ...]:
+ """Read phenotype names from `phenocovar` file."""
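+    # Collects the first column of every row, then drops the leading header
+    # entry with the trailing [1:] below.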
+ return reduce(lambda tpl, line: tpl + (line[0],),#type: ignore[arg-type, return-value]
+ rqtl2.read_csv_file(filepath, separator, comment_char),
+ tuple())[1:]
+
+def fullyqualifiedkey(
+ prefix: str,
+ rest: Optional[str] = None
+) -> Union[Callable[[str], str], str]:
+ """Compute fully qualified Redis key."""
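+    # fullyqualifiedkey("jobs") returns a callable, e.g.
+    #   fullyqualifiedkey("jobs")("<jobid>") == "jobs:<jobid>";
+    # fullyqualifiedkey("jobs", "<jobid>") returns the string directly.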
+ if not bool(rest):
+ return lambda _rest: f"{prefix}:{_rest}"
+ return f"{prefix}:{rest}"
+
+def run_qc(# pylint: disable=[too-many-locals]
+ rconn: Redis,
+ dbconn: mdb.Connection,
+ fullyqualifiedjobid: str,
+ args: Namespace
+) -> int:
+ """Run quality control checks on the bundle."""
+ print("Beginning the quality assurance checks.")
+ results = check_for_averages_files(
+ **check_for_mandatory_pheno_keys(
+ **validate(args.rqtl2bundle, logger)))
+ errors = results.get("errors", tuple())
+ if len(errors) > 0:
+ logger.error("We found the following errors:\n%s",
+ "\n".join(f" - {error}" for error in errors))
+ return 1
+ # Run QC on actual values
+ # Steps:
+    # - Extract the bundle into a job-specific working directory
+ extractiondir, *_bundlefiles = extract_bundle(
+ args.rqtl2bundle, args.workingdir, args.jobid)
+
+ # - For every pheno, phenocovar, phenose, phenonum file, undo
+ # transposition where relevant
+ cdata = rqtl2.control_data(extractiondir)
+    with mproc.Pool(max(1, mproc.cpu_count() - 1)) as pool:
+ pool.starmap(
+ undo_transpose,
+ ((ftype, cdata, extractiondir)
+ for ftype in ("pheno", "phenocovar", "phenose", "phenonum")))
+
+ # - Fetch samples/individuals from database.
+ print("Fetching samples/individuals from the database.")
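+    # Any known identifier (Name, Name2, Symbol or Alias) counts as a valid
+    # sample name in the pheno/phenose/phenonum checks below.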
+ samples = tuple(#type: ignore[var-annotated]
+ item for item in set(reduce(
+ lambda acc, item: acc + (
+ item["Name"], item["Name2"], item["Symbol"], item["Alias"]),
+ samples_by_species_and_population(
+ dbconn, args.speciesid, args.populationid),
+ tuple()))
+ if bool(item))
+
+    # - Check that `description` and `units` are present in phenocovar for
+ # all phenotypes
+ rconn.hset(fullyqualifiedjobid,
+ "fully-qualified-keys:phenocovar",
+ json.dumps(tuple(f"{fullyqualifiedjobid}:phenocovar:{_file}"
+ for _file in cdata.get("phenocovar", []))))
+    with mproc.Pool(max(1, mproc.cpu_count() - 1)) as pool:
+ print("Check for errors in 'phenocovar' file(s).")
+ _phenocovar_qc_res = merge_dicts(*pool.starmap(qc_phenocovar_file, tuple(
+ (extractiondir.joinpath(_file),
+ args.redisuri,
+ f"{fullyqualifiedjobid}:phenocovar",
+ cdata["sep"],
+ cdata["comment.char"])
+ for _file in cdata.get("phenocovar", []))))
+
+ # - Check all samples in pheno files exist in database
+ # - Check all phenotypes in pheno files exist in phenocovar files
+ # - Check all numeric values in pheno files
+ phenonames = tuple(set(
+ name for names in pool.starmap(phenotype_names, tuple(
+ (extractiondir.joinpath(_file), cdata["sep"], cdata["comment.char"])
+ for _file in cdata.get("phenocovar", [])))
+ for name in names))
+
+ dec_err_fn = partial(decimal_points_error, message=(
+ "Expected a non-negative number with at least one decimal "
+ "place."))
+
+ print("Check for errors in 'pheno' file(s).")
+ _pheno_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
+ extractiondir.joinpath(_file),
+ args.redisuri,
+ chain(
+ "pheno",
+ fullyqualifiedkey(args.jobid),
+ fullyqualifiedkey(args.redisprefix)),
+ samples,
+ phenonames,
+ cdata["sep"],
+ cdata["comment.char"],
+ cdata["na.strings"],
+ dec_err_fn
+ ) for _file in cdata.get("pheno", []))))
+
+    # - Run the three checks above on the 'phenose' and 'phenonum' values too
+ # qc_phenose_files(…)
+ # qc_phenonum_files(…)
+ print("Check for errors in 'phenose' file(s).")
+ _phenose_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
+ extractiondir.joinpath(_file),
+ args.redisuri,
+ chain(
+ "phenose",
+ fullyqualifiedkey(args.jobid),
+ fullyqualifiedkey(args.redisprefix)),
+ samples,
+ phenonames,
+ cdata["sep"],
+ cdata["comment.char"],
+ cdata["na.strings"],
+ dec_err_fn
+ ) for _file in cdata.get("phenose", []))))
+
+ print("Check for errors in 'phenonum' file(s).")
+ _phenonum_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
+ extractiondir.joinpath(_file),
+ args.redisuri,
+ chain(
+ "phenonum",
+ fullyqualifiedkey(args.jobid),
+ fullyqualifiedkey(args.redisprefix)),
+ samples,
+ phenonames,
+ cdata["sep"],
+ cdata["comment.char"],
+ cdata["na.strings"],
+ partial(integer_error, message=(
+ "Expected a non-negative, non-zero integer value."))
+ ) for _file in cdata.get("phenonum", []))))
+
+ # - Delete all extracted files
+ shutil.rmtree(extractiondir)
+ return 0
+
+
+if __name__ == "__main__":
+ def cli_args():
+        """Process command-line arguments for the phenotypes QC script."""
+ parser = add_bundle_argument(add_global_data_arguments(init_cli_parser(
+ program="PhenotypesQC",
+ description=(
+ "Perform Quality Control checks on a phenotypes bundle file"))))
+ parser.add_argument(
+ "--workingdir",
+ default=f"{tempfile.gettempdir()}/phenotypes_qc",
+ help=("The directory where this script will put its intermediate "
+ "files."),
+ type=Path)
+ return parser.parse_args()
+
+ main = build_main(cli_args(), run_qc, logger)
+ sys.exit(main())
diff --git a/scripts/validate_file.py b/scripts/validate_file.py
index 0028795..52e55ec 100644
--- a/scripts/validate_file.py
+++ b/scripts/validate_file.py
@@ -8,12 +8,12 @@ from zipfile import ZipFile, is_zipfile
import jsonpickle
from redis import Redis
from redis.exceptions import ConnectionError # pylint: disable=[redefined-builtin]
+from gn_libs.mysqldb import database_connection
from quality_control.utils import make_progress_calculator
from quality_control.parsing import FileType, strain_names, collect_errors
-from qc_app import jobs
-from qc_app.db_utils import database_connection
+from uploader import jobs
from .cli_parser import init_cli_parser
from .qc import add_file_validation_arguments
diff --git a/scripts/worker.py b/scripts/worker.py
index 0eb9ea5..91b0332 100644
--- a/scripts/worker.py
+++ b/scripts/worker.py
@@ -11,8 +11,8 @@ from tempfile import TemporaryDirectory
from redis import Redis
-from qc_app import jobs
-from qc_app.check_connections import check_redis
+from uploader import jobs
+from uploader.check_connections import check_redis
def parse_args():
"Parse the command-line arguments"