"""Insert samples into the database."""
import sys
import logging
import pathlib
import argparse
import traceback
import MySQLdb as mdb
from redis import Redis
from gn_libs.mysqldb import database_connection
from uploader.check_connections import check_db, check_redis
from uploader.species.models import species_by_id
from uploader.population.models import population_by_id
from uploader.samples.models import (
save_samples_data,
read_samples_file,
cross_reference_samples)
# Configure the root logger: INFO threshold, with a handler that writes to
# the standard error stream.
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
stderr_handler = logging.StreamHandler(stream=sys.stderr)
root_logger.addHandler(stderr_handler)
class SeparatorAction(argparse.Action):
    """Argparse action that stores the field-separator argument.

    The two-character sequence backslash + ``t`` (as typed on a command line)
    is translated into a real TAB character; any other value is stored
    unchanged.
    """

    def __init__(self, option_strings, dest, nargs=None, **kwargs):
        """Set up the action, refusing any explicit ``nargs`` value.

        Raises:
            ValueError: if ``nargs`` is anything other than ``None``.
        """
        if nargs is not None:
            raise ValueError("nargs not allowed.")
        super().__init__(option_strings, dest, nargs=nargs, **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        """Store the (possibly translated) separator on ``namespace``."""
        separator = values
        if values == "\\t":
            # A literal backslash-t was given: use an actual TAB instead.
            separator = "\t"
        setattr(namespace, self.dest, separator)
def insert_samples(conn: mdb.Connection,# pylint: disable=[too-many-arguments, too-many-positional-arguments]
                   rconn: Redis,# pylint: disable=[unused-argument]
                   speciesid: int,
                   populationid: int,
                   samplesfile: pathlib.Path,
                   separator: str,
                   firstlineheading: bool,
                   quotechar: str) -> int:
    """Insert the samples into the database.

    The species and population are looked up first; if either lookup comes
    back empty, an error is logged and nothing is written. Otherwise the
    samples file is read twice: once to save the sample rows, and once more
    to cross-reference each row's "Name" value with the population.

    Args:
        conn: Open database connection handed to every model helper.
        rconn: Redis connection; accepted but not used by this function.
        speciesid: Identifier of the species the samples belong to.
        populationid: Identifier of the population to cross-reference with.
        samplesfile: Path to the delimited file holding the samples.
        separator: Field separator, forwarded to ``read_samples_file``.
        firstlineheading: Whether the first line of the file is a header
            row, forwarded to ``read_samples_file``.
        quotechar: Quoting character, forwarded to ``read_samples_file`` as
            the ``quotechar`` keyword for BOTH reads of the file.

    Returns:
        0 when the upload completes, 1 when the species or the population
        could not be found.
    """
    print("Checking for errors:")
    species = species_by_id(conn, speciesid)
    if not species:
        logging.error("Species with id '%s' does not exist.", str(speciesid))
        return 1
    print(f"\tSpecies with ID '{speciesid}' found")

    population = population_by_id(conn, populationid)
    if not population:
        logging.error("Population with id '%s' does not exist.",
                      str(populationid))
        return 1
    print(f"\tPopulations with ID '{populationid}' found")
    print("No errors found. Continuing...")

    print("\nInserting samples ...")
    # Use the caller's quote character here too, so that the rows saved in
    # this pass are parsed exactly like the names cross-referenced below.
    save_samples_data(
        conn,
        speciesid,
        read_samples_file(samplesfile,
                          separator,
                          firstlineheading,
                          quotechar=quotechar))

    print("Cross-referencing samples with their populations.")
    # The file is read a second time; only the "Name" field of each row is
    # passed on for cross-referencing.
    cross_reference_samples(
        conn,
        speciesid,
        populationid,
        (row["Name"] for row in
         read_samples_file(samplesfile,
                           separator,
                           firstlineheading,
                           quotechar=quotechar)))
    print("Samples upload successfully completed.")
    return 0
if __name__ == "__main__":

    def cli_args():
        """Build the command-line parser and return the parsed arguments."""
        parser = argparse.ArgumentParser(
            prog="insert_samples",
            description=(
                "Script to parse and insert sample data from a file into the "
                "database."))

        # -- Positional (mandatory) arguments, in command-line order --
        parser.add_argument(
            "databaseuri",
            help="URL to be used to initialise the connection to the database")
        parser.add_argument(
            "speciesid",
            type=int,
            help="The species identifier in the database.")
        parser.add_argument(
            "populationid",
            type=int,
            help="The grouping/population identifier in the database.")
        parser.add_argument(
            "samplesfile",
            type=pathlib.Path,
            help="Path to the CSV file containing the samples data.")
        # NOTE(review): this is a positional argument without `nargs`, so the
        # `default` below is probably never used -- confirm before relying
        # on it.
        parser.add_argument(
            "separator",
            action=SeparatorAction,
            help="The 'character' in the CSV file that separates the fields.",
            default=chr(9))

        # -- Optional flags --
        parser.add_argument(
            "--firstlineheading",
            action="store_true",
            help=("If the first line of the file is a header row, invoke the "
                  "program with this flag."))
        parser.add_argument(
            "--quotechar",
            default='"',
            help=("The character used to delimit (surround?) the value in "
                  "each column."))

        # -- Extras specific to this script --
        parser.add_argument(
            "--redisuri",
            help="URL to initialise connection to redis",
            default="redis:///")

        return parser.parse_args()

    def main():
        """Run script to insert samples into the database.

        Returns the process exit status: 2 when the samples file does not
        exist, 1 when `insert_samples` raises an exception, otherwise
        whatever `insert_samples` itself returns.
        """
        args = cli_args()

        # The connection checks run before anything else; what they do on
        # failure is defined in `uploader.check_connections`.
        check_db(args.databaseuri)
        check_redis(args.redisuri)

        if not args.samplesfile.exists():
            logging.error("File not found: '%s'.", args.samplesfile)
            return 2

        with (Redis.from_url(args.redisuri, decode_responses=True) as rconn,
              database_connection(args.databaseuri) as dbconn):
            try:
                return insert_samples(dbconn,
                                      rconn,
                                      args.speciesid,
                                      args.populationid,
                                      args.samplesfile,
                                      args.separator,
                                      args.firstlineheading,
                                      args.quotechar)
            except Exception:# pylint: disable=[broad-exception-caught]
                # Top-level boundary: report the traceback on stderr and
                # signal failure through the exit status.
                print(traceback.format_exc(), file=sys.stderr)
                return 1

    sys.exit(main())