| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221 | import datetimeimport functoolsimport itertoolsimport osimport pathlibimport typingimport warningsimport dateutil.parserimport exifreadimport numpyimport pandasimport pgpdumpimport pyperclipimport scipy.io.wavfileimport sympyimport yamlfrom matplotlib import pyplot  # pylint: disable=unused-import; frequently used in shell# https://pandas.pydata.org/pandas-docs/stable/user_guide/options.htmlpandas.options.display.max_rows = 200if os.environ.get("WAYLAND_DISPLAY"):    # with default "gi" in python3-pyperclip=1.8.2-2 & python3-gi=3.42.2-3+b1    # pyperclip.paste() always returned empty string    pyperclip.set_clipboard("wl-clipboard")# https://docs.sympy.org/latest/modules/interactive.html#module-sympy.interactive.printingsympy.init_printing(pretty_print=True)def join_pgp_packets(    packets: typing.Iterator[typing.Union[bytearray, pgpdump.packet.Packet]],) -> bytes:    return b"".join(        p.data if isinstance(p, pgpdump.packet.Packet) else p for p in packets    )def numpy_array_from_file(    path: typing.Union[str, pathlib.Path], dtype) -> numpy.ndarray:    if isinstance(path, str):        path = pathlib.Path(path)    return numpy.frombuffer(path.read_bytes(), dtype=dtype)def read_exif_datetime_original(path: str) -> typing.Optional[datetime.datetime]:    with pathlib.Path(path).open("rb") as file:        tags = exifread.process_file(file)        if "EXIF DateTimeOriginal" not in tags:            return None        return dateutil.parser.parse(            # https://web.archive.org/web/20240609164044/https://github.com/dateutil/dateutil/issues/271            datetime.datetime.strptime(                tags["EXIF DateTimeOriginal"].values, "%Y:%m:%d %H:%M:%S"            ).isoformat()            + "."            + tags["EXIF SubSecTimeOriginal"].values            + (                tags["EXIF OffsetTimeOriginal"].values                if "EXIF OffsetTimeOriginal" in tags                else ""            )        )def split_pgp_file(    path: pathlib.Path,) -> typing.Iterator[typing.Union[bytearray, pgpdump.packet.Packet]]:    """    https://datatracker.ietf.org/doc/html/rfc4880#section-4    """    bundle_bytes = path.read_bytes()    if bundle_bytes.startswith(b"-----BEGIN"):        bundle = pgpdump.AsciiData(bundle_bytes)    else:        bundle = pgpdump.BinaryData(bundle_bytes)    remaining_bytes = bundle.data    for packet in bundle.packets():        try:            prefix, remaining_bytes = remaining_bytes.split(packet.data, maxsplit=1)        except ValueError:            assert len(packet.data) > 596  # actual threshold might be higher            split_index = 2**9            prefix, remaining_bytes = remaining_bytes.split(                packet.data[:split_index], maxsplit=1            )            separator, remaining_bytes = remaining_bytes.split(                packet.data[split_index:], maxsplit=1            )            assert sum(separator) == len(packet.data) - split_index            warnings.warn(                "ignoring separator; output of join_pgp_packets will be invalid"            )        yield prefix        yield packet    assert not remaining_bytesdef split_sequence_by_delimiter(    sequence: typing.Sequence, delimiter: typing.Any, delimiter_min_length: int = 1) -> typing.Iterator[typing.Sequence]:    slice_start_index, slice_length = 0, 0    for is_delimiter, group in itertools.groupby(        sequence, key=lambda item: item == delimiter    ):        group_length = sum(1 for _ in group)        if is_delimiter and group_length >= delimiter_min_length:            if slice_length > 0:                yield sequence[slice_start_index : slice_start_index + slice_length]            slice_start_index += slice_length + group_length            slice_length = 0        else:            slice_length += group_length    if slice_length > 0:        yield sequence[slice_start_index : slice_start_index + slice_length]def trim_where(    # https://docs.python.org/3.8/library/collections.abc.html#collections-abstract-base-classes    sequence: typing.Sequence,    condition: typing.Sequence[bool],) -> typing.Sequence:    start = 0    for item_condition in condition:        if item_condition:            start += 1        else:            break    stop = len(sequence)    assert stop == len(condition)    for item_condition in condition[::-1]:        if item_condition:            stop -= 1        else:            break    return sequence[start:stop]def wavfile_read_mono(    path: typing.Union[pathlib.Path, str]) -> typing.Tuple[int, numpy.ndarray]:    # https://docs.scipy.org/doc/scipy/reference/generated/scipy.io.wavfile.read.html    rate, data = scipy.io.wavfile.read(path)    if len(data.shape) == 1:        return rate, data    data_first_channel = data[:, 0]    for channel_index in range(1, data.shape[1]):        assert (data_first_channel == data[:, channel_index]).all()    return rate, data_first_channeldef yaml_dump(path: typing.Union[pathlib.Path, str], data: typing.Any) -> None:    with pathlib.Path(path).open("w") as stream:        yaml.safe_dump(data, stream)def yaml_load(path: typing.Union[pathlib.Path, str]) -> typing.Any:    with pathlib.Path(path).open("r") as stream:        return yaml.safe_load(stream)class Pipe:    def __init__(self, function: typing.Callable[[typing.Any], typing.Any]) -> None:        self._function = function    def __ror__(self, other: typing.Iterable) -> typing.Any:        return self._function(other)class PipeMap(Pipe):    @classmethod    def _partial_map(        cls, function: typing.Callable[[typing.Any], typing.Any], *, axis: int    ) -> typing.Callable[[typing.Any], typing.Any]:        if axis <= 0:            return functools.partial(map, function)        return functools.partial(map, cls._partial_map(function, axis=axis - 1))    def __init__(        self, function: typing.Callable[[typing.Any], typing.Any], axis: int = 0    ) -> None:        self._function = self._partial_map(function, axis=axis)assert list(PipeMap._partial_map(str, axis=0)(range(3))) == ["0", "1", "2"]assert [tuple(r) for r in PipeMap._partial_map(str, axis=1)((range(2), range(3)))] == [    ("0", "1"),    ("0", "1", "2"),]assert range(65, 68) | PipeMap(chr) | PipeMap(str.lower) | Pipe(list) == ["a", "b", "c"]assert range(2, 4) | PipeMap(range) | PipeMap(lambda n: n**3, axis=1) | PipeMap(    tuple) | Pipe(list) == [(0, 1), (0, 1, 8)]assert "123\n456\n789".splitlines() | PipeMap(list) | PipeMap(int, axis=1) | PipeMap(    tuple) | Pipe(list) == [(1, 2, 3), (4, 5, 6), (7, 8, 9)]assert "123|456\n98|76|54".splitlines() | PipeMap(lambda s: s.split("|")) | PipeMap(    list, axis=1) | PipeMap(int, axis=2) | PipeMap(tuple, axis=1) | PipeMap(tuple) | Pipe(list) == [    ((1, 2, 3), (4, 5, 6)),    ((9, 8), (7, 6), (5, 4)),]class PipePair(PipeMap):    def __init__(        self, function: typing.Callable[[typing.Any], typing.Any], axis: int = 0    ) -> None:        super().__init__(function=lambda a: (a, function(a)), axis=axis)assert range(65, 68) | PipePair(chr) | Pipe(list) == [    (65, "A"),    (66, "B"),    (67, "C"),]assert range(2, 4) | PipeMap(range) | PipePair(lambda n: n**3, axis=1) | PipeMap(    set) | Pipe(list) == [{(0, 0), (1, 1)}, {(0, 0), (1, 1), (2, 8)}]
 |