about summary refs log tree commit diff
path: root/gn3/csvcmp.py
diff options
context:
space:
mode:
Diffstat (limited to 'gn3/csvcmp.py')
-rw-r--r-- gn3/csvcmp.py 64
1 files changed, 36 insertions, 28 deletions
diff --git a/gn3/csvcmp.py b/gn3/csvcmp.py
index ac09cc3..82d74d0 100644
--- a/gn3/csvcmp.py
+++ b/gn3/csvcmp.py
@@ -1,50 +1,59 @@
+"""This module contains functions for manipulating and working with csv
+texts"""
import json
import os
import uuid
from gn3.commands import run_cmd
-def extract_strain_name(csv_header, data, seek="Strain Name"):
+def extract_strain_name(csv_header, data, seek="Strain Name") -> str:
+ """Extract a strain's name given a csv header"""
for column, value in zip(csv_header.split(","), data.split(",")):
if seek in column:
return value
return ""
-def create_dirs_if_not_exists(dirs: list):
+def create_dirs_if_not_exists(dirs: list) -> None:
+ """Create directories from a list"""
for dir_ in dirs:
if not os.path.exists(dir_):
os.makedirs(dir_)
def remove_insignificant_edits(diff_data, epsilon=0.001):
- _mod = []
+ """Remove edits whose only changes are numeric differences smaller than ε"""
+ __mod = []
for mod in diff_data.get("Modifications"):
original = mod.get("Original").split(",")
current = mod.get("Current").split(",")
- for i, (x, y) in enumerate(zip(original, current)):
- if (x.replace('.', '').isdigit() and
- y.replace('.', '').isdigit() and
- abs(float(x) - float(y)) < epsilon):
- current[i] = x
+ for i, (_x, _y) in enumerate(zip(original, current)):
+ if (
+ _x.replace(".", "").isdigit()
+ and _y.replace(".", "").isdigit()
+ and abs(float(_x) - float(_y)) < epsilon
+ ):
+ current[i] = _x
if not (__o := ",".join(original)) == (__c := ",".join(current)):
- _mod.append({
- "Original": __o,
- "Current": __c,
- })
- diff_data['Modifications'] = _mod
+ __mod.append(
+ {
+ "Original": __o,
+ "Current": __c,
+ }
+ )
+ diff_data["Modifications"] = __mod
return diff_data
-def csv_diff(base_csv, delta_csv, tmp_dir="/tmp"):
+def csv_diff(base_csv, delta_csv, tmp_dir="/tmp") -> dict:
+ """Diff 2 csv strings"""
base_csv_list = base_csv.strip().split("\n")
delta_csv_list = delta_csv.strip().split("\n")
- base_csv_header, delta_csv_header, header = "", "", ""
+ base_csv_header, delta_csv_header = "", ""
for i, line in enumerate(base_csv_list):
if line.startswith("Strain Name,Value,SE,Count"):
- header = line
- base_csv_header, delta_csv_header= line, delta_csv_list[i]
+ base_csv_header, delta_csv_header = line, delta_csv_list[i]
break
longest_header = max(base_csv_header, delta_csv_header)
@@ -53,22 +62,21 @@ def csv_diff(base_csv, delta_csv, tmp_dir="/tmp"):
base_csv = base_csv.replace("Strain Name,Value,SE,Count",
longest_header, 1)
else:
- delta_csv = delta_csv.replace("Strain Name,Value,SE,Count",
- longest_header, 1)
+ delta_csv = delta_csv.replace(
+ "Strain Name,Value,SE,Count", longest_header, 1
+ )
file_name1 = os.path.join(tmp_dir, str(uuid.uuid4()))
file_name2 = os.path.join(tmp_dir, str(uuid.uuid4()))
- with open(file_name1, "w") as f_:
+ with open(file_name1, "w", encoding="utf-8") as _f:
_l = len(longest_header.split(","))
- f_.write(fill_csv(csv_text=base_csv,
- width=_l))
- with open(file_name2, "w") as f_:
- f_.write(fill_csv(delta_csv,
- width=_l))
+ _f.write(fill_csv(csv_text=base_csv, width=_l))
+ with open(file_name2, "w", encoding="utf-8") as _f:
+ _f.write(fill_csv(delta_csv, width=_l))
# Now we can run the diff!
_r = run_cmd(cmd=('"csvdiff '
- f'{file_name1} {file_name2} '
+ f"{file_name1} {file_name2} "
'--format json"'))
if _r.get("code") == 0:
_r = json.loads(_r.get("output"))
@@ -86,6 +94,7 @@ def csv_diff(base_csv, delta_csv, tmp_dir="/tmp"):
def fill_csv(csv_text, width, value="x"):
+ """Fill a csv text with 'value' if its length is less than width"""
data = []
for line in csv_text.strip().split("\n"):
if line.startswith("Strain") or line.startswith("#"):
@@ -95,6 +104,5 @@ def fill_csv(csv_text, width, value="x"):
for i, val in enumerate(_n):
if not val.strip():
_n[i] = value
- data.append(
- ",".join(_n + [value] * (width - len(_n))))
+ data.append(",".join(_n + [value] * (width - len(_n))))
return "\n".join(data)