diff options
| author | Munyoki Kilyungi | 2025-07-14 16:41:50 +0300 |
|---|---|---|
| committer | BonfaceKilz | 2025-07-16 22:50:04 +0300 |
| commit | d628fa7d594c462b11107b243b416e395d9fcd1b (patch) | |
| tree | 3516e7a4b14c768cd35c4d061f54589d96356847 /scripts/lmdb_matrix.py | |
| parent | 31aa0f5075c5f29b1f4bc9261291878c6085e79f (diff) | |
| download | genenetwork3-d628fa7d594c462b11107b243b416e395d9fcd1b.tar.gz | |
Improve genotype file parsing.
* scripts/lmdb_matrix.py (GenotypeMatrix): Delete transpose. Add "file_info" field. (save_excursion, count_lines): Delete. (get_genotype_dimensions, read_genotype_headers): New function. (read_geno_file): Rename this to... (read_genotype_file): ... this. (genotype_db_put): Put "file_info". (genotype_db_get): Fetch "file_info". (import_genotype): Use read_genotype_file. (print_current_matrix): Remove transpose. Print out file_info metadata. Signed-off-by: Munyoki Kilyungi <me@bonfacemunyoki.com>
Diffstat (limited to 'scripts/lmdb_matrix.py')
| -rw-r--r-- | scripts/lmdb_matrix.py | 374 |
1 files changed, 274 insertions, 100 deletions
diff --git a/scripts/lmdb_matrix.py b/scripts/lmdb_matrix.py index 319c116..451976b 100644 --- a/scripts/lmdb_matrix.py +++ b/scripts/lmdb_matrix.py @@ -1,38 +1,277 @@ +import os import lmdb import json import click -from dataclasses import dataclass - -from contextlib import contextmanager import numpy as np +from dataclasses import dataclass + @dataclass class GenotypeMatrix: """Store the actual Genotype Matrix""" matrix: np.ndarray - transpose: np.ndarray metadata: dict + file_info: dict + + +def get_genotype_metadata(genotype_file: str) -> tuple[dict, dict]: + """Parse metadata from a genotype file, separating '@'-prefixed + and '#'-prefixed entries. + + This function reads a genotype file and extracts two types of metadata: + - '@'-prefixed metadata (e.g., '@name:BXD'), stored as key-value + pairs for dataset attributes. + - '#'-prefixed metadata (e.g., '# File name: BXD_Geno...'), stored + as key-value pairs for file information. Lines starting with + '#' without a colon are skipped as comments. Parsing stops at + the first non-metadata line. + + Args: + genotype_file (str): Path to the genotype file to be parsed. + + Returns: + tuple[dict, dict]: A tuple containing two dictionaries: + - First dict: '@'-prefixed metadata (e.g., {'name': 'BXD', + 'type': 'riset'}). + - Second dict: '#'-prefixed metadata with colons (e.g., + {'File name': 'BXD_Geno...', 'Citation': '...'}). + + Example: + >>> meta, file_info = get_genotype_metadata("BXD.small.geno") + >>> print(meta) + {'name': 'BXD', 'type': 'riset', 'mat': 'B', 'pat': 'D', + 'het': 'H', 'unk': 'U'} + >>> print(file_info) + {'File name': 'BXD_Geno-19Jan2017b_forGN.xls', 'Metadata': + 'Please retain...'} + + """ + metadata = {} + file_metadata = {} + with open(genotype_file, "r") as stream: + while True: + line = stream.readline().strip() + match line: + case meta if line.startswith("#"): + if ":" in meta: + key, value = meta[2:].split(":", 1) + file_metadata[key] = value + case meta if line.startswith("#"): + continue + case meta if line.startswith("@") and ":" in line: + key, value = meta[1:].split(":", 1) + if value: + metadata[key] = value.strip() + case _: + break + return metadata, file_metadata + +def get_genotype_dimensions(genotype_file: str) -> tuple[int, int]: + """Calculate the dimensions of the data section in a genotype + file. -@contextmanager -def save_excursion(stream): - """Context manager to restore stream position after use.""" - position = stream.tell() - try: - yield - finally: - stream.seek(position) + This function determines the number of data rows and columns in a + genotype file. It skips metadata lines (starting with '#' or '@') + and uses the first non-metadata line as the header to count + columns. The total number of lines is counted in binary mode to + efficiently handle large files, and the number of data rows is + calculated by subtracting metadata and header lines. Accounts for + a potential trailing newline. + + Args: + genotype_file (str): Path to the genotype file to be analyzed. + + Returns: + tuple[int, int]: A tuple containing: + - First int: Number of data rows (excluding metadata and + header). + - Second int: Number of columns (based on the header row). + + Example: + >>> rows, cols = get_genotype_dimensions("BXD.small.geno") + >>> print(rows, cols) + 2, 202 # Example: 2 data rows, 202 columns (from header) + + Note: + Assumes the first non-metadata line is the header row, split + by whitespace. A trailing newline may be included in the line + count but is accounted for in the returned row count. + + """ + counter = 0 + rows = [] + + with open(genotype_file, "r") as stream: + while True: + line = stream.readline() + match line: + case "": + counter += 1 + case _ if line.startswith("#") or line.startswith("@"): + counter += 1 + case _: + rows = line.split() + break + with open(genotype_file, "rb") as f: + lines = 0 + buf_size = 1024 * 1024 + read_f = f.raw.read + buf = read_f(buf_size).strip() + while buf: + lines += buf.count(b"\n") + buf = read_f(buf_size) + return (lines - counter, len(rows)) + + +def read_genotype_headers(genotype_file: str) -> list[str]: + """Extract the header row from a genotype file. + + This function reads a genotype file and returns the first + non-metadata line as a list of column headers. It skips lines + starting with '#' (comments), '@' (metadata), or empty lines, + assuming the first non-skipped line contains the header (e.g., + 'Chr', 'Locus', 'cM', 'Mb', followed by strain names like 'BXD1', + 'BXD2', etc.). The header is split by whitespace to create the + list of column names. + + Args: + genotype_file (str): Path to the genotype file to be parsed. + + Returns: + list[str]: A list of column headers from the first non-metadata line. + + Example: + >>> headers = read_genotype_headers("BXD.small.geno") + >>> print(headers) + ['Chr', 'Locus', 'cM', 'Mb', 'BXD1', 'BXD2', ..., 'BXD220'] + """ + rows = [] + with open(genotype_file, "r") as stream: + while True: + line = stream.readline() + match line: + case _ if line.startswith("#") or line.startswith("@") or line == "": + continue + case _: + rows = line.split() + break + return rows -def count_lines(stream) -> int: - """Count the number of lines in the stream from the current - position.""" - count = 0 - with save_excursion(stream): - while stream.readline().strip(): - count += 1 - return count +def read_genotype_file(genotype_file: str) -> GenotypeMatrix: + """Read a genotype file and construct a GenotypeMatrix object. + + This function parses a genotype file to extract metadata and + genotype data, creating a numerical matrix of genotype values and + associated metadata. It processes: + - '@'-prefixed metadata (e.g., '@mat:B') for dataset attributes + like maternal/paternal alleles. + - '#'-prefixed metadata (e.g., '# File name:...') for file + information. + - Header row for column names (e.g., 'Chr', 'Locus', BXD strains). + - Data rows, converting genotype symbols (e.g., 'B', 'D', 'H', + 'U') to numeric values (0, 1, 2, 3) based on metadata mappings. + + The function skips comment lines ('#'), metadata lines ('@'), and + empty lines, and organizes the data into a GenotypeMatrix with a + numpy array and metadata dictionaries. + + Args: + genotype_file (str): Path to the genotype file to be parsed. + + Returns: + GenotypeMatrix: An object containing: + - matrix: A numpy array (nrows x ncols) with genotype values (0: + maternal, 1: paternal, 2: heterozygous, 3: unknown). + - metadata: A dictionary with '@'-prefixed metadata, row/column + counts, individuals (BXD strains), metadata columns (e.g., 'Chr', + 'Locus'), and lists of metadata values per row. + - file_info: A dictionary with '#'-prefixed metadata (e.g., 'File + name', 'Citation'). + + Raises: + ValueError: If an unrecognized genotype symbol is encountered in + the data. + + Example: + >>> geno_matrix = read_genotype_file("BXD.small.geno") + >>> print(geno_matrix.matrix.shape) + (2, 198) # Example: 2 rows, 198 BXD strains + >>> print(geno_matrix.metadata["name"]) + 'BXD' + >>> print(geno_matrix.file_info["File name"]) + 'BXD_Geno-19Jan2017b_forGN.xls' + """ + header = read_genotype_headers(genotype_file) + + counter = 0 + for i, el in enumerate(header): + if el in ["Chr", "Locus", "cM", "Mb"]: + continue + else: + counter = i + break + + metadata_columns, individuals = header[:counter], header[counter:] + nrows, ncols = get_genotype_dimensions(genotype_file) + ncols -= len(metadata_columns) + matrix = np.zeros((nrows, ncols), dtype=np.uint8) + + metadata, file_metadata = get_genotype_metadata(genotype_file) + metadata = metadata | { + "nrows": nrows, + "ncols": ncols, + "individuals": individuals, + "metadata_columns": metadata_columns + } + for key in metadata_columns: + metadata[key] = [] + + maternal = metadata.get("mat") + paternal = metadata.get("pat") + heterozygous = metadata.get("het") + unknown = metadata.get("unk") + locus, chromosomes = [], [] + i = 0 + with open(genotype_file, "r") as stream: + while True: + if i == nrows: + break + line = stream.readline() + if line.startswith("#") or line.startswith("@") or line == "" \ + or line.startswith(metadata_columns[0]): + continue + line = line.strip().split() + + meta, data = line[:len(metadata_columns) + ], line[len(metadata_columns):] + for j, el in enumerate(data): + match el: + case _ if el.isdigit(): + matrix[i, j] = int(el) + case _ if maternal == el: + matrix[i, j] = 0 + case _ if paternal == el: + matrix[i, j] = 1 + case _ if heterozygous == el: + matrix[i, j] = 2 + case _ if unknown == el: + matrix[i, j] = 3 + case _: + raise ValueError + i += 1 + __map = dict(zip(metadata_columns, meta)) + for key in metadata_columns: + metadata[key].append(__map.get(key)) + + genotype_matrix = GenotypeMatrix( + matrix=matrix, + metadata=metadata, + file_info=file_metadata + ) + return genotype_matrix def create_database(db_path: str) -> lmdb.Environment: @@ -41,103 +280,38 @@ def create_database(db_path: str) -> lmdb.Environment: def genotype_db_put(db: lmdb.Environment, genotype: GenotypeMatrix) -> bool: + """Put genotype GENOTYPEMATRIX from DB environment""" metadata = json.dumps(genotype.metadata).encode("utf-8") + file_info = json.dumps(genotype.file_info).encode("utf-8") with db.begin(write=True) as txn: txn.put(b"matrix", genotype.matrix.tobytes()) - txn.put(b"transpose", genotype.transpose.tobytes()) txn.put(b"metadata", metadata) + # XXXX: KLUDGE: Put this in RDF instead + txn.put(b"file_info", file_info) return True -def read_geno_file(genotype_file: str) -> GenotypeMatrix: - """Read a geno file and return a GenotypeMatrix object.""" - with open(genotype_file, 'r') as stream: - # Read file metadata - file_metadata = {} - while True: - line = stream.readline().strip() - if not line: - break - if line.startswith('#'): - continue - if line.startswith('@'): - key, value = line[1:].split(':', 1) - file_metadata[key] = value - else: - stream.seek(stream.tell() - len(line) - 1) - break - - # Read header - header = stream.readline().strip().split() - metadata_columns = ["Chr", "Locus", "cM", - "Mb"] if "Mb" in header else ["Chr", "Locus", "cM"] - - individuals = header[len(metadata_columns):] - - # Read data - nrows = count_lines(stream) - ncols = len(individuals) - matrix = np.zeros((nrows, ncols), dtype=np.uint8) - maternal = file_metadata.get("mat") - paternal = file_metadata.get("pat") - heterozygous = file_metadata.get("het") - unknown = file_metadata.get("unk") - - metadata = { - "nrows": nrows, - "ncols": ncols, - "individuals": individuals, - "metadata_keys": metadata_columns + ["individuals"] - } - - for key in metadata_columns[2:]: - metadata[key] = [] - - locus, chromosomes = [], [] - for i in range(nrows): - line = stream.readline().strip().split() - meta, data = line[:len(metadata_columns) - ], line[len(metadata_columns):] - for j, element in enumerate(data): - if element.isdigit(): - matrix[i, j] = int(element) - elif element == maternal: - matrix[i, j] = 0 - elif element == paternal: - matrix[i, j] = 1 - elif element == heterozygous: - matrix[i, j] = 2 - elif element == unknown: - matrix[i, j] = 3 - data = dict(zip(metadata_columns, meta)) - locus.append(data.get("Locus")) - chromosomes.append(data.get("Chr")) - for col in metadata_columns[2:]: - metadata[col].append(float(data.get(col))) - metadata["locus"] = locus - metadata["chromosomes"] = chromosomes - genotype_matrix = GenotypeMatrix(matrix, matrix.T, metadata) - return genotype_matrix - - def genotype_db_get(db: lmdb.Environment) -> GenotypeMatrix: + """Get genotype GENOTYPEMATRIX from DB environment""" with db.begin() as txn: metadata = json.loads(txn.get(b"metadata").decode("utf-8")) nrows, ncols = metadata.get("nrows"), metadata.get("ncols") - matrix, transpose = txn.get(b"matrix"), txn.get(b"transpose") - # Double check this - matrix = np.frombuffer(matrix, dtype=np.uint8).reshape(nrows, ncols) - transpose = np.frombuffer(transpose, dtype=np.uint8)\ - .reshape(ncols, nrows) - return GenotypeMatrix(matrix, transpose, metadata) + matrix = np.frombuffer( + txn.get(b"matrix"), dtype=np.uint8).reshape(nrows, ncols) + return GenotypeMatrix( + matrix=matrix, + metadata=metadata, + file_info=json.loads(txn.get(b"file_info")) + ) @click.command(help="Import the genotype file") @click.argument("geno_file") @click.argument("genotype_database") def import_genotype(geno_file: str, genotype_database: str): + "Import a genotype file into genotype_database path" with create_database(genotype_database) as db: - genotype_db_put(db, read_geno_file(geno_file)) + genotype_db_put(db=db, genotype=read_genotype_file(geno_file)) @click.command(help="Print the current matrix") @@ -147,8 +321,8 @@ def print_current_matrix(database_directory: str): with create_database(database_directory) as db: current = genotype_db_get(db) print(f"Matrix: {current.matrix}") - print(f"Transpose: {current.transpose}") print(f"Metadata: {current.metadata}") + print(f"File Info: {current.file_info}") @click.group() |
