"""Insert samples into the database."""
import sys
import logging
import pathlib
import argparse

import MySQLdb as mdb
from redis import Redis
from gn_libs.mysqldb import database_connection

from uploader.check_connections import check_db, check_redis
from uploader.species.models import species_by_id
from uploader.population.models import population_by_id
from uploader.samples.models import (
    save_samples_data,
    read_samples_file,
    cross_reference_samples)

# Send every log record (INFO and above) from the root logger to stderr.
stderr_handler = logging.StreamHandler(stream=sys.stderr)
root_logger = logging.getLogger()
root_logger.addHandler(stderr_handler)
root_logger.setLevel("INFO")


class SeparatorAction(argparse.Action):
    """Action to handle the separator values.

    Stores the value given on the command line, except that the
    two-character text ``\\t`` (a backslash followed by ``t``) is replaced
    with a real TAB character.
    """

    def __init__(self, option_strings, dest, nargs=None, **kwargs):
        """Init the action.

        Raises ``ValueError`` if ``nargs`` is provided: this action only
        handles a single value.
        """
        if nargs is not None:
            raise ValueError("nargs not allowed.")
        super().__init__(option_strings, dest, nargs, **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        """Process the value passed in and store it on the namespace."""
        # Shells pass `\t` through as two literal characters; convert that
        # spelling into an actual TAB (chr(9)).
        setattr(namespace, self.dest, (chr(9) if values == "\\t" else values))


def insert_samples(conn: mdb.Connection,# pylint: disable=[too-many-arguments]
                   rconn: Redis,# pylint: disable=[unused-argument]
                   speciesid: int,
                   populationid: int,
                   samplesfile: pathlib.Path,
                   separator: str,
                   firstlineheading: bool,
                   quotechar: str):
    """Insert the samples into the database.

    Verifies that the species and the population exist, saves the samples
    read from `samplesfile`, then cross-references the sample names (the
    "Name" field of each row) with the population.

    Parameters:
        conn: Connection to the database.
        rconn: Connection to redis. Currently unused.
        speciesid: Identifier of the species the samples belong to.
        populationid: Identifier of the population the samples belong to.
        samplesfile: Path to the file containing the samples.
        separator: The character separating fields in the file.
        firstlineheading: Whether the first line of the file is a header row.
        quotechar: The character surrounding the value in each field.

    Returns:
        0 on success; 1 if either the species or the population lookup
        comes back empty.
    """
    print("Checking for errors:")
    species = species_by_id(conn, speciesid)
    if not species:
        logging.error("Species with id '%s' does not exist.", speciesid)
        return 1
    print(f"\tSpecies with ID '{speciesid}' found")

    population = population_by_id(conn, populationid)
    if not population:
        logging.error("Population with id '%s' does not exist.", populationid)
        return 1
    print(f"\tPopulations with ID '{populationid}' found")
    print("No errors found. Continuing...")

    print("\nInserting samples ...")
    # The file is read once per consumer. Both reads must use the same
    # parsing options (including `quotechar`), otherwise the saved samples
    # and the cross-referenced names could be parsed differently.
    save_samples_data(
        conn,
        speciesid,
        read_samples_file(samplesfile,
                          separator,
                          firstlineheading,
                          quotechar=quotechar))

    print("Cross-referencing samples with their populations.")
    cross_reference_samples(
        conn,
        speciesid,
        populationid,
        (row["Name"] for row in
         read_samples_file(samplesfile,
                           separator,
                           firstlineheading,
                           quotechar=quotechar)))
    print("Samples upload successfully completed.")
    return 0


if __name__ == "__main__":

    def cli_args():
        """Process the command-line arguments and return the namespace."""
        parser = argparse.ArgumentParser(
            prog="insert_samples",
            description = (
                "Script to parse and insert sample data from a file into the "
                "database."))

        # == Mandatory Arguments ==
        parser.add_argument(
            "databaseuri",
            help="URL to be used to initialise the connection to the database")
        parser.add_argument("speciesid",
                            type=int,
                            help="The species identifier in the database.")
        parser.add_argument(
            "populationid",
            type=int,
            help="The grouping/population identifier in the database.")
        parser.add_argument(
            "samplesfile",
            type=pathlib.Path,
            help="Path to the CSV file containing the samples data.")
        # NOTE: `separator` is a required positional (no `nargs`), so
        # argparse never falls back to the `default` given here.
        parser.add_argument(
            "separator",
            action=SeparatorAction,
            help="The 'character' in the CSV file that separates the fields.",
            default=chr(9))

        # == Optional Arguments ==
        parser.add_argument(
            "--firstlineheading",
            action="store_true",
            help=("If the first line of the file is a header row, invoke the "
                  "program with this flag."))
        parser.add_argument(
            "--quotechar",
            default='"',
            help=("The character used to delimit (surround?) the value in "
                  "each column."))

        # == Script-specific extras ==
        parser.add_argument("--redisuri",
                            help="URL to initialise connection to redis",
                            default="redis:///")

        args = parser.parse_args()
        return args

    def main():
        """Run script to insert samples into the database.

        Returns the process exit status: 2 if the samples file does not
        exist, otherwise whatever `insert_samples` returns.
        """
        args = cli_args()
        check_db(args.databaseuri)
        check_redis(args.redisuri)
        if not args.samplesfile.exists():
            logging.error("File not found: '%s'.", args.samplesfile)
            return 2

        with (Redis.from_url(args.redisuri, decode_responses=True) as rconn,
              database_connection(args.databaseuri) as dbconn):
            return insert_samples(dbconn,
                                  rconn,
                                  args.speciesid,
                                  args.populationid,
                                  args.samplesfile,
                                  args.separator,
                                  args.firstlineheading,
                                  args.quotechar)

    sys.exit(main())