123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221 |
- import datetime
- import functools
- import itertools
- import os
- import pathlib
- import typing
- import warnings
- import dateutil.parser
- import exifread
- import numpy
- import pandas
- import pgpdump
- import pyperclip
- import scipy.io.wavfile
- import sympy
- import yaml
- from matplotlib import pyplot # pylint: disable=unused-import; frequently used in shell
# Interactive-session configuration; these statements run once at import time.

# Set the pandas "display.max_rows" option to 200.
# https://pandas.pydata.org/pandas-docs/stable/user_guide/options.html
pandas.set_option("display.max_rows", 200)

if os.getenv("WAYLAND_DISPLAY"):
    # with default "gi" in python3-pyperclip=1.8.2-2 & python3-gi=3.42.2-3+b1
    # pyperclip.paste() always returned empty string
    pyperclip.set_clipboard("wl-clipboard")

# https://docs.sympy.org/latest/modules/interactive.html#module-sympy.interactive.printing
sympy.init_printing(pretty_print=True)
def join_pgp_packets(
    packets: typing.Iterator[typing.Union[bytearray, pgpdump.packet.Packet]],
) -> bytes:
    """Concatenate a stream of raw byte chunks and pgpdump packets.

    Items that are ``pgpdump.packet.Packet`` instances contribute their
    ``data`` attribute; every other item is joined as-is.
    """
    chunks = []
    for item in packets:
        if isinstance(item, pgpdump.packet.Packet):
            chunks.append(item.data)
        else:
            chunks.append(item)
    return b"".join(chunks)
def numpy_array_from_file(
    path: typing.Union[str, pathlib.Path], dtype
) -> numpy.ndarray:
    """Read the whole file at *path* and interpret its bytes as *dtype* items.

    *path* may be given as a string or as a ``pathlib.Path``; *dtype* is
    passed straight through to ``numpy.frombuffer``.
    """
    # Only plain strings are converted; any other object is used as given.
    file_path = pathlib.Path(path) if isinstance(path, str) else path
    raw = file_path.read_bytes()
    return numpy.frombuffer(raw, dtype=dtype)
def read_exif_datetime_original(
    path: typing.Union[pathlib.Path, str]
) -> typing.Optional[datetime.datetime]:
    """Return the EXIF "DateTimeOriginal" timestamp of the image at *path*.

    Returns ``None`` when the file carries no ``EXIF DateTimeOriginal`` tag.
    The fractional seconds (``EXIF SubSecTimeOriginal``) and the UTC offset
    (``EXIF OffsetTimeOriginal``) are each appended only when the
    corresponding tag is present, so files lacking the sub-second tag no
    longer raise ``KeyError``.
    """
    with pathlib.Path(path).open("rb") as file:
        tags = exifread.process_file(file)
    if "EXIF DateTimeOriginal" not in tags:
        return None
    # EXIF separates date fields with colons, so normalise to ISO 8601 before
    # handing the string to dateutil.
    # https://web.archive.org/web/20240609164044/https://github.com/dateutil/dateutil/issues/271
    timestamp = datetime.datetime.strptime(
        tags["EXIF DateTimeOriginal"].values, "%Y:%m:%d %H:%M:%S"
    ).isoformat()
    if "EXIF SubSecTimeOriginal" in tags:
        timestamp += "." + tags["EXIF SubSecTimeOriginal"].values
    if "EXIF OffsetTimeOriginal" in tags:
        timestamp += tags["EXIF OffsetTimeOriginal"].values
    return dateutil.parser.parse(timestamp)
def split_pgp_file(
    path: pathlib.Path,
) -> typing.Iterator[typing.Union[bytearray, pgpdump.packet.Packet]]:
    """
    Split a PGP file into alternating "bytes before the packet data" chunks
    and the packets reported by pgpdump.

    For every packet, first the bytes found in front of its data are yielded,
    then the packet object itself. ASCII-armored input (starting with
    ``-----BEGIN``) is parsed with ``pgpdump.AsciiData``, anything else with
    ``pgpdump.BinaryData``.

    https://datatracker.ietf.org/doc/html/rfc4880#section-4
    """
    raw = path.read_bytes()
    parser = pgpdump.AsciiData if raw.startswith(b"-----BEGIN") else pgpdump.BinaryData
    parsed = parser(raw)
    unconsumed = parsed.data
    for packet in parsed.packets():
        try:
            # Unpacking fails with ValueError when the packet data does not
            # occur contiguously in the remaining bytes.
            prefix, unconsumed = unconsumed.split(packet.data, maxsplit=1)
        except ValueError:
            assert len(packet.data) > 596  # actual threshold might be higher
            head_length = 2**9
            # Locate the first 512 bytes and the rest of the data separately;
            # whatever sits between them is dropped.
            # NOTE(review): presumably the length header of a follow-up
            # partial-body chunk - confirm against RFC 4880 section 4.2.2.4.
            prefix, unconsumed = unconsumed.split(
                packet.data[:head_length], maxsplit=1
            )
            separator, unconsumed = unconsumed.split(
                packet.data[head_length:], maxsplit=1
            )
            assert sum(separator) == len(packet.data) - head_length
            warnings.warn(
                "ignoring separator; output of join_pgp_packets will be invalid"
            )
        yield prefix
        yield packet
    # Every byte of the input must have been attributed to a prefix or packet.
    assert not unconsumed
def split_sequence_by_delimiter(
    sequence: typing.Sequence, delimiter: typing.Any, delimiter_min_length: int = 1
) -> typing.Iterator[typing.Sequence]:
    """Yield the non-empty slices of *sequence* found between delimiter runs.

    A run of consecutive items equal to *delimiter* separates slices only
    when it is at least *delimiter_min_length* items long; shorter runs stay
    part of the surrounding slice. Slices are produced with ``sequence[a:b]``.
    """
    chunk_start = 0  # index where the slice currently being collected begins
    position = 0  # index of the first item of the run being examined
    for matches, run in itertools.groupby(
        sequence, key=lambda item: item == delimiter
    ):
        run_length = len(list(run))
        run_end = position + run_length
        if matches and run_length >= delimiter_min_length:
            if position > chunk_start:
                yield sequence[chunk_start:position]
            # The next slice starts right after this delimiter run.
            chunk_start = run_end
        position = run_end
    if position > chunk_start:
        yield sequence[chunk_start:position]
def trim_where(
    # https://docs.python.org/3.8/library/collections.abc.html#collections-abstract-base-classes
    sequence: typing.Sequence,
    condition: typing.Sequence[bool],
) -> typing.Sequence:
    """Strip leading and trailing items of *sequence* whose *condition* is true.

    *condition* must have the same length as *sequence* (checked with an
    ``assert``). Items in the interior are kept regardless of their condition.
    """
    leading = sum(1 for _ in itertools.takewhile(bool, condition))
    length = len(sequence)
    assert length == len(condition)
    trailing = sum(1 for _ in itertools.takewhile(bool, condition[::-1]))
    return sequence[leading : length - trailing]
def wavfile_read_mono(
    path: typing.Union[pathlib.Path, str]
) -> typing.Tuple[int, numpy.ndarray]:
    """Read a wav file and return ``(rate, samples)`` with 1-D samples.

    One-dimensional data is returned unchanged. For multi-channel data every
    channel is asserted to equal the first one, which is then returned.
    """
    # https://docs.scipy.org/doc/scipy/reference/generated/scipy.io.wavfile.read.html
    rate, samples = scipy.io.wavfile.read(path)
    if samples.ndim == 1:
        return rate, samples
    first_channel = samples[:, 0]
    # Transposing puts channels on the first axis; skip channel 0 itself.
    for other_channel in samples.T[1:]:
        assert (first_channel == other_channel).all()
    return rate, first_channel
def yaml_dump(path: typing.Union[pathlib.Path, str], data: typing.Any) -> None:
    """Serialize *data* with ``yaml.safe_dump`` into the file at *path*.

    The file is opened in text mode ``"w"``, replacing any existing content.
    """
    target = pathlib.Path(path)
    with target.open("w") as handle:
        yaml.safe_dump(data, handle)
def yaml_load(path: typing.Union[pathlib.Path, str]) -> typing.Any:
    """Parse the file at *path* with ``yaml.safe_load`` and return the result."""
    source = pathlib.Path(path)
    with source.open("r") as handle:
        document = yaml.safe_load(handle)
    return document
class Pipe:
    """Wrap a one-argument callable so it can be applied as ``value | Pipe(f)``."""

    def __init__(self, function: typing.Callable[[typing.Any], typing.Any]) -> None:
        # Stored under a private name; subclasses set the same attribute.
        self._function = function

    def __ror__(self, other: typing.Iterable) -> typing.Any:
        """Return the wrapped callable applied to the left operand of ``|``."""
        apply = self._function
        return apply(other)
class PipeMap(Pipe):
    """Pipe stage that lazily maps a function over the piped iterable.

    With ``axis=0`` the function is applied to each item; with ``axis=1`` to
    each item of each item, and so on for deeper nesting levels.
    """

    @classmethod
    def _partial_map(
        cls, function: typing.Callable[[typing.Any], typing.Any], *, axis: int
    ) -> typing.Callable[[typing.Any], typing.Any]:
        """Return ``map`` bound to *function*, nested ``axis`` further times."""
        mapper = functools.partial(map, function)
        # Each extra axis wraps the mapper in another layer of map().
        for _ in range(axis):
            mapper = functools.partial(map, mapper)
        return mapper

    def __init__(
        self, function: typing.Callable[[typing.Any], typing.Any], axis: int = 0
    ) -> None:
        super().__init__(self._partial_map(function, axis=axis))
# Import-time self-checks documenting how PipeMap behaves for each axis.
assert list(PipeMap._partial_map(str, axis=0)(range(3))) == ["0", "1", "2"]
assert [
    tuple(row) for row in PipeMap._partial_map(str, axis=1)((range(2), range(3)))
] == [("0", "1"), ("0", "1", "2")]
assert (range(65, 68) | PipeMap(chr) | PipeMap(str.lower) | Pipe(list)) == [
    "a",
    "b",
    "c",
]
assert (
    range(2, 4)
    | PipeMap(range)
    | PipeMap(lambda n: n**3, axis=1)
    | PipeMap(tuple)
    | Pipe(list)
) == [(0, 1), (0, 1, 8)]
assert (
    "123\n456\n789".splitlines()
    | PipeMap(list)
    | PipeMap(int, axis=1)
    | PipeMap(tuple)
    | Pipe(list)
) == [(1, 2, 3), (4, 5, 6), (7, 8, 9)]
assert (
    "123|456\n98|76|54".splitlines()
    | PipeMap(lambda s: s.split("|"))
    | PipeMap(list, axis=1)
    | PipeMap(int, axis=2)
    | PipeMap(tuple, axis=1)
    | PipeMap(tuple)
    | Pipe(list)
) == [
    ((1, 2, 3), (4, 5, 6)),
    ((9, 8), (7, 6), (5, 4)),
]
class PipePair(PipeMap):
    """PipeMap variant that yields ``(item, function(item))`` pairs."""

    def __init__(
        self, function: typing.Callable[[typing.Any], typing.Any], axis: int = 0
    ) -> None:
        def pair_with_result(item: typing.Any) -> typing.Tuple[typing.Any, typing.Any]:
            # Keep the input next to the value computed from it.
            return (item, function(item))

        super().__init__(function=pair_with_result, axis=axis)
# Import-time self-checks documenting the pairs produced by PipePair.
assert (range(65, 68) | PipePair(chr) | Pipe(list)) == [
    (65, "A"),
    (66, "B"),
    (67, "C"),
]
assert (
    range(2, 4)
    | PipeMap(range)
    | PipePair(lambda n: n**3, axis=1)
    | PipeMap(set)
    | Pipe(list)
) == [{(0, 0), (1, 1)}, {(0, 0), (1, 1), (2, 8)}]
|