"""
Library defaults and small helpers for interactive sessions.

Importing this module configures pandas, pyperclip and sympy (see below) and
defines utilities for PGP packet files, sequences, WAV / YAML files and
``|``-style pipelines.
"""

import functools
import itertools
import os
import pathlib
import typing
import warnings

import numpy
import pandas
import pgpdump
import pyperclip
import scipy.io.wavfile
import sympy
import yaml
from matplotlib import pyplot  # pylint: disable=unused-import; frequently used in shell

# https://pandas.pydata.org/pandas-docs/stable/user_guide/options.html
pandas.options.display.max_rows = 200

if os.environ.get("WAYLAND_DISPLAY"):
    # with default "gi" in python3-pyperclip=1.8.2-2 & python3-gi=3.42.2-3+b1
    # pyperclip.paste() always returned empty string
    pyperclip.set_clipboard("wl-clipboard")

# https://docs.sympy.org/latest/modules/interactive.html#module-sympy.interactive.printing
sympy.init_printing(pretty_print=True)


def join_pgp_packets(
    packets: typing.Iterable[typing.Union[bytearray, pgpdump.packet.Packet]],
) -> bytes:
    """
    Concatenate raw byte chunks and packets into a single bytes object.

    Items that are pgpdump packets contribute their ``data`` attribute;
    every other item is used as-is.  Accepts the items yielded by
    split_pgp_file.
    """
    return b"".join(
        p.data if isinstance(p, pgpdump.packet.Packet) else p for p in packets
    )


def numpy_array_from_file(
    path: typing.Union[str, pathlib.Path], dtype
) -> numpy.ndarray:
    """
    Read the entire file at `path` and interpret its bytes as a
    one-dimensional array of `dtype`.
    """
    return numpy.frombuffer(pathlib.Path(path).read_bytes(), dtype=dtype)


def split_pgp_file(
    path: typing.Union[pathlib.Path, str],
) -> typing.Iterator[typing.Union[bytearray, pgpdump.packet.Packet]]:
    """
    Split a PGP file into alternating raw prefixes and parsed packets.

    For every packet reported by pgpdump, yields the bytes preceding the
    packet's data within the not yet consumed input, followed by the packet
    itself.  Files starting with ``-----BEGIN`` are parsed as ASCII-armored,
    all others as binary.

    https://datatracker.ietf.org/doc/html/rfc4880#section-4
    """
    # accept str like the other path-taking helpers in this module
    bundle_bytes = pathlib.Path(path).read_bytes()
    if bundle_bytes.startswith(b"-----BEGIN"):
        bundle = pgpdump.AsciiData(bundle_bytes)
    else:
        bundle = pgpdump.BinaryData(bundle_bytes)
    remaining_bytes = bundle.data
    for packet in bundle.packets():
        try:
            # unpacking raises ValueError when packet.data does not occur
            # contiguously in the remaining bytes (split returns one item)
            prefix, remaining_bytes = remaining_bytes.split(packet.data, maxsplit=1)
        except ValueError:
            # NOTE(review): presumably packets with partial body lengths
            # (RFC 4880 section 4.2.2.4), whose length octets interrupt the
            # body -- confirm.  Locate the first 2**9 bytes and the rest of
            # the data separately; the bytes in between are dropped.
            assert len(packet.data) > 596  # actual threshold might be higher
            split_index = 2**9
            prefix, remaining_bytes = remaining_bytes.split(
                packet.data[:split_index], maxsplit=1
            )
            separator, remaining_bytes = remaining_bytes.split(
                packet.data[split_index:], maxsplit=1
            )
            assert sum(separator) == len(packet.data) - split_index
            warnings.warn(
                "ignoring separator; output of join_pgp_packets will be invalid"
            )
        yield prefix
        yield packet
    # every input byte must have been attributed to a prefix or a packet
    assert not remaining_bytes


def split_sequence_by_delimiter(
    sequence: typing.Sequence, delimiter: typing.Any, delimiter_min_length: int = 1
) -> typing.Iterator[typing.Sequence]:
    """
    Yield the non-empty slices of `sequence` located between delimiter runs.

    A run of consecutive items equal to `delimiter` only separates slices
    when it is at least `delimiter_min_length` items long; shorter runs
    remain part of the surrounding slice.
    """
    slice_start_index, slice_length = 0, 0
    for is_delimiter, group in itertools.groupby(
        sequence, key=lambda item: item == delimiter
    ):
        group_length = sum(1 for _ in group)
        if is_delimiter and group_length >= delimiter_min_length:
            if slice_length > 0:
                yield sequence[slice_start_index : slice_start_index + slice_length]
            # continue after the delimiter run
            slice_start_index += slice_length + group_length
            slice_length = 0
        else:
            slice_length += group_length
    if slice_length > 0:
        yield sequence[slice_start_index : slice_start_index + slice_length]


def trim_where(
    # https://docs.python.org/3.8/library/collections.abc.html#collections-abstract-base-classes
    sequence: typing.Sequence,
    condition: typing.Sequence[bool],
) -> typing.Sequence:
    """
    Strip leading and trailing items of `sequence` whose corresponding entry
    in `condition` is truthy.

    `sequence` and `condition` must have the same length.  Returns the slice
    of `sequence` between the first and the last falsy entry (inclusive);
    the slice is empty when every entry is truthy.
    """
    stop = len(sequence)
    # precondition, verified before scanning instead of halfway through
    assert stop == len(condition)
    start = 0
    for item_condition in condition:
        if item_condition:
            start += 1
        else:
            break
    for item_condition in condition[::-1]:
        if item_condition:
            stop -= 1
        else:
            break
    return sequence[start:stop]


def wavfile_read_mono(
    path: typing.Union[pathlib.Path, str]
) -> typing.Tuple[int, numpy.ndarray]:
    """
    Read a WAV file and return its sample rate and a single channel.

    One-dimensional data is returned unchanged.  Otherwise all channels are
    asserted to be identical to the first one, which is then returned.
    """
    # https://docs.scipy.org/doc/scipy/reference/generated/scipy.io.wavfile.read.html
    rate, data = scipy.io.wavfile.read(path)
    if len(data.shape) == 1:
        return rate, data
    data_first_channel = data[:, 0]
    for channel_index in range(1, data.shape[1]):
        assert (data_first_channel == data[:, channel_index]).all()
    return rate, data_first_channel


def yaml_dump(path: typing.Union[pathlib.Path, str], data: typing.Any) -> None:
    """
    Serialize `data` with yaml.safe_dump into the file at `path`
    (opened in mode "w", i.e. created or truncated).
    """
    with pathlib.Path(path).open("w") as stream:
        yaml.safe_dump(data, stream)


def yaml_load(path: typing.Union[pathlib.Path, str]) -> typing.Any:
    """
    Parse the file at `path` with yaml.safe_load and return the result.
    """
    with pathlib.Path(path).open("r") as stream:
        return yaml.safe_load(stream)


class Pipe:
    """
    Wrap a function so it can be applied with the `|` operator:
    ``value | Pipe(function)`` evaluates to ``function(value)``.
    """

    def __init__(self, function: typing.Callable[[typing.Any], typing.Any]) -> None:
        self._function = function

    def __ror__(self, other: typing.Iterable) -> typing.Any:
        # reflected operand: invoked for `other | self`
        return self._function(other)


class PipeMap(Pipe):
    """
    Pipe that lazily maps a function over the items of its left operand.

    With ``axis=0`` the function is applied to each item, with ``axis=1`` to
    each item of each item, and so on.  Every level is a `map` object, so
    nothing is evaluated until the result is consumed.
    """

    @classmethod
    def _partial_map(
        cls, function: typing.Callable[[typing.Any], typing.Any], *, axis: int
    ) -> typing.Callable[[typing.Any], typing.Any]:
        """
        Return a callable mapping `function` over its argument at nesting
        depth `axis` (values <= 0 map over the argument's own items).
        """
        if axis <= 0:
            return functools.partial(map, function)
        return functools.partial(map, cls._partial_map(function, axis=axis - 1))

    def __init__(
        self, function: typing.Callable[[typing.Any], typing.Any], axis: int = 0
    ) -> None:
        # delegate attribute setup to Pipe instead of duplicating it
        super().__init__(self._partial_map(function, axis=axis))


# self-tests executed on import
assert list(PipeMap._partial_map(str, axis=0)(range(3))) == ["0", "1", "2"]
assert [tuple(r) for r in PipeMap._partial_map(str, axis=1)((range(2), range(3)))] == [
    ("0", "1"),
    ("0", "1", "2"),
]
assert range(65, 68) | PipeMap(chr) | PipeMap(str.lower) | Pipe(list) == ["a", "b", "c"]
assert range(2, 4) | PipeMap(range) | PipeMap(lambda n: n**3, axis=1) | PipeMap(
    tuple
) | Pipe(list) == [(0, 1), (0, 1, 8)]
assert "123\n456\n789".splitlines() | PipeMap(list) | PipeMap(int, axis=1) | PipeMap(
    tuple
) | Pipe(list) == [(1, 2, 3), (4, 5, 6), (7, 8, 9)]
assert "123|456\n98|76|54".splitlines() | PipeMap(lambda s: s.split("|")) | PipeMap(
    list, axis=1
) | PipeMap(int, axis=2) | PipeMap(tuple, axis=1) | PipeMap(tuple) | Pipe(list) == [
    ((1, 2, 3), (4, 5, 6)),
    ((9, 8), (7, 6), (5, 4)),
]