aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--quality_control/file_utils.py13
-rw-r--r--quality_control/parsing.py11
-rw-r--r--scripts/insert_data.py146
3 files changed, 161 insertions, 9 deletions
diff --git a/quality_control/file_utils.py b/quality_control/file_utils.py
new file mode 100644
index 0000000..fdce1e1
--- /dev/null
+++ b/quality_control/file_utils.py
@@ -0,0 +1,13 @@
+"Common file utilities"
+from typing import Union
+from pathlib import Path
+from io import TextIOWrapper
+from zipfile import ZipFile, is_zipfile
+
def open_file(filepath: Union[str, Path]) -> Union[ZipFile, TextIOWrapper]:
    """Transparently open both TSV and ZIP files.

    Plain files are opened as UTF-8 text; for a ZIP archive, a readable
    handle on its first member is returned.

    :param filepath: path to a TSV file or a ZIP archive
    :returns: a file-like object ready for reading
    """
    if not is_zipfile(filepath):
        return open(filepath, encoding="utf-8")

    # Keep the archive open: the member handle returned by ZipFile.open()
    # reads lazily from the archive, so closing the ZipFile here (as a
    # `with` block would) makes the returned handle unreadable.
    # NOTE(review): closing the member handle does not close the archive
    # itself — acceptable for a short-lived script, but a small leak.
    zfile = ZipFile(filepath, "r")
    return zfile.open(zfile.infolist()[0], "r")
diff --git a/quality_control/parsing.py b/quality_control/parsing.py
index 28a311e..5fc5f62 100644
--- a/quality_control/parsing.py
+++ b/quality_control/parsing.py
@@ -2,10 +2,10 @@
import collections
from enum import Enum
from functools import partial
-from zipfile import ZipFile, is_zipfile
from typing import Tuple, Union, Iterable, Generator, Callable, Optional
import quality_control.average as avg
+from quality_control.file_utils import open_file
import quality_control.standard_error as se
from quality_control.errors import (
InvalidValue, DuplicateHeading, InconsistentColumns)
@@ -92,14 +92,7 @@ def collect_errors(
return errors + tuple(error for error in errs if error is not None)
return errors + (errs,)
- def __open_file__(filepath):
- if not is_zipfile(filepath):
- return open(filepath, encoding="utf-8")
-
- with ZipFile(filepath, "r") as zfile:
- return zfile.open(zfile.infolist()[0], "r")
-
- with __open_file__(filepath) as input_file:
+ with open_file(filepath) as input_file:
for line_number, line in enumerate(input_file, start=1):
if user_aborted():
break
diff --git a/scripts/insert_data.py b/scripts/insert_data.py
new file mode 100644
index 0000000..5e596ff
--- /dev/null
+++ b/scripts/insert_data.py
@@ -0,0 +1,146 @@
+"""Insert means/averages or standard-error data into the database."""
+import sys
+import argparse
+from typing import Tuple
+
+import MySQLdb as mdb
+from redis import Redis
+from MySQLdb.cursors import DictCursor
+
+from quality_control.parsing import take
+from qc_app.db_utils import database_connection
+from quality_control.file_utils import open_file
+from qc_app.check_connections import check_db, check_redis
+
def translate_alias(heading):
    "Map a strain-name shorthand (e.g. 'B6') to its canonical name."
    if heading == "B6":
        return "C57BL/6J"
    if heading == "D2":
        return "DBA/2J"
    # Anything without a known alias passes through unchanged.
    return heading
+
def read_file_headings(filepath):
    """Return the first line of the file as a tuple of headings.

    Strain-name aliases in the headings are translated to their
    canonical names. Returns None for an empty file.
    """
    with open_file(filepath) as input_file:
        for line in input_file:
            # Only the first line carries the headings.
            return tuple(
                translate_alias(field.strip())
                for field in line.split("\t"))
+
def read_file_contents(filepath):
    """Yield each data row of the file as a tuple of stripped fields.

    The first (headings) line is skipped; every later line is split on
    tabs and each field stripped of surrounding whitespace.
    """
    with open_file(filepath) as input_file:
        for line_number, line_contents in enumerate(input_file):
            if line_number == 0:
                # Skip the headings line; data starts at line 1. (The
                # original's extra `if line_number > 0` check after this
                # `continue` was redundant and has been dropped.)
                continue
            yield tuple(
                field.strip() for field in line_contents.split("\t"))
+
def strains_info(
        dbconn: "mdb.Connection", strain_names: Tuple[str, ...]) -> dict:
    """Retrieve the `Strain` rows for the given strain names.

    :param dbconn: open database connection
    :param strain_names: names to look up
    :returns: dict mapping each strain's Name to its full row
    """
    # Guard against an empty name list: `IN ()` is malformed SQL and the
    # correct answer is simply "no strains".
    if not strain_names:
        return {}
    with dbconn.cursor(cursorclass=DictCursor) as cursor:
        # One %s placeholder per name keeps the query fully parameterised.
        query = (
            "SELECT * FROM Strain WHERE Name IN "
            f"({', '.join(['%s'] * len(strain_names))})")
        cursor.execute(query, tuple(strain_names))
        return {strain["Name"]: strain for strain in cursor.fetchall()}
+
def read_means(filepath, headings, strain_info):
    """Yield one ProbeSetData record per (probeset, strain) mean value."""
    # Every column after the first is a strain name.
    strain_names = headings[1:]
    for fields in read_file_contents(filepath):
        row = dict(zip(headings, fields))
        probeset_id = int(row["ProbeSetID"])
        for strain_name in strain_names:
            yield {
                "ProbeSetId": probeset_id,
                "StrainId": strain_info[strain_name]["Id"],
                "ProbeSetDataValue": float(row[strain_name])
            }
+
def last_data_id(dbconn: "mdb.Connection") -> int:
    """Get the highest `ProbeSetData` Id currently in the database.

    Returns 0 when the table is empty — `MAX(Id)` yields NULL then, and
    the previous `int(None)` call would have raised a TypeError.
    """
    with dbconn.cursor() as cursor:
        cursor.execute("SELECT MAX(Id) FROM ProbeSetData")
        max_id = cursor.fetchone()[0]
        return int(max_id) if max_id is not None else 0
+
def insert_means(
        filepath: str, dataset_id: int, dbconn: mdb.Connection,
        rconn: Redis) -> int:
    """Insert the means/averages data into the database.

    :param filepath: path to the (TSV or ZIP) averages file
    :param dataset_id: ProbeSetFreeze identifier the data belongs to
    :param dbconn: open database connection
    :param rconn: redis connection (accepted but not used in this function)
    :returns: 0 on completion
    """
    print("INSERTING MEANS/AVERAGES DATA.")
    headings = read_file_headings(filepath)
    # Every column after the first is a strain name; fetch their DB rows.
    strains = strains_info(dbconn, headings[1:])
    means_query = (
        "INSERT INTO ProbeSetData "
        "VALUES(%(ProbeSetDataId)s, %(StrainId)s, %(ProbeSetDataValue)s)")
    xref_query = (
        "INSERT INTO ProbeSetXRef(ProbeSetFreezeId, ProbeSetId, DataId) "
        "VALUES (%(ProbeSetFreezeId)s, %(ProbeSetId)s, %(ProbeSetDataId)s)")
    # Lazily number each mean with a fresh DataId, continuing from the
    # current maximum in ProbeSetData.
    # NOTE(review): every (probeset, strain) mean gets its own DataId, so
    # each ProbeSetXRef row points at a single-value data row — confirm
    # this matches the schema's intended DataId grouping.
    the_means = (
        {"ProbeSetFreezeId": dataset_id, "ProbeSetDataId": data_id, **mean}
        for data_id, mean in
        enumerate(
            read_means(filepath, headings, strains),
            start=(last_data_id(dbconn)+1)))
    with dbconn.cursor(cursorclass=DictCursor) as cursor:
        while True:
            # Drain the generator in batches of 1000 to bound memory use;
            # executemany ignores the extra keys each dict carries.
            means = tuple(take(the_means, 1000))
            if not bool(means):
                break
            print(
                f"\nEXECUTING QUERIES:\n\t* {means_query}\n\t* {xref_query}\n"
                f"with parameters\n\t{means}")
            cursor.executemany(means_query, means)
            cursor.executemany(xref_query, means)
    # NOTE(review): no explicit dbconn.commit() here — presumably the
    # database_connection context manager commits on exit; verify.
    return 0
+
def insert_se(
        filepath: str, dataset_id: int, dbconn: mdb.Connection,
        rconn: Redis) -> int:
    "Insert the standard-error data into the database"
    # TODO: stub — only announces itself; the actual standard-error
    # insertion logic is not implemented yet. Signature mirrors
    # insert_means so both can share the CLI dispatch table.
    print("INSERTING STANDARD ERROR DATA...")
    return 0
+
if __name__ == "__main__":
    def cli_args():
        """Parse the command-line arguments and check service connections.

        check_db/check_redis abort the script if the database or redis
        server cannot be reached.
        """
        parser = argparse.ArgumentParser(
            prog="InsertData", description=(
                "Script to insert data from an 'averages' file into the "
                "database."))
        parser.add_argument(
            "filetype", help="type of data to insert.",
            choices=("average", "standard-error"))
        parser.add_argument(
            "filepath", help="path to the file with the 'averages' data.")
        parser.add_argument(
            "species_id", help="Identifier for the species in the database.",
            type=int)
        parser.add_argument(
            "dataset_id", help="Identifier for the dataset in the database.",
            type=int)
        parser.add_argument(
            "database_uri",
            help="URL to be used to initialise the connection to the database")
        parser.add_argument(
            "redisuri",
            help="URL to initialise connection to redis",
            # nargs="?" makes this positional optional — without it the
            # argument stays required and the default below is never used.
            nargs="?",
            default="redis:///")

        args = parser.parse_args()
        check_db(args.database_uri)
        check_redis(args.redisuri)
        return args

    # Dispatch table: file type -> insertion function.
    insert_fns = {
        "average": insert_means,
        "standard-error": insert_se
    }

    def main():
        """Run the selected insertion and return its exit status."""
        # NOTE(review): args.species_id is parsed but never used here —
        # confirm whether the insert functions should receive it.
        args = cli_args()
        with Redis.from_url(args.redisuri, decode_responses=True) as rconn:
            with database_connection(args.database_uri) as dbconn:
                return insert_fns[args.filetype](
                    args.filepath, args.dataset_id, dbconn, rconn)
        # (The previous trailing `return 2` was unreachable: the `with`
        # body always returns, and exceptions propagate past it.)

    sys.exit(main())