"""Validate the headers"""

from collections import Counter
from functools import reduce
from typing import Union, Tuple, Sequence

from quality_control.errors import InvalidValue, DuplicateHeading
from quality_control.errors import DuplicateHeader, InvalidHeaderValue


def valid_header(strains, headers):
    """Return `headers` unchanged if they are valid, otherwise raise.

    The first element of `headers` is exempt from the strain check; every
    element after it must be a member of `strains`.  The duplicate check
    covers the whole of `headers`, first element included.

    Args:
        strains: Container of the acceptable strain names.
        headers: The header row, as an indexable sequence of column names.

    Returns:
        The `headers` argument itself, when all checks pass.

    Raises:
        InvalidHeaderValue: If there are fewer than two columns, or if any
            heading after the first is not in `strains` (one message per
            offending heading).
        DuplicateHeader: If any heading occurs more than once (one message
            per repeated heading).
    """
    if not headers[1:]:
        raise InvalidHeaderValue(
            "The header MUST contain at least 2 columns")

    invalid_headers = tuple(
        header for header in headers[1:] if header not in strains)
    if invalid_headers:
        raise InvalidHeaderValue(*(
            f"'{header}' not a valid strain." for header in invalid_headers))

    # Counter tallies in a single pass and iterates in first-seen order, so
    # the messages come out in a stable order (iterating a set would not).
    repeated = tuple(
        (header, times)
        for header, times in Counter(headers).items()
        if times > 1)
    if repeated:
        raise DuplicateHeader(*(
            f"'{header}' is present in the header row {times} times."
            for header, times in repeated))

    return headers


def invalid_header(
        line_number: int, headers: Sequence[str]) -> Union[InvalidValue, None]:
    """Check that the header row has at least two columns.

    Args:
        line_number: Line number passed through to the error object.
        headers: The header row.

    Returns:
        An `InvalidValue` built from `line_number`, `0`, the concatenated
        headers and an explanatory message when `headers` has fewer than two
        elements; `None` otherwise.
    """
    if len(headers) < 2:
        return InvalidValue(
            line_number, 0, "".join(headers),
            "The header MUST contain at least 2 columns")
    return None


def invalid_headings(
        line_number: int, strains: Sequence[str],
        headings: Sequence[str]) -> Tuple[InvalidValue, ...]:
    """Return one `InvalidValue` per heading that is not in `strains`.

    Args:
        line_number: Line number passed through to each error object.
        strains: The acceptable strain names.
        headings: The headings to check.

    Returns:
        A (possibly empty) tuple of `InvalidValue` objects, in the order the
        offending headings appear.
    """
    # Columns are numbered from 2 -- presumably because the caller passes the
    # header row without its first column; confirm against the call site.
    return tuple(
        InvalidValue(
            line_number, col, header, f"'{header}' not a valid strain.")
        for col, header in enumerate(headings, start=2)
        if header not in strains)


def duplicate_headings(
        line_number: int,
        headers: Sequence[str]) -> Tuple[DuplicateHeading, ...]:
    """Return one `DuplicateHeading` per heading that occurs more than once.

    Args:
        line_number: Line number passed through to each error object.
        headers: The full header row; columns are numbered from 1.

    Returns:
        A (possibly empty) tuple of `DuplicateHeading` objects, ordered by
        the first occurrence of each repeated heading.  Each carries the
        tuple of 1-based column numbers at which the heading appears.
    """
    def __update_columns__(acc, item):
        # Build a new mapping on every step rather than mutating `acc`; an
        # existing key keeps its original position in the mapping.
        column, heading = item
        return {**acc, heading: acc.get(heading, ()) + (column,)}

    repeated = {
        heading: columns
        for heading, columns in reduce(
            __update_columns__, enumerate(headers, start=1), {}).items()
        if len(columns) > 1
    }
    return tuple(
        DuplicateHeading(
            line_number, heading, columns, (
                f"Heading '{heading}', is repeated in columns "
                f"{','.join(str(i) for i in columns)}"))
        for heading, columns in repeated.items())