about summary refs log tree commit diff
path: root/scripts
diff options
context:
space:
mode:
Diffstat (limited to 'scripts')
-rw-r--r-- scripts/lmdb_matrix.py 29
1 files changed, 16 insertions, 13 deletions
diff --git a/scripts/lmdb_matrix.py b/scripts/lmdb_matrix.py
index efd7c58..2d73315 100644
--- a/scripts/lmdb_matrix.py
+++ b/scripts/lmdb_matrix.py
@@ -12,16 +12,15 @@ guix shell python-click python-lmdb python-wrapper python-numpy -- \
      <path-to-lmdb-store>
 
 """
+from dataclasses import dataclass
 from pathlib import Path
 from subprocess import check_output
 
-import lmdb
 import json
 import click
+import lmdb
 import numpy as np
 
-from dataclasses import dataclass
-
 
 @dataclass
 class GenotypeMatrix:
@@ -32,7 +31,8 @@ class GenotypeMatrix:
 
 
 def count_trailing_newlines(file_path):
-    with open(file_path, 'rb') as stream:
+    """Count trailing newlines in a file"""
+    with open(file_path, 'rb') as stream:
         stream.seek(0, 2)  # Move to the end of the file
         file_size = stream.tell()
         if file_size == 0:
@@ -111,7 +111,7 @@ def get_genotype_metadata(genotype_file: str) -> tuple[dict, dict]:
     """
     metadata = {}
     file_metadata = {}
-    with open(genotype_file, "r") as stream:
+    with open(genotype_file, "r", encoding="utf-8") as stream:
         while True:
             line = stream.readline().strip()
             match line:
@@ -165,7 +165,7 @@ def get_genotype_dimensions(genotype_file: str) -> tuple[int, int]:
     counter = 0
     rows = []
 
-    with open(genotype_file, "r") as stream:
+    with open(genotype_file, "r", encoding="utf-8") as stream:
         while True:
             line = stream.readline()
             counter += 1
@@ -201,7 +201,7 @@ def read_genotype_headers(genotype_file: str) -> list[str]:
         ['Chr', 'Locus', 'cM', 'Mb', 'BXD1', 'BXD2', ..., 'BXD220']
     """
     rows = []
-    with open(genotype_file, "r") as stream:
+    with open(genotype_file, "r", encoding="utf-8") as stream:
         while True:
             line = stream.readline()
             match line:
@@ -213,6 +213,7 @@ def read_genotype_headers(genotype_file: str) -> list[str]:
     return rows
 
 
+# pylint: disable=too-many-locals
 def read_genotype_file(genotype_file: str) -> GenotypeMatrix:
     """Read a genotype file and construct a GenotypeMatrix object.
 
@@ -261,11 +262,9 @@ def read_genotype_file(genotype_file: str) -> GenotypeMatrix:
 
     counter = 0
     for i, el in enumerate(header):
-        if el in ["Chr", "Locus", "cM", "Mb"]:
-            continue
-        else:
-            counter = i
+        if el not in ["Chr", "Locus", "cM", "Mb"]:
+            counter = i
             break
 
     metadata_columns, individuals = header[:counter], header[counter:]
     nrows, ncols = get_genotype_dimensions(genotype_file)
@@ -286,10 +285,9 @@ def read_genotype_file(genotype_file: str) -> GenotypeMatrix:
     paternal = metadata.get("pat")
     heterozygous = metadata.get("het")
     unknown = metadata.get("unk")
-    locus, chromosomes = [], []
     i = 0
     sentinel = True
-    with open(genotype_file, "r") as stream:
+    with open(genotype_file, "r", encoding="utf-8") as stream:
         while True:
             if i == nrows:
                 break
@@ -375,6 +373,8 @@ def genotype_db_get(db: lmdb.Environment) -> GenotypeMatrix:
 
 
 def get_genotype_files(directory: str) -> list[tuple[str, int]]:
+    """Return a list of all the genotype files from a given
+    directory."""
     geno_files = [
         (_file.as_posix(), _file.stat().st_size)
         for _file in Path(directory).glob("*.geno") if _file.is_file()]
@@ -382,6 +382,8 @@ def get_genotype_files(directory: str) -> list[tuple[str, int]]:
 
 
 def __import_directory(directory: str, lmdb_path: str):
+    """Import all the genotype files from a given directory into
+    LMDB."""
     for genofile, file_size in get_genotype_files(directory):
         genofile = Path(genofile)
         size_mb = file_size / (1024 ** 2)
@@ -421,6 +423,7 @@ def print_current_matrix(database_directory: str):
         print(f"File Info: {current.file_info}")
 
 
+# pylint: disable=missing-function-docstring
 @click.group()
 def cli():
     pass