aboutsummaryrefslogtreecommitdiff
path: root/quality_control/parsing.py
diff options
context:
space:
mode:
author: Frederick Muriuki Muriithi, 2022-05-02 13:04:03 +0300
committer: Frederick Muriuki Muriithi, 2022-05-02 13:04:03 +0300
commit: fcade690de59249a2789c26e8f668f36f8f4e075 (patch)
tree: 73a9f8d40871e7942c4ae034eabf39855b6756ea /quality_control/parsing.py
parent: 5632dcab27058875de99d63cbd263acfa3a9a2d5 (diff)
download: gn-uploader-fcade690de59249a2789c26e8f668f36f8f4e075.tar.gz
Optimise strain names parsing
- Use a way faster way of parsing the strains file
Diffstat (limited to 'quality_control/parsing.py')
-rw-r--r-- quality_control/parsing.py 37
1 files changed, 15 insertions, 22 deletions
diff --git a/quality_control/parsing.py b/quality_control/parsing.py
index 9fe88f1..436c90c 100644
--- a/quality_control/parsing.py
+++ b/quality_control/parsing.py
@@ -16,21 +16,6 @@ class FileType(Enum):
AVERAGE = 1
STANDARD_ERROR = 2
-def parse_strains(filepath):
- """Parse the strains file"""
- with open(filepath, encoding="utf8") as strains_file:
- reader = csv.DictReader(
- strains_file,
- fieldnames=[
- header.strip() for header
- in strains_file.readline().split("\t")],
- delimiter="\t")
- for row in reader:
- yield {
- key: (value if value != "\\N" else None)
- for key, value in row.items()
- }
-
def __parse_header(line, strains):
return valid_header(
set(strains),
@@ -47,13 +32,21 @@ LINE_PARSERS = {
FileType.STANDARD_ERROR: __parse_standard_error_line
}
-def strain_names(strains):
- """Retrieve a complete list of the names of the strains"""
- def __extract_strain_names(acc, strain):
- return acc + tuple(
- item for item in (strain["Name"], strain["Name2"])
- if (item is not None and item != ""))
- return reduce(__extract_strain_names, strains, tuple())
+def strain_names(filepath):
+ """Retrieve the strains names from given file"""
+ strains = set()
+ with open(filepath, encoding="utf8") as strains_file:
+ for idx, line in enumerate(strains_file.readlines()):
+ if idx > 0:
+ parts = line.split()
+ for name in (parts[1], parts[2]):
+ strains.add(name.strip())
+ if len(parts) >= 6:
+ alias = parts[5].strip()
+ if alias != "" and alias not in ("P", "\\N"):
+ strains.add(alias)
+
+ return strains
def parse_file(filepath: str, filetype: FileType, strains: list):
"""Parse the given file"""