about summary refs log tree commit diff
path: root/scripts/lmdb_matrix.py
diff options
context:
space:
mode:
authorMunyoki Kilyungi2025-07-14 16:41:50 +0300
committerBonfaceKilz2025-07-16 22:50:04 +0300
commitd628fa7d594c462b11107b243b416e395d9fcd1b (patch)
tree3516e7a4b14c768cd35c4d061f54589d96356847 /scripts/lmdb_matrix.py
parent31aa0f5075c5f29b1f4bc9261291878c6085e79f (diff)
downloadgenenetwork3-d628fa7d594c462b11107b243b416e395d9fcd1b.tar.gz
Improve genotype file parsing.
* scripts/lmdb_matrix.py (GenotypeMatrix): Delete transpose.  Add
"file_info" field.
(save_excursion, count_lines): Delete.
(get_genotype_dimensions, read_genotype_headers): New function.
(read_geno_file): Rename this to...
(read_genotype_file): ... this.
(genotype_db_put): Put "file_info".
(genotype_db_get): Fetch "file_info".
(import_genotype): Use read_genotype_file.
(print_current_matrix): Remove transpose.  Print out file_info
metadata.

Signed-off-by: Munyoki Kilyungi <me@bonfacemunyoki.com>
Diffstat (limited to 'scripts/lmdb_matrix.py')
-rw-r--r--scripts/lmdb_matrix.py374
1 files changed, 274 insertions, 100 deletions
diff --git a/scripts/lmdb_matrix.py b/scripts/lmdb_matrix.py
index 319c116..451976b 100644
--- a/scripts/lmdb_matrix.py
+++ b/scripts/lmdb_matrix.py
@@ -1,38 +1,277 @@
+import os
 import lmdb
 import json
 import click
-from dataclasses import dataclass
-
-from contextlib import contextmanager
 import numpy as np
 
+from dataclasses import dataclass
+
 
 @dataclass
 class GenotypeMatrix:
     """Store the actual Genotype Matrix"""
     matrix: np.ndarray
-    transpose: np.ndarray
     metadata: dict
+    file_info: dict
+
+
+def get_genotype_metadata(genotype_file: str) -> tuple[dict, dict]:
+    """Parse metadata from a genotype file, separating '@'-prefixed
+    and '#'-prefixed entries.
+
+    This function reads a genotype file and extracts two types of metadata:
+    - '@'-prefixed metadata (e.g., '@name:BXD'), stored as key-value
+      pairs for dataset attributes.
+    - '#'-prefixed metadata (e.g., '# File name: BXD_Geno...'), stored
+      as key-value pairs for file information.  Lines starting with
+      '#' without a colon are skipped as comments. Parsing stops at
+      the first non-metadata line.
+
+    Args:
+        genotype_file (str): Path to the genotype file to be parsed.
+
+    Returns:
+        tuple[dict, dict]: A tuple containing two dictionaries:
+            - First dict: '@'-prefixed metadata (e.g., {'name': 'BXD',
+              'type': 'riset'}).
+            - Second dict: '#'-prefixed metadata with colons (e.g.,
+              {'File name': 'BXD_Geno...', 'Citation': '...'}).
+
+    Example:
+        >>> meta, file_info = get_genotype_metadata("BXD.small.geno")
+        >>> print(meta)
+        {'name': 'BXD', 'type': 'riset', 'mat': 'B', 'pat': 'D',
+        'het': 'H', 'unk': 'U'}
+        >>> print(file_info)
+        {'File name': 'BXD_Geno-19Jan2017b_forGN.xls', 'Metadata':
+        'Please retain...'}
+
+    """
+    metadata = {}
+    file_metadata = {}
+    with open(genotype_file, "r") as stream:
+        while True:
+            line = stream.readline().strip()
+            match line:
+                case meta if line.startswith("#"):
+                    if ":" in meta:
+                        key, value = meta[2:].split(":", 1)
+                        file_metadata[key] = value
+                case meta if line.startswith("#"):
+                    continue
+                case meta if line.startswith("@") and ":" in line:
+                    key, value = meta[1:].split(":", 1)
+                    if value:
+                        metadata[key] = value.strip()
+                case _:
+                    break
+    return metadata, file_metadata
+
 
+def get_genotype_dimensions(genotype_file: str) -> tuple[int, int]:
+    """Calculate the dimensions of the data section in a genotype
+    file.
 
-@contextmanager
-def save_excursion(stream):
-    """Context manager to restore stream position after use."""
-    position = stream.tell()
-    try:
-        yield
-    finally:
-        stream.seek(position)
+    This function determines the number of data rows and columns in a
+    genotype file.  It skips metadata lines (starting with '#' or '@')
+    and uses the first non-metadata line as the header to count
+    columns. The total number of lines is counted in binary mode to
+    efficiently handle large files, and the number of data rows is
+    calculated by subtracting metadata and header lines. Accounts for
+    a potential trailing newline.
+
+    Args:
+        genotype_file (str): Path to the genotype file to be analyzed.
+
+    Returns:
+        tuple[int, int]: A tuple containing:
+            - First int: Number of data rows (excluding metadata and
+              header).
+            - Second int: Number of columns (based on the header row).
+
+    Example:
+        >>> rows, cols = get_genotype_dimensions("BXD.small.geno")
+        >>> print(rows, cols)
+        2, 202  # Example: 2 data rows, 202 columns (from header)
+
+    Note:
+        Assumes the first non-metadata line is the header row, split
+        by whitespace.  A trailing newline may be included in the line
+        count but is accounted for in the returned row count.
+
+    """
+    counter = 0
+    rows = []
+
+    with open(genotype_file, "r") as stream:
+        while True:
+            line = stream.readline()
+            match line:
+                case "":
+                    counter += 1
+                case _ if line.startswith("#") or line.startswith("@"):
+                    counter += 1
+                case _:
+                    rows = line.split()
+                    break
+    with open(genotype_file, "rb") as f:
+        lines = 0
+        buf_size = 1024 * 1024
+        read_f = f.raw.read
+        buf = read_f(buf_size).strip()
+        while buf:
+            lines += buf.count(b"\n")
+            buf = read_f(buf_size)
+        return (lines - counter, len(rows))
+
+
+def read_genotype_headers(genotype_file: str) -> list[str]:
+    """Extract the header row from a genotype file.
+
+    This function reads a genotype file and returns the first
+    non-metadata line as a list of column headers. It skips lines
+    starting with '#' (comments), '@' (metadata), or empty lines,
+    assuming the first non-skipped line contains the header (e.g.,
+    'Chr', 'Locus', 'cM', 'Mb', followed by strain names like 'BXD1',
+    'BXD2', etc.). The header is split by whitespace to create the
+    list of column names.
+
+    Args:
+        genotype_file (str): Path to the genotype file to be parsed.
+
+    Returns:
+        list[str]: A list of column headers from the first non-metadata line.
+
+    Example:
+        >>> headers = read_genotype_headers("BXD.small.geno")
+        >>> print(headers)
+        ['Chr', 'Locus', 'cM', 'Mb', 'BXD1', 'BXD2', ..., 'BXD220']
+    """
+    rows = []
+    with open(genotype_file, "r") as stream:
+        while True:
+            line = stream.readline()
+            match line:
+                case _ if line.startswith("#") or line.startswith("@") or line == "":
+                    continue
+                case _:
+                    rows = line.split()
+                    break
+    return rows
 
 
-def count_lines(stream) -> int:
-    """Count the number of lines in the stream from the current
-    position."""
-    count = 0
-    with save_excursion(stream):
-        while stream.readline().strip():
-            count += 1
-    return count
+def read_genotype_file(genotype_file: str) -> GenotypeMatrix:
+    """Read a genotype file and construct a GenotypeMatrix object.
+
+    This function parses a genotype file to extract metadata and
+    genotype data, creating a numerical matrix of genotype values and
+    associated metadata. It processes:
+    - '@'-prefixed metadata (e.g., '@mat:B') for dataset attributes
+      like maternal/paternal alleles.
+    - '#'-prefixed metadata (e.g., '# File name:...') for file
+      information.
+    - Header row for column names (e.g., 'Chr', 'Locus', BXD strains).
+    - Data rows, converting genotype symbols (e.g., 'B', 'D', 'H',
+    'U') to numeric values (0, 1, 2, 3) based on metadata mappings.
+
+    The function skips comment lines ('#'), metadata lines ('@'), and
+    empty lines, and organizes the data into a GenotypeMatrix with a
+    numpy array and metadata dictionaries.
+
+ Args:
+    genotype_file (str): Path to the genotype file to be parsed.
+
+ Returns:
+    GenotypeMatrix: An object containing:
+    - matrix: A numpy array (nrows x ncols) with genotype values (0:
+    maternal, 1: paternal, 2: heterozygous, 3: unknown).
+    - metadata: A dictionary with '@'-prefixed metadata, row/column
+    counts, individuals (BXD strains), metadata columns (e.g., 'Chr',
+    'Locus'), and lists of metadata values per row.
+    - file_info: A dictionary with '#'-prefixed metadata (e.g., 'File
+      name', 'Citation').
+
+ Raises:
+    ValueError: If an unrecognized genotype symbol is encountered in
+    the data.
+
+ Example:
+    >>> geno_matrix = read_genotype_file("BXD.small.geno")
+    >>> print(geno_matrix.matrix.shape)
+    (2, 198) # Example: 2 rows, 198 BXD strains
+    >>> print(geno_matrix.metadata["name"])
+    'BXD'
+    >>> print(geno_matrix.file_info["File name"])
+    'BXD_Geno-19Jan2017b_forGN.xls'
+    """
+    header = read_genotype_headers(genotype_file)
+
+    counter = 0
+    for i, el in enumerate(header):
+        if el in ["Chr", "Locus", "cM", "Mb"]:
+            continue
+        else:
+            counter = i
+            break
+
+    metadata_columns, individuals = header[:counter], header[counter:]
+    nrows, ncols = get_genotype_dimensions(genotype_file)
+    ncols -= len(metadata_columns)
+    matrix = np.zeros((nrows, ncols), dtype=np.uint8)
+
+    metadata, file_metadata = get_genotype_metadata(genotype_file)
+    metadata = metadata | {
+        "nrows": nrows,
+        "ncols": ncols,
+        "individuals": individuals,
+        "metadata_columns": metadata_columns
+    }
+    for key in metadata_columns:
+        metadata[key] = []
+
+    maternal = metadata.get("mat")
+    paternal = metadata.get("pat")
+    heterozygous = metadata.get("het")
+    unknown = metadata.get("unk")
+    locus, chromosomes = [], []
+    i = 0
+    with open(genotype_file, "r") as stream:
+        while True:
+            if i == nrows:
+                break
+            line = stream.readline()
+            if line.startswith("#") or line.startswith("@") or line == "" \
+               or line.startswith(metadata_columns[0]):
+                continue
+            line = line.strip().split()
+
+            meta, data = line[:len(metadata_columns)
+                              ], line[len(metadata_columns):]
+            for j, el in enumerate(data):
+                match el:
+                    case _ if el.isdigit():
+                        matrix[i, j] = int(el)
+                    case _ if maternal == el:
+                        matrix[i, j] = 0
+                    case _ if paternal == el:
+                        matrix[i, j] = 1
+                    case _ if heterozygous == el:
+                        matrix[i, j] = 2
+                    case _ if unknown == el:
+                        matrix[i, j] = 3
+                    case _:
+                        raise ValueError
+            i += 1
+            __map = dict(zip(metadata_columns, meta))
+            for key in metadata_columns:
+                metadata[key].append(__map.get(key))
+
+    genotype_matrix = GenotypeMatrix(
+        matrix=matrix,
+        metadata=metadata,
+        file_info=file_metadata
+    )
+    return genotype_matrix
 
 
 def create_database(db_path: str) -> lmdb.Environment:
@@ -41,103 +280,38 @@ def create_database(db_path: str) -> lmdb.Environment:
 
 
 def genotype_db_put(db: lmdb.Environment, genotype: GenotypeMatrix) -> bool:
+    """Put genotype GENOTYPEMATRIX from DB environment"""
     metadata = json.dumps(genotype.metadata).encode("utf-8")
+    file_info = json.dumps(genotype.file_info).encode("utf-8")
     with db.begin(write=True) as txn:
         txn.put(b"matrix", genotype.matrix.tobytes())
-        txn.put(b"transpose", genotype.transpose.tobytes())
         txn.put(b"metadata", metadata)
+        # XXXX: KLUDGE: Put this in RDF instead
+        txn.put(b"file_info", file_info)
     return True
 
 
-def read_geno_file(genotype_file: str) -> GenotypeMatrix:
-    """Read a geno file and return a GenotypeMatrix object."""
-    with open(genotype_file, 'r') as stream:
-        # Read file metadata
-        file_metadata = {}
-        while True:
-            line = stream.readline().strip()
-            if not line:
-                break
-            if line.startswith('#'):
-                continue
-            if line.startswith('@'):
-                key, value = line[1:].split(':', 1)
-                file_metadata[key] = value
-            else:
-                stream.seek(stream.tell() - len(line) - 1)
-                break
-
-        # Read header
-        header = stream.readline().strip().split()
-        metadata_columns = ["Chr", "Locus", "cM",
-                            "Mb"] if "Mb" in header else ["Chr", "Locus", "cM"]
-
-        individuals = header[len(metadata_columns):]
-
-        # Read data
-        nrows = count_lines(stream)
-        ncols = len(individuals)
-        matrix = np.zeros((nrows, ncols), dtype=np.uint8)
-        maternal = file_metadata.get("mat")
-        paternal = file_metadata.get("pat")
-        heterozygous = file_metadata.get("het")
-        unknown = file_metadata.get("unk")
-
-        metadata = {
-            "nrows": nrows,
-            "ncols": ncols,
-            "individuals": individuals,
-            "metadata_keys": metadata_columns + ["individuals"]
-        }
-
-        for key in metadata_columns[2:]:
-            metadata[key] = []
-
-        locus, chromosomes = [], []
-        for i in range(nrows):
-            line = stream.readline().strip().split()
-            meta, data = line[:len(metadata_columns)
-                              ], line[len(metadata_columns):]
-            for j, element in enumerate(data):
-                if element.isdigit():
-                    matrix[i, j] = int(element)
-                elif element == maternal:
-                    matrix[i, j] = 0
-                elif element == paternal:
-                    matrix[i, j] = 1
-                elif element == heterozygous:
-                    matrix[i, j] = 2
-                elif element == unknown:
-                    matrix[i, j] = 3
-            data = dict(zip(metadata_columns, meta))
-            locus.append(data.get("Locus"))
-            chromosomes.append(data.get("Chr"))
-            for col in metadata_columns[2:]:
-                metadata[col].append(float(data.get(col)))
-        metadata["locus"] = locus
-        metadata["chromosomes"] = chromosomes
-        genotype_matrix = GenotypeMatrix(matrix, matrix.T, metadata)
-        return genotype_matrix
-
-
 def genotype_db_get(db: lmdb.Environment) -> GenotypeMatrix:
+    """Get genotype GENOTYPEMATRIX from DB environment"""
     with db.begin() as txn:
         metadata = json.loads(txn.get(b"metadata").decode("utf-8"))
         nrows, ncols = metadata.get("nrows"), metadata.get("ncols")
-        matrix, transpose = txn.get(b"matrix"), txn.get(b"transpose")
-        # Double check this
-        matrix = np.frombuffer(matrix, dtype=np.uint8).reshape(nrows, ncols)
-        transpose = np.frombuffer(transpose, dtype=np.uint8)\
-                      .reshape(ncols, nrows)
-        return GenotypeMatrix(matrix, transpose, metadata)
+        matrix = np.frombuffer(
+            txn.get(b"matrix"), dtype=np.uint8).reshape(nrows, ncols)
+        return GenotypeMatrix(
+            matrix=matrix,
+            metadata=metadata,
+            file_info=json.loads(txn.get(b"file_info"))
+        )
 
 
 @click.command(help="Import the genotype file")
 @click.argument("geno_file")
 @click.argument("genotype_database")
 def import_genotype(geno_file: str, genotype_database: str):
+    "Import a genotype file into genotype_database path"
     with create_database(genotype_database) as db:
-        genotype_db_put(db, read_geno_file(geno_file))
+        genotype_db_put(db=db, genotype=read_genotype_file(geno_file))
 
 
 @click.command(help="Print the current matrix")
@@ -147,8 +321,8 @@ def print_current_matrix(database_directory: str):
     with create_database(database_directory) as db:
         current = genotype_db_get(db)
         print(f"Matrix: {current.matrix}")
-        print(f"Transpose: {current.transpose}")
         print(f"Metadata: {current.metadata}")
+        print(f"File Info: {current.file_info}")
 
 
 @click.group()