import itertools
import pathlib
import typing
import warnings

import numpy
import pandas
import pgpdump
import scipy.io.wavfile
import sympy
import yaml
from matplotlib import pyplot  # pylint: disable=unused-import; frequently used in shell

# https://pandas.pydata.org/pandas-docs/stable/user_guide/options.html
pandas.options.display.max_rows = 200
# https://docs.sympy.org/latest/modules/interactive.html#module-sympy.interactive.printing
sympy.init_printing(pretty_print=True)


def join_pgp_packets(
    packets: typing.Iterator[typing.Union[bytearray, pgpdump.packet.Packet]],
) -> bytes:
    """Concatenate parsed packets and raw in-between bytes back into one byte string."""
    return b"".join(
        p.data if isinstance(p, pgpdump.packet.Packet) else p for p in packets
    )


def numpy_array_from_file(
    path: typing.Union[str, pathlib.Path], dtype
) -> numpy.ndarray:
    """Read the file's raw bytes into a one-dimensional array of the given dtype."""
    if isinstance(path, str):
        path = pathlib.Path(path)
    return numpy.frombuffer(path.read_bytes(), dtype=dtype)


def split_pgp_file(
    path: pathlib.Path,
) -> typing.Iterator[typing.Union[bytearray, pgpdump.packet.Packet]]:
    """
    Yield, for each packet, the raw bytes preceding its data (packet headers etc.)
    followed by the parsed packet, so that join_pgp_packets can reassemble the
    original data.

    https://datatracker.ietf.org/doc/html/rfc4880#section-4
    """
    bundle_bytes = path.read_bytes()
    if bundle_bytes.startswith(b"-----BEGIN"):
        bundle = pgpdump.AsciiData(bundle_bytes)
    else:
        bundle = pgpdump.BinaryData(bundle_bytes)
    remaining_bytes = bundle.data
    for packet in bundle.packets():
        try:
            prefix, remaining_bytes = remaining_bytes.split(packet.data, maxsplit=1)
        except ValueError:
            # packet.data does not occur contiguously in the raw stream
            # (interrupted, presumably by length octets of a large packet);
            # split around the interrupting bytes instead
            assert len(packet.data) > 596  # actual threshold might be higher
            split_index = 2 ** 9
            prefix, remaining_bytes = remaining_bytes.split(
                packet.data[:split_index], maxsplit=1
            )
            separator, remaining_bytes = remaining_bytes.split(
                packet.data[split_index:], maxsplit=1
            )
            assert sum(separator) == len(packet.data) - split_index
            warnings.warn(
                "ignoring separator; output of join_pgp_packets will be invalid"
            )
        yield prefix
        yield packet
    assert not remaining_bytes


def split_sequence_by_delimiter(
    sequence: typing.Sequence, delimiter: typing.Any, delimiter_min_length: int = 1
) -> typing.Iterator[typing.Sequence]:
    """
    Split the sequence on runs of delimiter that are at least delimiter_min_length
    items long, yielding the non-empty slices in between.
    """
    slice_start_index, slice_length = 0, 0
    for is_delimiter, group in itertools.groupby(
        sequence, key=lambda item: item == delimiter
    ):
        group_length = sum(1 for _ in group)
        if is_delimiter and group_length >= delimiter_min_length:
            if slice_length > 0:
                yield sequence[slice_start_index : slice_start_index + slice_length]
            slice_start_index += slice_length + group_length
            slice_length = 0
        else:
            slice_length += group_length
    if slice_length > 0:
        yield sequence[slice_start_index : slice_start_index + slice_length]


def trim_where(
    # https://docs.python.org/3.8/library/collections.abc.html#collections-abstract-base-classes
    sequence: typing.Sequence,
    condition: typing.Sequence[bool],
) -> typing.Sequence:
    """Drop leading and trailing items whose corresponding condition entry is true."""
    start = 0
    for item_condition in condition:
        if item_condition:
            start += 1
        else:
            break
    stop = len(sequence)
    assert stop == len(condition)
    for item_condition in condition[::-1]:
        if item_condition:
            stop -= 1
        else:
            break
    return sequence[start:stop]


def wavfile_read_mono(
    path: typing.Union[pathlib.Path, str]
) -> typing.Tuple[int, numpy.ndarray]:
    """Return the sample rate and a single channel, asserting all channels are identical."""
    # https://docs.scipy.org/doc/scipy/reference/generated/scipy.io.wavfile.read.html
    rate, data = scipy.io.wavfile.read(path)
    if len(data.shape) == 1:
        return rate, data
    data_first_channel = data[:, 0]
    for channel_index in range(1, data.shape[1]):
        assert (data_first_channel == data[:, channel_index]).all()
    return rate, data_first_channel


def yaml_dump(path: typing.Union[pathlib.Path, str], data: typing.Any) -> None:
    with pathlib.Path(path).open("w") as stream:
        yaml.safe_dump(data, stream)


def yaml_load(path: typing.Union[pathlib.Path, str]) -> typing.Any:
    with pathlib.Path(path).open("r") as stream:
        return yaml.safe_load(stream)
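

# Hypothetical usage sketch (not part of the original helpers), kept as comments so
# that loading this file stays side-effect free: combining wavfile_read_mono,
# trim_where and split_sequence_by_delimiter to isolate the non-silent bursts of a
# recording in an interactive session. The file name "recording.wav" and the
# minimum silence gap of rate // 100 samples are assumptions, not values from the
# original code.
#
#   rate, samples = wavfile_read_mono("recording.wav")
#   samples = trim_where(samples, samples == 0)
#   for burst in split_sequence_by_delimiter(
#       samples, delimiter=0, delimiter_min_length=rate // 100
#   ):
#       pyplot.plot(burst)
#   pyplot.show()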