"""Helper functions and pipe-style operators.

Importing this module also configures pandas' display options, pyperclip's
clipboard backend (when ``WAYLAND_DISPLAY`` is set) and sympy's pretty
printing, and runs a handful of self-check assertions for the pipe classes.
"""

import datetime
import functools
import itertools
import os
import pathlib
import typing
import warnings

import dateutil.parser
import exifread
import numpy
import pandas
import pgpdump
import pyperclip
import scipy.io.wavfile
import sympy
import yaml
from matplotlib import pyplot  # pylint: disable=unused-import; frequently used in shell

# https://pandas.pydata.org/pandas-docs/stable/user_guide/options.html
pandas.options.display.max_rows = 200

if os.environ.get("WAYLAND_DISPLAY"):
    # with default "gi" in python3-pyperclip=1.8.2-2 & python3-gi=3.42.2-3+b1
    # pyperclip.paste() always returned empty string
    pyperclip.set_clipboard("wl-clipboard")

# https://docs.sympy.org/latest/modules/interactive.html#module-sympy.interactive.printing
sympy.init_printing(pretty_print=True)


def join_pgp_packets(
    packets: typing.Iterator[typing.Union[bytearray, pgpdump.packet.Packet]],
) -> bytes:
    """Concatenate packets and raw byte chunks into a single bytes object.

    Items that are ``pgpdump.packet.Packet`` instances contribute their
    ``data`` attribute; all other items are joined as-is.
    """
    return b"".join(
        p.data if isinstance(p, pgpdump.packet.Packet) else p for p in packets
    )


def numpy_array_from_file(
    path: typing.Union[str, pathlib.Path], dtype
) -> numpy.ndarray:
    """Read the whole file at *path* and interpret its bytes as *dtype*.

    The file content is passed to ``numpy.frombuffer`` unchanged.
    """
    return numpy.frombuffer(pathlib.Path(path).read_bytes(), dtype=dtype)


def read_exif_datetime_original(
    path: typing.Union[str, pathlib.Path],
) -> typing.Optional[datetime.datetime]:
    """Return the EXIF "DateTimeOriginal" timestamp of the image at *path*.

    Sub-second digits ("EXIF SubSecTimeOriginal") and the UTC offset
    ("EXIF OffsetTimeOriginal") are appended when the respective tag is
    present; both are optional.

    Returns ``None`` when the image has no "EXIF DateTimeOriginal" tag.
    """
    with pathlib.Path(path).open("rb") as file:
        tags = exifread.process_file(file)
    if "EXIF DateTimeOriginal" not in tags:
        return None
    # Convert the colon-separated EXIF date to ISO 8601 before handing it to
    # dateutil, see
    # https://web.archive.org/web/20240609164044/https://github.com/dateutil/dateutil/issues/271
    timestamp = datetime.datetime.strptime(
        tags["EXIF DateTimeOriginal"].values, "%Y:%m:%d %H:%M:%S"
    ).isoformat()
    # Not every image carries sub-second precision; indexing the tag
    # unconditionally would raise KeyError for those files.
    if "EXIF SubSecTimeOriginal" in tags:
        timestamp += "." + tags["EXIF SubSecTimeOriginal"].values
    if "EXIF OffsetTimeOriginal" in tags:
        timestamp += tags["EXIF OffsetTimeOriginal"].values
    return dateutil.parser.parse(timestamp)


def split_pgp_file(
    path: pathlib.Path,
) -> typing.Iterator[typing.Union[bytearray, pgpdump.packet.Packet]]:
    """Split a PGP file into alternating raw prefixes and parsed packets.

    https://datatracker.ietf.org/doc/html/rfc4880#section-4

    Files starting with ``-----BEGIN`` are parsed as ASCII-armored data, all
    others as binary data.  For every packet reported by pgpdump, the bytes
    preceding the packet's data in the not-yet-consumed input are yielded
    first, followed by the packet itself.

    When a packet's data cannot be found contiguously, it is located in two
    pieces (first 2**9 bytes, then the remainder) and the bytes in between are
    dropped with a warning.
    NOTE(review): the dropped separator is presumably a partial body length
    header (RFC 4880 section 4.2.2.4) -- confirm.
    """
    bundle_bytes = path.read_bytes()
    if bundle_bytes.startswith(b"-----BEGIN"):
        bundle = pgpdump.AsciiData(bundle_bytes)
    else:
        bundle = pgpdump.BinaryData(bundle_bytes)
    remaining_bytes = bundle.data
    for packet in bundle.packets():
        try:
            prefix, remaining_bytes = remaining_bytes.split(packet.data, maxsplit=1)
        except ValueError:
            # Unpacking failed because packet.data does not occur as one
            # contiguous run in the remaining input.
            assert len(packet.data) > 596  # actual threshold might be higher
            split_index = 2**9
            prefix, remaining_bytes = remaining_bytes.split(
                packet.data[:split_index], maxsplit=1
            )
            separator, remaining_bytes = remaining_bytes.split(
                packet.data[split_index:], maxsplit=1
            )
            # The separator's byte values must add up to the length of the
            # second piece.
            assert sum(separator) == len(packet.data) - split_index
            warnings.warn(
                "ignoring separator; output of join_pgp_packets will be invalid"
            )
        yield prefix
        yield packet
    assert not remaining_bytes


def split_sequence_by_delimiter(
    sequence: typing.Sequence, delimiter: typing.Any, delimiter_min_length: int = 1
) -> typing.Iterator[typing.Sequence]:
    """Yield the slices of *sequence* that lie between runs of *delimiter*.

    Only runs of at least *delimiter_min_length* consecutive items equal to
    *delimiter* separate slices; shorter runs stay part of the surrounding
    slice.  Empty slices are never yielded.
    """
    slice_start_index, slice_length = 0, 0
    for is_delimiter, group in itertools.groupby(
        sequence, key=lambda item: item == delimiter
    ):
        group_length = sum(1 for _ in group)
        if is_delimiter and group_length >= delimiter_min_length:
            if slice_length > 0:
                yield sequence[slice_start_index : slice_start_index + slice_length]
            # Skip over both the slice just emitted and the delimiter run.
            slice_start_index += slice_length + group_length
            slice_length = 0
        else:
            slice_length += group_length
    if slice_length > 0:
        yield sequence[slice_start_index : slice_start_index + slice_length]


def trim_where(
    # https://docs.python.org/3.8/library/collections.abc.html#collections-abstract-base-classes
    sequence: typing.Sequence,
    condition: typing.Sequence[bool],
) -> typing.Sequence:
    """Strip leading and trailing items of *sequence* whose *condition* is true.

    *condition* must have the same length as *sequence* (checked with an
    assertion).  Items in the interior are kept regardless of their condition.
    When every condition is true, the returned slice is empty.
    """
    stop = len(sequence)
    assert stop == len(condition)
    start = 0
    for item_condition in condition:
        if not item_condition:
            break
        start += 1
    for item_condition in condition[::-1]:
        if not item_condition:
            break
        stop -= 1
    return sequence[start:stop]


def wavfile_read_mono(
    path: typing.Union[pathlib.Path, str]
) -> typing.Tuple[int, numpy.ndarray]:
    """Read a WAV file and return its sample rate and a one-dimensional array.

    One-dimensional data is returned unchanged.  For two-dimensional data all
    channels are asserted to be identical to the first one, which is returned.
    """
    # https://docs.scipy.org/doc/scipy/reference/generated/scipy.io.wavfile.read.html
    rate, data = scipy.io.wavfile.read(path)
    if len(data.shape) == 1:
        return rate, data
    data_first_channel = data[:, 0]
    for channel_index in range(1, data.shape[1]):
        assert (data_first_channel == data[:, channel_index]).all()
    return rate, data_first_channel


def yaml_dump(path: typing.Union[pathlib.Path, str], data: typing.Any) -> None:
    """Serialize *data* with ``yaml.safe_dump`` into the file at *path*."""
    with pathlib.Path(path).open("w") as stream:
        yaml.safe_dump(data, stream)


def yaml_load(path: typing.Union[pathlib.Path, str]) -> typing.Any:
    """Parse the file at *path* with ``yaml.safe_load`` and return the result."""
    with pathlib.Path(path).open("r") as stream:
        return yaml.safe_load(stream)


class Pipe:
    """Apply a function to the left-hand operand of ``|``.

    ``value | Pipe(function)`` evaluates to ``function(value)``.
    """

    def __init__(self, function: typing.Callable[[typing.Any], typing.Any]) -> None:
        self._function = function

    def __ror__(self, other: typing.Iterable) -> typing.Any:
        return self._function(other)


class PipeMap(Pipe):
    """Lazily map a function over the left-hand operand of ``|``.

    With ``axis=0`` the function is applied to each item of the operand; each
    increment of *axis* applies it one nesting level deeper.
    """

    @classmethod
    def _partial_map(
        cls, function: typing.Callable[[typing.Any], typing.Any], *, axis: int
    ) -> typing.Callable[[typing.Any], typing.Any]:
        """Return a callable mapping *function* at nesting depth *axis*."""
        if axis <= 0:
            return functools.partial(map, function)
        return functools.partial(map, cls._partial_map(function, axis=axis - 1))

    def __init__(
        self, function: typing.Callable[[typing.Any], typing.Any], axis: int = 0
    ) -> None:
        super().__init__(self._partial_map(function, axis=axis))


assert list(PipeMap._partial_map(str, axis=0)(range(3))) == ["0", "1", "2"]
assert [tuple(r) for r in PipeMap._partial_map(str, axis=1)((range(2), range(3)))] == [
    ("0", "1"),
    ("0", "1", "2"),
]
assert range(65, 68) | PipeMap(chr) | PipeMap(str.lower) | Pipe(list) == ["a", "b", "c"]
assert range(2, 4) | PipeMap(range) | PipeMap(lambda n: n**3, axis=1) | PipeMap(
    tuple
) | Pipe(list) == [(0, 1), (0, 1, 8)]
assert "123\n456\n789".splitlines() | PipeMap(list) | PipeMap(int, axis=1) | PipeMap(
    tuple
) | Pipe(list) == [(1, 2, 3), (4, 5, 6), (7, 8, 9)]
assert "123|456\n98|76|54".splitlines() | PipeMap(lambda s: s.split("|")) | PipeMap(
    list, axis=1
) | PipeMap(int, axis=2) | PipeMap(tuple, axis=1) | PipeMap(tuple) | Pipe(list) == [
    ((1, 2, 3), (4, 5, 6)),
    ((9, 8), (7, 6), (5, 4)),
]


class PipePair(PipeMap):
    """Like ``PipeMap``, but pair each input item with its function result.

    Every mapped item ``a`` becomes the tuple ``(a, function(a))``.
    """

    def __init__(
        self, function: typing.Callable[[typing.Any], typing.Any], axis: int = 0
    ) -> None:
        super().__init__(function=lambda a: (a, function(a)), axis=axis)


assert range(65, 68) | PipePair(chr) | Pipe(list) == [
    (65, "A"),
    (66, "B"),
    (67, "C"),
]
assert range(2, 4) | PipeMap(range) | PipePair(lambda n: n**3, axis=1) | PipeMap(
    set
) | Pipe(list) == [{(0, 0), (1, 1)}, {(0, 0), (1, 1), (2, 8)}]