Diffstat (limited to 'scripts')
-rw-r--r--  scripts/cli_parser.py             |   3
-rw-r--r--  scripts/insert_data.py            |   6
-rw-r--r--  scripts/insert_samples.py         |  28
-rw-r--r--  scripts/load_phenotypes_to_db.py  | 519
-rw-r--r--  scripts/process_rqtl2_bundle.py   |   4
-rw-r--r--  scripts/qc_on_rqtl2_bundle.py     |   4
-rw-r--r--  scripts/redis_logger.py           |   2
-rw-r--r--  scripts/rqtl2/entry.py            |  30
-rw-r--r--  scripts/rqtl2/phenotypes_qc.py    |  58
-rw-r--r--  scripts/worker.py                 |   2
10 files changed, 593 insertions(+), 63 deletions(-)
diff --git a/scripts/cli_parser.py b/scripts/cli_parser.py
index d42ae66..0c91c5e 100644
--- a/scripts/cli_parser.py
+++ b/scripts/cli_parser.py
@@ -23,7 +23,8 @@ def init_cli_parser(program: str, description: Optional[str] = None) -> Argument
"--loglevel",
type=str,
default="INFO",
- choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
+ choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL",
+ "debug", "info", "warning", "error", "critical"],
help="The severity of events to track with the logger.")
return parser
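
The hunk above accepts lower-case level names by listing every spelling in `choices`. If the intent is simply case-insensitive input, argparse can also normalise the value before it is checked; the sketch below shows that alternative and is not part of this patch.

    import argparse

    # Hypothetical alternative, not part of this patch: normalise the case
    # before argparse validates the value against `choices`.
    parser = argparse.ArgumentParser(prog="example")
    parser.add_argument(
        "--loglevel",
        type=str.upper,  # "info" becomes "INFO" before the choices check
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        help="The severity of events to track with the logger.")

    print(parser.parse_args(["--loglevel", "warning"]).loglevel)  # "WARNING"
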
diff --git a/scripts/insert_data.py b/scripts/insert_data.py
index 67038f8..aec0251 100644
--- a/scripts/insert_data.py
+++ b/scripts/insert_data.py
@@ -197,7 +197,7 @@ def probeset_ids(dbconn: mdb.Connection,
break
yield row
-def insert_means(# pylint: disable=[too-many-locals, too-many-arguments]
+def insert_means(# pylint: disable=[too-many-locals, too-many-arguments, too-many-positional-arguments]
filepath: str, speciesid: int, platform_id: int, datasetid: int,
dbconn: mdb.Connection, rconn: Redis) -> int: # pylint: disable=[unused-argument]
"Insert the means/averages data into the database"
@@ -232,7 +232,7 @@ def insert_means(# pylint: disable=[too-many-locals, too-many-arguments]
item for sublist in
read_datavalues(filepath, headings, strains).values()
for item in sublist),
- start=(last_data_id(dbconn)+1)))
+ start=last_data_id(dbconn)+1))
with dbconn.cursor(cursorclass=DictCursor) as cursor:
while True:
means = tuple(take(the_means, 10000))
@@ -245,7 +245,7 @@ def insert_means(# pylint: disable=[too-many-locals, too-many-arguments]
cursor.executemany(xref_query, means)
return 0
-def insert_se(# pylint: disable = [too-many-arguments,too-many-locals]
+def insert_se(# pylint: disable = [too-many-arguments,too-many-locals, too-many-positional-arguments]
filepath: str, speciesid: int, platformid: int, datasetid: int,
dbconn: mdb.Connection, rconn: Redis) -> int: # pylint: disable=[unused-argument]
"Insert the standard-error data into the database"
diff --git a/scripts/insert_samples.py b/scripts/insert_samples.py
index 1b0a052..fc029f9 100644
--- a/scripts/insert_samples.py
+++ b/scripts/insert_samples.py
@@ -3,6 +3,7 @@ import sys
import logging
import pathlib
import argparse
+import traceback
import MySQLdb as mdb
from redis import Redis
@@ -33,7 +34,7 @@ class SeparatorAction(argparse.Action):
"""Process the value passed in."""
setattr(namespace, self.dest, (chr(9) if values == "\\t" else values))
-def insert_samples(conn: mdb.Connection,# pylint: disable=[too-many-arguments]
+def insert_samples(conn: mdb.Connection,# pylint: disable=[too-many-arguments, too-many-positional-arguments]
rconn: Redis,# pylint: disable=[unused-argument]
speciesid: int,
populationid: int,
@@ -73,6 +74,7 @@ def insert_samples(conn: mdb.Connection,# pylint: disable=[too-many-arguments]
print("Samples upload successfully completed.")
return 0
+
if __name__ == "__main__":
def cli_args():
@@ -127,7 +129,7 @@ if __name__ == "__main__":
def main():
"""Run script to insert samples into the database."""
-
+ status_code = 1 # Exit with an Exception
args = cli_args()
check_db(args.databaseuri)
check_redis(args.redisuri)
@@ -137,13 +139,19 @@ if __name__ == "__main__":
with (Redis.from_url(args.redisuri, decode_responses=True) as rconn,
database_connection(args.databaseuri) as dbconn):
- return insert_samples(dbconn,
- rconn,
- args.speciesid,
- args.populationid,
- args.samplesfile,
- args.separator,
- args.firstlineheading,
- args.quotechar)
+
+ try:
+ status_code = insert_samples(dbconn,
+ rconn,
+ args.speciesid,
+ args.populationid,
+ args.samplesfile,
+ args.separator,
+ args.firstlineheading,
+ args.quotechar)
+ except Exception as _exc:# pylint: disable=[broad-exception-caught]
+ print(traceback.format_exc(), file=sys.stderr)
+
+ return status_code
sys.exit(main())
diff --git a/scripts/load_phenotypes_to_db.py b/scripts/load_phenotypes_to_db.py
new file mode 100644
index 0000000..c1a7687
--- /dev/null
+++ b/scripts/load_phenotypes_to_db.py
@@ -0,0 +1,519 @@
+"""Load phenotypes and their data provided in files into the database."""
+import sys
+import uuid
+import json
+import time
+import logging
+import argparse
+import datetime
+from typing import Any
+from pathlib import Path
+from zipfile import ZipFile
+from urllib.parse import urljoin
+from functools import reduce, partial
+
+from MySQLdb.cursors import DictCursor
+
+from gn_libs import jobs, mysqldb, sqlite3, monadic_requests as mrequests
+
+from r_qtl import r_qtl2 as rqtl2
+from uploader.species.models import species_by_id
+from uploader.population.models import population_by_species_and_id
+from uploader.samples.models import samples_by_species_and_population
+from uploader.phenotypes.models import (
+ dataset_by_id,
+ save_phenotypes_data,
+ create_new_phenotypes,
+ quick_save_phenotypes_data)
+from uploader.publications.models import fetch_publication_by_id
+
+from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter
+
+logging.basicConfig(
+ format="%(asctime)s — %(filename)s:%(lineno)s — %(levelname)s: %(message)s")
+logger = logging.getLogger(__name__)
+
+
+
+def __replace_na_strings__(line, na_strings):
+ return ((None if value in na_strings else value) for value in line)
+
+
+def save_phenotypes(
+ cursor: mysqldb.Connection,
+ control_data: dict[str, Any],
+ filesdir: Path
+) -> tuple[dict, ...]:
+ """Read `phenofiles` and save the phenotypes therein."""
+ phenofiles = tuple(filesdir.joinpath(_file) for _file in control_data["phenocovar"])
+ if len(phenofiles) <= 0:
+ return tuple()
+
+ if control_data["phenocovar_transposed"]:
+        logger.info("Undoing transposition of the files' rows and columns.")
+        phenofiles = tuple(
+ rqtl2.transpose_csv_with_rename(
+ _file,
+ build_line_splitter(control_data),
+ build_line_joiner(control_data))
+ for _file in phenofiles)
+
+ _headers = rqtl2.read_csv_file_headers(phenofiles[0],
+ control_data["phenocovar_transposed"],
+ control_data["sep"],
+ control_data["comment.char"])
+ return create_new_phenotypes(
+ cursor,
+ (dict(zip(_headers,
+ __replace_na_strings__(line, control_data["na.strings"])))
+ for filecontent
+ in (rqtl2.read_csv_file(path,
+ separator=control_data["sep"],
+ comment_char=control_data["comment.char"])
+ for path in phenofiles)
+ for idx, line in enumerate(filecontent)
+ if idx != 0))
+
+
+def __fetch_next_dataid__(conn: mysqldb.Connection) -> int:
+ """Fetch the next available DataId value from the database."""
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ cursor.execute(
+ "SELECT MAX(DataId) AS CurrentMaxDataId FROM PublishXRef")
+ return int(cursor.fetchone()["CurrentMaxDataId"]) + 1
+
+
+def __row_to_dataitems__(
+ sample_row: dict,
+ dataidmap: dict,
+ pheno_name2id: dict[str, int],
+ samples: dict
+) -> tuple[dict, ...]:
+ samplename = sample_row["id"]
+
+ return ({
+ "phenotype_id": dataidmap[pheno_name2id[phenoname]]["phenotype_id"],
+ "data_id": dataidmap[pheno_name2id[phenoname]]["data_id"],
+ "sample_name": samplename,
+ "sample_id": samples[samplename]["Id"],
+ "value": phenovalue
+ } for phenoname, phenovalue in sample_row.items() if phenoname != "id")
+
+
+def __build_dataitems__(
+ phenofiles,
+ control_data,
+ samples,
+ dataidmap,
+ pheno_name2id
+):
+ _headers = rqtl2.read_csv_file_headers(
+ phenofiles[0],
+ False, # Any transposed files have been un-transposed by this point
+ control_data["sep"],
+ control_data["comment.char"])
+ _filescontents = (
+ rqtl2.read_csv_file(path,
+ separator=control_data["sep"],
+ comment_char=control_data["comment.char"])
+ for path in phenofiles)
+ _linescontents = (
+ __row_to_dataitems__(
+ dict(zip(("id",) + _headers[1:],
+ __replace_na_strings__(line, control_data["na.strings"]))),
+ dataidmap,
+ pheno_name2id,
+ samples)
+ for linenum, line in (enumline for filecontent in _filescontents
+ for enumline in enumerate(filecontent))
+ if linenum > 0)
+ return (item for items in _linescontents
+ for item in items
+ if item["value"] is not None)
+
+
+def save_numeric_data(# pylint: disable=[too-many-positional-arguments,too-many-arguments]
+ conn: mysqldb.Connection,
+ dataidmap: dict,
+ pheno_name2id: dict[str, int],
+ samples: tuple[dict, ...],
+ control_data: dict,
+ filesdir: Path,
+ filetype: str,
+ table: str
+):
+ """Read data from files and save to the database."""
+ phenofiles = tuple(
+ filesdir.joinpath(_file) for _file in control_data[filetype])
+ if len(phenofiles) <= 0:
+ return tuple()
+
+ if control_data[f"{filetype}_transposed"]:
+        logger.info("Undoing transposition of the files' rows and columns.")
+ phenofiles = tuple(
+ rqtl2.transpose_csv_with_rename(
+ _file,
+ build_line_splitter(control_data),
+ build_line_joiner(control_data))
+ for _file in phenofiles)
+
+ try:
+ logger.debug("Attempt quick save with `LOAD … INFILE`.")
+ return quick_save_phenotypes_data(
+ conn,
+ table,
+ __build_dataitems__(
+ phenofiles,
+ control_data,
+ samples,
+ dataidmap,
+ pheno_name2id),
+ filesdir)
+ except Exception as _exc:# pylint: disable=[broad-exception-caught]
+ logger.debug("Could not use `LOAD … INFILE`, using raw query",
+ exc_info=True)
+ time.sleep(60)
+ return save_phenotypes_data(
+ conn,
+ table,
+ __build_dataitems__(
+ phenofiles,
+ control_data,
+ samples,
+ dataidmap,
+ pheno_name2id))
+
+
+save_pheno_data = partial(save_numeric_data,
+ filetype="pheno",
+ table="PublishData")
+
+
+save_phenotypes_se = partial(save_numeric_data,
+ filetype="phenose",
+ table="PublishSE")
+
+
+save_phenotypes_n = partial(save_numeric_data,
+ filetype="phenonum",
+ table="NStrain")
+
+
+def cross_reference_phenotypes_publications_and_data(
+ conn: mysqldb.Connection, xref_data: tuple[dict, ...]
+):
+    """Cross-reference the phenotypes, publications, and data."""
+ with conn.cursor(cursorclass=DictCursor) as cursor:
+ cursor.execute("SELECT MAX(Id) CurrentMaxId FROM PublishXRef")
+ _nextid = int(cursor.fetchone()["CurrentMaxId"]) + 1
+ _params = tuple({**row, "xref_id": _id}
+ for _id, row in enumerate(xref_data, start=_nextid))
+ cursor.executemany(
+ ("INSERT INTO PublishXRef("
+ "Id, InbredSetId, PhenotypeId, PublicationId, DataId, comments"
+ ") "
+ "VALUES ("
+ "%(xref_id)s, %(population_id)s, %(phenotype_id)s, "
+ "%(publication_id)s, %(data_id)s, 'Upload of new data.'"
+ ")"),
+ _params)
+ cursor.executemany(
+ "UPDATE PublishXRef SET mean="
+ "(SELECT AVG(value) FROM PublishData WHERE PublishData.Id=PublishXRef.DataId) "
+ "WHERE PublishXRef.Id=%(xref_id)s AND "
+ "InbredSetId=%(population_id)s",
+ _params)
+ return _params
+ return tuple()
+
+
+def update_auth(# pylint: disable=[too-many-locals,too-many-positional-arguments,too-many-arguments]
+ authserver,
+ token,
+ species,
+ population,
+ dataset,
+ xrefdata):
+ """Grant the user access to their data."""
+ _tries = 0
+ _delay = 1
+ headers = {
+ "Authorization": f"Bearer {token}",
+ "Content-Type": "application/json"
+ }
+ def authserveruri(endpoint):
+ return urljoin(authserver, endpoint)
+
+ def __fetch_user_details__():
+ logger.debug("… Fetching user details")
+ return mrequests.get(
+ authserveruri("/auth/user/"),
+ headers=headers
+ )
+
+ def __link_data__(user):
+ logger.debug("… linking uploaded data to user's group")
+ return mrequests.post(
+ authserveruri("/auth/data/link/phenotype"),
+ headers=headers,
+ json={
+ "species_name": species["Name"],
+ "group_id": user["group"]["group_id"],
+ "selected": [
+ {
+ "SpeciesId": species["SpeciesId"],
+ "InbredSetId": population["Id"],
+ "PublishFreezeId": dataset["Id"],
+ "dataset_name": dataset["Name"],
+ "dataset_fullname": dataset["FullName"],
+ "dataset_shortname": dataset["ShortName"],
+ "PublishXRefId": item["xref_id"]
+ }
+ for item in xrefdata
+ ],
+ "using-raw-ids": "on"
+ }).then(lambda ld_results: (user, ld_results))
+
+ def __fetch_phenotype_category_details__(user, linkeddata):
+ logger.debug("… fetching phenotype category details")
+ return mrequests.get(
+ authserveruri("/auth/resource/categories"),
+ headers=headers
+ ).then(
+ lambda categories: (
+ user,
+ linkeddata,
+ next(category for category in categories
+ if category["resource_category_key"] == "phenotype"))
+ )
+
+ def __create_resource__(user, linkeddata, category):
+ logger.debug("… creating authorisation resource object")
+ now = datetime.datetime.now().isoformat()
+ return mrequests.post(
+ authserveruri("/auth/resource/create"),
+ headers=headers,
+ json={
+ "resource_category": category["resource_category_id"],
+ "resource_name": (f"{user['email']}—{dataset['Name']}—{now}—"
+ f"{len(xrefdata)} phenotypes"),
+ "public": "off"
+ }).then(lambda cr_results: (user, linkeddata, cr_results))
+
+ def __attach_data_to_resource__(user, linkeddata, resource):
+ logger.debug("… attaching data to authorisation resource object")
+ return mrequests.post(
+ authserveruri("/auth/resource/data/link"),
+ headers=headers,
+ json={
+ "dataset_type": "phenotype",
+ "resource_id": resource["resource_id"],
+ "data_link_ids": [
+ item["data_link_id"] for item in linkeddata["traits"]]
+ }).then(lambda attc: (user, linkeddata, resource, attc))
+
+ def __handle_error__(resp):
+ error = resp.json()
+ if error.get("error") == "IntegrityError":
+ # This is hacky. If the auth already exists, something went wrong
+ # somewhere.
+ # This needs investigation to recover correctly.
+ logger.info(
+ "The authorisation for the data was already set up.")
+ return 0
+ logger.error("ERROR: Updating the authorisation for the data failed.")
+ logger.debug(
+ "ERROR: The response from the authorisation server was:\n\t%s",
+ error)
+ return 1
+
+ def __handle_success__(_val):
+ logger.info(
+ "The authorisation for the data has been updated successfully.")
+ return 0
+
+ return __fetch_user_details__().then(__link_data__).then(
+ lambda result: __fetch_phenotype_category_details__(*result)
+ ).then(
+ lambda result: __create_resource__(*result)
+ ).then(
+ lambda result: __attach_data_to_resource__(*result)
+ ).either(__handle_error__, __handle_success__)
+
+
+def load_data(conn: mysqldb.Connection, job: dict) -> tuple:#pylint: disable=[too-many-locals]
+ """Load the data attached in the given job."""
+ _job_metadata = job["metadata"]
+ # Steps
+ # 0. Read data from the files: can be multiple files per type
+ #
+ _species = species_by_id(conn, int(_job_metadata["species_id"]))
+ _population = population_by_species_and_id(
+ conn,
+ _species["SpeciesId"],
+ int(_job_metadata["population_id"]))
+ _dataset = dataset_by_id(
+ conn,
+ _species["SpeciesId"],
+ _population["Id"],
+ int(_job_metadata["dataset_id"]))
+    # 1. Just retrieve the publication: don't create publications for now.
+ _publication = fetch_publication_by_id(
+ conn, int(_job_metadata.get("publication_id", "0"))) or {"Id": 0}
+ # 2. Save all new phenotypes:
+ # -> return phenotype IDs
+ bundle = Path(_job_metadata["bundle_file"])
+ _control_data = rqtl2.control_data(bundle)
+ logger.info("Extracting the zipped bundle of files.")
+ _outdir = Path(bundle.parent, f"bundle_{bundle.stem}")
+ with ZipFile(str(bundle), "r") as zfile:
+ _files = rqtl2.extract(zfile, _outdir)
+ logger.info("Saving new phenotypes.")
+ _phenos = save_phenotypes(conn, _control_data, _outdir)
+ def __build_phenos_maps__(accumulator, current):
+ dataid, row = current
+ return ({
+ **accumulator[0],
+ row["phenotype_id"]: {
+ "population_id": _population["Id"],
+ "phenotype_id": row["phenotype_id"],
+ "data_id": dataid,
+ "publication_id": _publication["Id"],
+ }
+ }, {
+ **accumulator[1],
+ row["id"]: row["phenotype_id"]
+ })
+ dataidmap, pheno_name2id = reduce(
+ __build_phenos_maps__,
+ enumerate(_phenos, start=__fetch_next_dataid__(conn)),
+ ({},{}))
+    # 3. a. Fetch the strain names and IDs: create a name->ID map
+ samples = {
+ row["Name"]: row
+ for row in samples_by_species_and_population(
+ conn, _species["SpeciesId"], _population["Id"])}
+    #    b. Save all the data items (DataIds are assigned manually, in sequence), return new IDs
+ logger.info("Saving new phenotypes data.")
+ _num_data_rows = save_pheno_data(conn=conn,
+ dataidmap=dataidmap,
+ pheno_name2id=pheno_name2id,
+ samples=samples,
+ control_data=_control_data,
+ filesdir=_outdir)
+ logger.info("Saved %s new phenotype data rows.", _num_data_rows)
+ # 4. Cross-reference Phenotype, Publication, and PublishData in PublishXRef
+ logger.info("Cross-referencing new phenotypes to their data and publications.")
+ _xrefs = cross_reference_phenotypes_publications_and_data(
+ conn, tuple(dataidmap.values()))
+ # 5. If standard errors and N exist, save them too
+ # (use IDs returned in `3. b.` above).
+ if _control_data.get("phenose"):
+ logger.info("Saving new phenotypes standard errors.")
+ _num_se_rows = save_phenotypes_se(conn=conn,
+ dataidmap=dataidmap,
+ pheno_name2id=pheno_name2id,
+ samples=samples,
+ control_data=_control_data,
+ filesdir=_outdir)
+ logger.info("Saved %s new phenotype standard error rows.", _num_se_rows)
+
+ if _control_data.get("phenonum"):
+ logger.info("Saving new phenotypes sample counts.")
+ _num_n_rows = save_phenotypes_n(conn=conn,
+ dataidmap=dataidmap,
+ pheno_name2id=pheno_name2id,
+ samples=samples,
+ control_data=_control_data,
+ filesdir=_outdir)
+ logger.info("Saved %s new phenotype sample counts rows.", _num_n_rows)
+
+ return (_species, _population, _dataset, _xrefs)
+
+
+if __name__ == "__main__":
+ def parse_args():
+ """Setup command-line arguments."""
+ parser = argparse.ArgumentParser(
+ prog="load_phenotypes_to_db",
+ description="Process the phenotypes' data and load it into the database.")
+ parser.add_argument("db_uri", type=str, help="MariaDB/MySQL connection URL")
+ parser.add_argument(
+ "jobs_db_path", type=Path, help="Path to jobs' SQLite database.")
+ parser.add_argument("job_id", type=uuid.UUID, help="ID of the running job")
+ parser.add_argument(
+ "--log-level",
+ type=str,
+        help="The severity of events to log.",
+ choices=("debug", "info", "warning", "error", "critical"),
+ default="info")
+ return parser.parse_args()
+
+ def setup_logging(log_level: str):
+ """Setup logging for the script."""
+ logger.setLevel(log_level)
+ logging.getLogger("uploader.phenotypes.models").setLevel(log_level)
+
+
+ def main():
+ """Entry-point for this script."""
+ args = parse_args()
+ setup_logging(args.log_level.upper())
+
+ with (mysqldb.database_connection(args.db_uri) as conn,
+ conn.cursor(cursorclass=DictCursor) as cursor,
+ sqlite3.connection(args.jobs_db_path) as jobs_conn):
+ job = jobs.job(jobs_conn, args.job_id)
+
+            # Lock PublishXRef/PublishData/PublishSE/NStrain here. Why?
+            # The `DataId` values are sequential, but not auto-increment, and
+            # `PublishXRef`.`DataId` cannot be converted to AUTO_INCREMENT.
+ # `SELECT MAX(DataId) FROM PublishXRef;`
+ # How do you check for a table lock?
+ # https://oracle-base.com/articles/mysql/mysql-identify-locked-tables
+ # `SHOW OPEN TABLES LIKE 'Publish%';`
+ _db_tables_ = (
+ "Species",
+ "InbredSet",
+ "Strain",
+ "StrainXRef",
+ "Publication",
+ "Phenotype",
+ "PublishXRef",
+ "PublishFreeze",
+ "PublishData",
+ "PublishSE",
+ "NStrain")
+
+ logger.debug(
+ ("Locking database tables for the connection:" +
+ "".join("\n\t- %s" for _ in _db_tables_) + "\n"),
+ *_db_tables_)
+ cursor.execute(# Lock the tables to avoid race conditions
+ "LOCK TABLES " + ", ".join(
+ f"{_table} WRITE" for _table in _db_tables_))
+
+ db_results = load_data(conn, job)
+ jobs.update_metadata(
+ jobs_conn,
+ args.job_id,
+ "xref_ids",
+ json.dumps([xref["xref_id"] for xref in db_results[3]]))
+
+ logger.info("Unlocking all database tables.")
+ cursor.execute("UNLOCK TABLES")
+
+ # Update authorisations (break this down) — maybe loop until it works?
+ logger.info("Updating authorisation.")
+ _job_metadata = job["metadata"]
+ return update_auth(_job_metadata["authserver"],
+ _job_metadata["token"],
+ *db_results)
+
+
+ try:
+ sys.exit(main())
+ except Exception as _exc:# pylint: disable=[broad-exception-caught]
+ logger.debug("Data loading failed… Halting!",
+ exc_info=True)
+ sys.exit(1)
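
A note on the new script's `main()`: the tables are locked before `load_data()` and unlocked afterwards with two separate `cursor.execute()` calls. If `load_data()` raises, the `UNLOCK TABLES` statement is skipped (MariaDB/MySQL does release table locks when the connection closes, so this is not fatal). A small context manager would make the unlock explicit; this is a sketch that assumes `cursor` is an ordinary MySQLdb cursor, not something provided by the patch:

    from contextlib import contextmanager

    @contextmanager
    def write_locks(cursor, *tables):
        """Hold WRITE locks on `tables` for the duration of the block.

        Hypothetical helper, not part of load_phenotypes_to_db.py.
        """
        cursor.execute(
            "LOCK TABLES " + ", ".join(f"{table} WRITE" for table in tables))
        try:
            yield cursor
        finally:
            # Runs even if the body raises, mirroring the script's
            # explicit "UNLOCK TABLES" call.
            cursor.execute("UNLOCK TABLES")

    # Usage sketch:
    # with write_locks(cursor, "PublishXRef", "PublishData", "PublishSE", "NStrain"):
    #     db_results = load_data(conn, job)
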
diff --git a/scripts/process_rqtl2_bundle.py b/scripts/process_rqtl2_bundle.py
index 8b7a0fb..e2ce420 100644
--- a/scripts/process_rqtl2_bundle.py
+++ b/scripts/process_rqtl2_bundle.py
@@ -104,7 +104,7 @@ def process_bundle(dbconn: mdb.Connection,
rqtl2bundle=Path(meta["rqtl2-bundle-file"])),
logger)
if genoexit != 0:
- raise Exception("Processing 'geno' file failed.")
+ raise Exception("Processing 'geno' file failed.")# pylint: disable=[broad-exception-raised]
logger.debug(
"geno file processing completed successfully. (ExitCode: %s)",
genoexit)
@@ -122,7 +122,7 @@ def process_bundle(dbconn: mdb.Connection,
rqtl2bundle=Path(meta["rqtl2-bundle-file"])),
logger)
if phenoexit != 0:
- raise Exception("Processing 'pheno' file failed.")
+ raise Exception("Processing 'pheno' file failed.")# pylint: disable=[broad-exception-raised]
logger.debug(
"pheno file processing completed successfully. (ExitCode: %s)",
phenoexit)
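
Both hunks above keep the broad `raise Exception(...)` and silence `broad-exception-raised`. A small dedicated exception type would satisfy the checker without the disable comment; a sketch of that alternative, assuming nothing upstream relies on catching `Exception` by that exact type:

    # Hypothetical alternative, not part of this patch.
    class BundleProcessingError(Exception):
        """Raised when part of an R/qtl2 bundle fails to process."""

    def assert_processed(exitcode: int, filetype: str) -> None:
        """Raise if processing the given file type reported a failure."""
        if exitcode != 0:
            raise BundleProcessingError(f"Processing '{filetype}' file failed.")
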
diff --git a/scripts/qc_on_rqtl2_bundle.py b/scripts/qc_on_rqtl2_bundle.py
index 9f9248c..0207938 100644
--- a/scripts/qc_on_rqtl2_bundle.py
+++ b/scripts/qc_on_rqtl2_bundle.py
@@ -191,7 +191,7 @@ def check_pheno_samples(
return allerrors
-def qc_pheno_errors(# pylint: disable=[too-many-arguments]
+def qc_pheno_errors(# pylint: disable=[too-many-arguments, too-many-positional-arguments]
rconn, fqjobid, dburi, speciesid, zfile, logger) -> bool:
"""Check for errors in `pheno` file(s)."""
cdata = rqtl2.control_data(zfile)
@@ -260,7 +260,7 @@ def run_qc(rconn: Redis,
if qc_missing_files(rconn, fqjobid, zfile, logger):
return 1
- def with_zipfile(# pylint: disable=[too-many-arguments]
+ def with_zipfile(# pylint: disable=[too-many-arguments,too-many-positional-arguments]
rconn, fqjobid, dbconn, speciesid, filename, logger, func
):
with ZipFile(filename, "r") as zfile:
diff --git a/scripts/redis_logger.py b/scripts/redis_logger.py
index d3fde5f..a74e5e4 100644
--- a/scripts/redis_logger.py
+++ b/scripts/redis_logger.py
@@ -6,7 +6,7 @@ from redis import Redis
class RedisLogger(logging.Handler):
"""Log out to redis for our worker scripts"""
- def __init__(self,#pylint: disable=[too-many-arguments]
+ def __init__(self,#pylint: disable=[too-many-arguments, too-many-positional-arguments]
rconn: Redis,
fullyqualifiedjobid: str,
messageslistname: str,
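
This hunk, like several others in the patch (insert_data.py, insert_samples.py, qc_on_rqtl2_bundle.py, phenotypes_qc.py), only extends the disable list with `too-many-positional-arguments`. Where the call sites can be adjusted, declaring the trailing parameters keyword-only silences the same check without a comment. A minimal sketch of that pattern follows; it is an illustration, not code from this repository:

    # Hypothetical illustration: everything after the bare `*` must be passed
    # by keyword, so pylint no longer counts it as a positional argument.
    def save_rows(filepath: str, speciesid: int, *, platformid: int,
                  datasetid: int, dbconn=None, rconn=None) -> int:
        """Insert rows read from `filepath` (stub)."""
        return 0

    # Call sites then name the parameters explicitly:
    save_rows("data.tsv", 1, platformid=2, datasetid=3)
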
diff --git a/scripts/rqtl2/entry.py b/scripts/rqtl2/entry.py
index 327ed2c..e0e00e7 100644
--- a/scripts/rqtl2/entry.py
+++ b/scripts/rqtl2/entry.py
@@ -20,27 +20,23 @@ def build_main(
[Redis, Connection, str, Namespace, logging.Logger],
int
],
- loggername: str
+ logger: logging.Logger
) -> Callable[[],int]:
"""Build a function to be used as an entry-point for scripts."""
def main():
- try:
- logging.basicConfig(
- format=(
- "%(asctime)s - %(levelname)s %(name)s: "
- "(%(pathname)s: %(lineno)d) %(message)s"),
- level=args.loglevel)
- logger = logging.getLogger(loggername)
- with (Redis.from_url(args.redisuri, decode_responses=True) as rconn,
- database_connection(args.databaseuri) as dbconn):
- fqjobid = jobs.job_key(args.redisprefix, args.jobid)
+ with (Redis.from_url(args.redisuri, decode_responses=True) as rconn,
+ database_connection(args.databaseuri) as dbconn):
+ logger.setLevel(args.loglevel.upper())
+ fqjobid = jobs.job_key(args.redisprefix, args.jobid)
+
+ try:
rconn.hset(fqjobid, "status", "started")
logger.addHandler(setup_redis_logger(
rconn,
fqjobid,
f"{fqjobid}:log-messages",
args.redisexpiry))
- logger.addHandler(StreamHandler(stream=sys.stdout))
+ logger.addHandler(StreamHandler(stream=sys.stderr))
check_db(args.databaseuri)
check_redis(args.redisuri)
@@ -48,15 +44,15 @@ def build_main(
logger.error("File not found: '%s'.", args.rqtl2bundle)
return 2
- returncode = run_fn(rconn, dbconn, fqjobid, args, logger)
+ returncode = run_fn(rconn, dbconn, fqjobid, args)
if returncode == 0:
rconn.hset(fqjobid, "status", "completed:success")
return returncode
rconn.hset(fqjobid, "status", "completed:error")
return returncode
- except Exception as _exc:# pylint: disable=[broad-except]
- logger.error("The process failed!", exc_info=True)
- rconn.hset(fqjobid, "status", "completed:error")
- return 4
+ except Exception as _exc:# pylint: disable=[broad-except]
+ logger.error("The process failed!", exc_info=True)
+ rconn.hset(fqjobid, "status", "completed:error")
+ return 4
return main
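
One loose end in the hunk above: `run_fn` is now called without the logger argument, `run_fn(rconn, dbconn, fqjobid, args)`, while the `Callable` annotation in the unchanged context lines still lists `logging.Logger`. The sketch below shows the annotation that would match the new call; it is an assumption about the intent, not part of the patch:

    from argparse import Namespace
    from typing import Callable

    from MySQLdb.connections import Connection
    from redis import Redis

    # Hypothetical follow-up matching the new call signature
    # run_fn(rconn, dbconn, fqjobid, args):
    RunFn = Callable[[Redis, Connection, str, Namespace], int]
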
diff --git a/scripts/rqtl2/phenotypes_qc.py b/scripts/rqtl2/phenotypes_qc.py
index 76ecb8d..9f11f57 100644
--- a/scripts/rqtl2/phenotypes_qc.py
+++ b/scripts/rqtl2/phenotypes_qc.py
@@ -36,8 +36,15 @@ from scripts.cli_parser import init_cli_parser, add_global_data_arguments
from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter
__MODULE__ = "scripts.rqtl2.phenotypes_qc"
+logging.basicConfig(
+ format=("%(asctime)s - %(levelname)s %(name)s: "
+ "(%(pathname)s: %(lineno)d) %(message)s"))
+logger = logging.getLogger(__MODULE__)
-def validate(phenobundle: Path, logger: Logger) -> dict:
+def validate(
+ phenobundle: Path,
+ logger: Logger# pylint: disable=[redefined-outer-name]
+) -> dict:
"""Check that the bundle is generally valid"""
try:
rqc.validate_bundle(phenobundle)
@@ -59,7 +66,7 @@ def validate(phenobundle: Path, logger: Logger) -> dict:
def check_for_mandatory_pheno_keys(
phenobundle: Path,
- logger: Logger,
+ logger: Logger,# pylint: disable=[redefined-outer-name]
**kwargs
) -> dict:
"""Check that the mandatory keys exist for phenotypes."""
@@ -86,7 +93,7 @@ def check_for_mandatory_pheno_keys(
def check_for_averages_files(
phenobundle: Path,
- logger: Logger,
+ logger: Logger,# pylint: disable=[redefined-outer-name]
**kwargs
) -> dict:
"""Check that averages files appear together"""
@@ -140,15 +147,15 @@ def redis_logger(
) -> Iterator[logging.Logger]:
"""Build a Redis message-list logger."""
rconn = Redis.from_url(redisuri, decode_responses=True)
- logger = logging.getLogger(loggername)
- logger.propagate = False
+ _logger = logging.getLogger(loggername)
+ _logger.propagate = False
handler = RedisMessageListHandler(
rconn,
fullyqualifiedkey(fqkey, filename))#type: ignore[arg-type]
handler.setFormatter(logging.getLogger().handlers[0].formatter)
- logger.addHandler(handler)
+ _logger.addHandler(handler)
try:
- yield logger
+ yield _logger
finally:
rconn.close()
@@ -175,9 +182,9 @@ def qc_phenocovar_file(
redisuri,
f"{__MODULE__}.qc_phenocovar_file",
filepath.name,
- f"{fqkey}:logs") as logger,
+ f"{fqkey}:logs") as _logger,
Redis.from_url(redisuri, decode_responses=True) as rconn):
- logger.info("Running QC on file: %s", filepath.name)
+ print("Running QC on file: ", filepath.name)
_csvfile = rqtl2.read_csv_file(filepath, separator, comment_char)
_headings = tuple(heading.lower() for heading in next(_csvfile))
_errors: tuple[InvalidValue, ...] = tuple()
@@ -195,7 +202,7 @@ def qc_phenocovar_file(
def collect_errors(errors_and_linecount, line):
_errs, _lc = errors_and_linecount
- logger.info("Testing record '%s'", line[0])
+ _logger.info("Testing record '%s'", line[0])
if len(line) != len(_headings):
_errs = _errs + (save_error(InvalidValue(
filepath.name,
@@ -205,12 +212,12 @@ def qc_phenocovar_file(
(f"Record {_lc} in file {filepath.name} has a different "
"number of columns than the number of headings"))),)
_line = dict(zip(_headings, line))
- if not bool(_line["description"]):
+ if not bool(_line.get("description")):
_errs = _errs + (
save_error(InvalidValue(filepath.name,
_line[_headings[0]],
"description",
- _line["description"],
+ _line.get("description"),
"The description is not provided!")),)
rconn.hset(file_fqkey(fqkey, "metadata", filepath),
@@ -236,7 +243,7 @@ def merge_dicts(*dicts):
return reduce(lambda merged, dct: {**merged, **dct}, dicts, {})
-def decimal_points_error(# pylint: disable=[too-many-arguments]
+def decimal_points_error(# pylint: disable=[too-many-arguments,too-many-positional-arguments]
filename: str,
rowtitle: str,
coltitle: str,
@@ -267,7 +274,7 @@ def integer_error(
return InvalidValue(filename, rowtitle, coltitle, cellvalue, message)
-def qc_pheno_file(# pylint: disable=[too-many-locals, too-many-arguments]
+def qc_pheno_file(# pylint: disable=[too-many-locals, too-many-arguments, too-many-positional-arguments]
filepath: Path,
redisuri: str,
fqkey: str,
@@ -283,9 +290,9 @@ def qc_pheno_file(# pylint: disable=[too-many-locals, too-many-arguments]
redisuri,
f"{__MODULE__}.qc_pheno_file",
filepath.name,
- f"{fqkey}:logs") as logger,
+ f"{fqkey}:logs") as _logger,
Redis.from_url(redisuri, decode_responses=True) as rconn):
- logger.info("Running QC on file: %s", filepath.name)
+ print("Running QC on file: ", filepath.name)
save_error = partial(
push_error, rconn, file_fqkey(fqkey, "errors", filepath))
_csvfile = rqtl2.read_csv_file(filepath, separator, comment_char)
@@ -310,7 +317,7 @@ def qc_pheno_file(# pylint: disable=[too-many-locals, too-many-arguments]
def collect_errors(errors_and_linecount, line):
_errs, _lc = errors_and_linecount
- logger.debug("Checking row %s", line[0])
+ _logger.debug("Checking row %s", line[0])
if line[0] not in samples:
_errs = _errs + (save_error(InvalidValue(
filepath.name,
@@ -369,11 +376,10 @@ def run_qc(# pylint: disable=[too-many-locals]
rconn: Redis,
dbconn: mdb.Connection,
fullyqualifiedjobid: str,
- args: Namespace,
- logger: Logger
+ args: Namespace
) -> int:
"""Run quality control checks on the bundle."""
- logger.debug("Beginning the quality assurance checks.")
+ print("Beginning the quality assurance checks.")
results = check_for_averages_files(
**check_for_mandatory_pheno_keys(
**validate(args.rqtl2bundle, logger)))
@@ -398,7 +404,7 @@ def run_qc(# pylint: disable=[too-many-locals]
for ftype in ("pheno", "phenocovar", "phenose", "phenonum")))
# - Fetch samples/individuals from database.
- logger.debug("Fetching samples/individuals from the database.")
+ print("Fetching samples/individuals from the database.")
samples = tuple(#type: ignore[var-annotated]
item for item in set(reduce(
lambda acc, item: acc + (
@@ -415,7 +421,7 @@ def run_qc(# pylint: disable=[too-many-locals]
json.dumps(tuple(f"{fullyqualifiedjobid}:phenocovar:{_file}"
for _file in cdata.get("phenocovar", []))))
with mproc.Pool(mproc.cpu_count() - 1) as pool:
- logger.debug("Check for errors in 'phenocovar' file(s).")
+ print("Check for errors in 'phenocovar' file(s).")
_phenocovar_qc_res = merge_dicts(*pool.starmap(qc_phenocovar_file, tuple(
(extractiondir.joinpath(_file),
args.redisuri,
@@ -437,7 +443,7 @@ def run_qc(# pylint: disable=[too-many-locals]
"Expected a non-negative number with at least one decimal "
"place."))
- logger.debug("Check for errors in 'pheno' file(s).")
+ print("Check for errors in 'pheno' file(s).")
_pheno_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
extractiondir.joinpath(_file),
args.redisuri,
@@ -456,7 +462,7 @@ def run_qc(# pylint: disable=[too-many-locals]
# - Check the 3 checks above for phenose and phenonum values too
# qc_phenose_files(…)
# qc_phenonum_files(…)
- logger.debug("Check for errors in 'phenose' file(s).")
+ print("Check for errors in 'phenose' file(s).")
_phenose_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
extractiondir.joinpath(_file),
args.redisuri,
@@ -472,7 +478,7 @@ def run_qc(# pylint: disable=[too-many-locals]
dec_err_fn
) for _file in cdata.get("phenose", []))))
- logger.debug("Check for errors in 'phenonum' file(s).")
+ print("Check for errors in 'phenonum' file(s).")
_phenonum_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
extractiondir.joinpath(_file),
args.redisuri,
@@ -509,5 +515,5 @@ if __name__ == "__main__":
type=Path)
return parser.parse_args()
- main = build_main(cli_args(), run_qc, __MODULE__)
+ main = build_main(cli_args(), run_qc, logger)
sys.exit(main())
diff --git a/scripts/worker.py b/scripts/worker.py
index 91b0332..3165fe7 100644
--- a/scripts/worker.py
+++ b/scripts/worker.py
@@ -79,7 +79,7 @@ def main():
fqjobid = jobs.job_key(args.redisprefix, args.jobid)
rconn.hset(fqjobid, "stderr", f"No such job. '{args.job_id}'.")
rconn.expire(name=jobs.job_key(args.redisprefix, args.job_id),
- time=timedelta(seconds=(2 * 60 * 60)))
+ time=timedelta(seconds=2 * 60 * 60))
print(f"No such job. '{args.job_id}'.", file=sys.stderr)
return 2
return 3