init.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. import datetime
  2. import functools
  3. import itertools
  4. import os
  5. import pathlib
  6. import typing
  7. import warnings
  8. import dateutil.parser
  9. import exifread
  10. import numpy
  11. import pandas
  12. import pgpdump
  13. import pyperclip
  14. import scipy.io.wavfile
  15. import sympy
  16. import yaml
  17. from matplotlib import pyplot # pylint: disable=unused-import; frequently used in shell
  # Raise the number of rows pandas prints before truncating the output.
  # https://pandas.pydata.org/pandas-docs/stable/user_guide/options.html
  pandas.set_option("display.max_rows", 200)

  if os.environ.get("WAYLAND_DISPLAY"):
      # with default "gi" in python3-pyperclip=1.8.2-2 & python3-gi=3.42.2-3+b1
      # pyperclip.paste() always returned empty string
      pyperclip.set_clipboard("wl-clipboard")

  # Enable pretty-printed sympy output in the interactive session.
  # https://docs.sympy.org/latest/modules/interactive.html#module-sympy.interactive.printing
  sympy.init_printing(pretty_print=True)
  def join_pgp_packets(
      packets: typing.Iterator[typing.Union[bytearray, pgpdump.packet.Packet]],
  ) -> bytes:
      """Concatenate raw byte chunks and packet payloads into one bytes object.

      Items that are ``pgpdump.packet.Packet`` instances contribute their
      ``data`` attribute; every other item is joined as-is.
      """
      chunks = []
      for item in packets:
          if isinstance(item, pgpdump.packet.Packet):
              chunks.append(item.data)
          else:
              chunks.append(item)
      return b"".join(chunks)
  def numpy_array_from_file(
      path: typing.Union[str, pathlib.Path], dtype
  ) -> numpy.ndarray:
      """Read a whole file and interpret its bytes as a numpy array.

      :param path: location of the file; strings are converted to ``pathlib.Path``
      :param dtype: element type passed on to ``numpy.frombuffer``
      :return: array built from the complete file content
      """
      file_path = pathlib.Path(path) if isinstance(path, str) else path
      raw = file_path.read_bytes()
      return numpy.frombuffer(raw, dtype=dtype)
  def read_exif_datetime_original(
      path: typing.Union[pathlib.Path, str]
  ) -> typing.Optional[datetime.datetime]:
      """Return the EXIF "DateTimeOriginal" timestamp of an image file.

      The sub-second part ("EXIF SubSecTimeOriginal") and the UTC offset
      ("EXIF OffsetTimeOriginal") are appended when the respective tag is
      present; both are optional.

      :param path: location of the image file
      :return: parsed timestamp, or ``None`` if the file has no
          "EXIF DateTimeOriginal" tag
      :raises ValueError: if the tag value does not match ``%Y:%m:%d %H:%M:%S``
      """
      with pathlib.Path(path).open("rb") as file:
          tags = exifread.process_file(file)
      if "EXIF DateTimeOriginal" not in tags:
          return None
      # The colon-separated EXIF date is converted to ISO 8601 before it is
      # handed to dateutil, see
      # https://web.archive.org/web/20240609164044/https://github.com/dateutil/dateutil/issues/271
      timestamp = datetime.datetime.strptime(
          tags["EXIF DateTimeOriginal"].values, "%Y:%m:%d %H:%M:%S"
      ).isoformat()
      # Not every image records fractions of a second; only append them when
      # available instead of failing with KeyError.
      if "EXIF SubSecTimeOriginal" in tags:
          timestamp += "." + tags["EXIF SubSecTimeOriginal"].values
      if "EXIF OffsetTimeOriginal" in tags:
          timestamp += tags["EXIF OffsetTimeOriginal"].values
      return dateutil.parser.parse(timestamp)
  def split_pgp_file(
      path: pathlib.Path,
  ) -> typing.Iterator[typing.Union[bytearray, pgpdump.packet.Packet]]:
      """
      Split an OpenPGP file into alternating raw byte runs and parsed packets.

      The file is parsed as ASCII-armored data when it starts with
      ``-----BEGIN`` and as binary data otherwise.  For every packet reported
      by pgpdump, the bytes found in front of the packet's data within the not
      yet consumed input are yielded first (presumably the packet header, see
      the link below), followed by the packet object itself.

      https://datatracker.ietf.org/doc/html/rfc4880#section-4
      """
      bundle_bytes = path.read_bytes()
      if bundle_bytes.startswith(b"-----BEGIN"):
          bundle = pgpdump.AsciiData(bundle_bytes)
      else:
          bundle = pgpdump.BinaryData(bundle_bytes)
      # Input that has not been attributed to a prefix or a packet yet.
      remaining_bytes = bundle.data
      for packet in bundle.packets():
          try:
              # Everything before the first occurrence of the packet's data is
              # the prefix; the rest is kept for the following packets.
              prefix, remaining_bytes = remaining_bytes.split(packet.data, maxsplit=1)
          except ValueError:
              # split() returned a single element: the packet's data does not
              # occur contiguously in the remaining input.
              # NOTE(review): presumably caused by partial body lengths
              # (RFC 4880 section 4.2.2.4) - confirm.
              assert len(packet.data) > 596  # actual threshold might be higher
              split_index = 2**9
              # Locate the first 2**9 bytes of the data and the rest separately.
              prefix, remaining_bytes = remaining_bytes.split(
                  packet.data[:split_index], maxsplit=1
              )
              separator, remaining_bytes = remaining_bytes.split(
                  packet.data[split_index:], maxsplit=1
              )
              # Sanity check: the bytes between both parts sum up to the length
              # of the second part.
              assert sum(separator) == len(packet.data) - split_index
              # The separator is dropped, so the yielded items no longer add
              # up to the original file.
              warnings.warn(
                  "ignoring separator; output of join_pgp_packets will be invalid"
              )
          yield prefix
          yield packet
      # All input must have been consumed by the packets above.
      assert not remaining_bytes
  def split_sequence_by_delimiter(
      sequence: typing.Sequence, delimiter: typing.Any, delimiter_min_length: int = 1
  ) -> typing.Iterator[typing.Sequence]:
      """Yield the slices of a sequence that lie between runs of a delimiter.

      Only runs of at least ``delimiter_min_length`` consecutive items equal to
      ``delimiter`` separate slices; shorter runs stay part of the surrounding
      slice.  Empty slices are never yielded.

      :param sequence: sliceable sequence to split
      :param delimiter: value compared against each item with ``==``
      :param delimiter_min_length: minimum run length that counts as separator
      """
      chunk_begin = 0  # index of the first item of the pending slice
      cursor = 0  # index of the first item of the current run
      for matches, run in itertools.groupby(
          sequence, key=lambda item: item == delimiter
      ):
          run_end = cursor + len(list(run))
          if matches and run_end - cursor >= delimiter_min_length:
              if cursor > chunk_begin:
                  yield sequence[chunk_begin:cursor]
              # The next slice starts right after the separator run.
              chunk_begin = run_end
          cursor = run_end
      if cursor > chunk_begin:
          yield sequence[chunk_begin:cursor]
  def trim_where(
      # https://docs.python.org/3.8/library/collections.abc.html#collections-abstract-base-classes
      sequence: typing.Sequence,
      condition: typing.Sequence[bool],
  ) -> typing.Sequence:
      """Strip leading and trailing items whose condition flag is truthy.

      :param sequence: sliceable sequence to trim
      :param condition: one flag per item of ``sequence``; must have the same
          length (checked with ``assert``)
      :return: slice of ``sequence`` without the leading and trailing items
          flagged in ``condition``; flagged items in between are kept
      """
      leading = sum(1 for _ in itertools.takewhile(bool, condition))
      end = len(sequence)
      assert end == len(condition)
      trailing = sum(1 for _ in itertools.takewhile(bool, condition[::-1]))
      return sequence[leading : end - trailing]
  def wavfile_read_mono(
      path: typing.Union[pathlib.Path, str]
  ) -> typing.Tuple[int, numpy.ndarray]:
      """Read a WAV file and return its sample rate and a single channel.

      Files with several channels are only accepted if all channels carry
      identical samples (checked with ``assert``); the first channel is
      returned in that case.

      https://docs.scipy.org/doc/scipy/reference/generated/scipy.io.wavfile.read.html
      """
      rate, samples = scipy.io.wavfile.read(path)
      if samples.ndim == 1:
          # Already mono.
          return rate, samples
      reference = samples[:, 0]
      channel_count = samples.shape[1]
      for other_index in range(1, channel_count):
          assert (reference == samples[:, other_index]).all()
      return rate, reference
  def yaml_dump(path: typing.Union[pathlib.Path, str], data: typing.Any) -> None:
      """Write ``data`` as YAML to the file at ``path`` using ``yaml.safe_dump``.

      The file is opened in text mode "w", i.e. existing content is replaced.
      """
      target = pathlib.Path(path)
      with target.open("w") as handle:
          yaml.safe_dump(data, handle)
  def yaml_load(path: typing.Union[pathlib.Path, str]) -> typing.Any:
      """Parse the YAML file at ``path`` with ``yaml.safe_load`` and return the result."""
      source = pathlib.Path(path)
      with source.open("r") as handle:
          document = yaml.safe_load(handle)
      return document
  class Pipe:
      """Wrap a callable so that it can be applied with ``value | Pipe(callable)``."""

      def __init__(self, function: typing.Callable[[typing.Any], typing.Any]) -> None:
          # Callable invoked with the left-hand operand of ``|``.
          self._function = function

      def __ror__(self, other: typing.Iterable) -> typing.Any:
          """Call the wrapped callable with the left-hand operand and return its result."""
          apply = self._function
          return apply(other)
  class PipeMap(Pipe):
      """Pipe stage that maps a function over the items of the piped iterable.

      ``axis`` selects the nesting level the function is applied at: 0 maps
      over the items themselves, 1 over the items of each item, and so on.
      The result consists of lazy ``map`` objects.
      """

      @classmethod
      def _partial_map(
          cls, function: typing.Callable[[typing.Any], typing.Any], *, axis: int
      ) -> typing.Callable[[typing.Any], typing.Any]:
          """Return a callable mapping ``function`` at nesting level ``axis``."""
          mapper = functools.partial(map, function)
          remaining_levels = axis
          # Each additional level wraps the mapper in another ``map``.
          while remaining_levels > 0:
              mapper = functools.partial(map, mapper)
              remaining_levels -= 1
          return mapper

      def __init__(
          self, function: typing.Callable[[typing.Any], typing.Any], axis: int = 0
      ) -> None:
          super().__init__(self._partial_map(function, axis=axis))
  # Import-time self-checks documenting how PipeMap behaves.
  assert list(PipeMap._partial_map(str, axis=0)(range(3))) == ["0", "1", "2"]
  assert list(
      map(tuple, PipeMap._partial_map(str, axis=1)((range(2), range(3))))
  ) == [
      ("0", "1"),
      ("0", "1", "2"),
  ]
  assert (
      range(65, 68)
      | PipeMap(chr)
      | PipeMap(str.lower)
      | Pipe(list)
  ) == ["a", "b", "c"]
  assert (
      range(2, 4)
      | PipeMap(range)
      | PipeMap(lambda n: n**3, axis=1)
      | PipeMap(tuple)
      | Pipe(list)
  ) == [(0, 1), (0, 1, 8)]
  assert (
      "123\n456\n789".splitlines()
      | PipeMap(list)
      | PipeMap(int, axis=1)
      | PipeMap(tuple)
      | Pipe(list)
  ) == [(1, 2, 3), (4, 5, 6), (7, 8, 9)]
  assert (
      "123|456\n98|76|54".splitlines()
      | PipeMap(lambda s: s.split("|"))
      | PipeMap(list, axis=1)
      | PipeMap(int, axis=2)
      | PipeMap(tuple, axis=1)
      | PipeMap(tuple)
      | Pipe(list)
  ) == [
      ((1, 2, 3), (4, 5, 6)),
      ((9, 8), (7, 6), (5, 4)),
  ]
  class PipePair(PipeMap):
      """PipeMap variant that yields ``(item, function(item))`` pairs.

      ``axis`` has the same meaning as for ``PipeMap``.
      """

      def __init__(
          self, function: typing.Callable[[typing.Any], typing.Any], axis: int = 0
      ) -> None:
          def pair_with_result(item: typing.Any) -> typing.Tuple[typing.Any, typing.Any]:
              # Keep the input next to the value computed from it.
              return (item, function(item))

          super().__init__(function=pair_with_result, axis=axis)
  # Import-time self-checks documenting how PipePair behaves.
  assert (
      range(65, 68)
      | PipePair(chr)
      | Pipe(list)
  ) == [
      (65, "A"),
      (66, "B"),
      (67, "C"),
  ]
  assert (
      range(2, 4)
      | PipeMap(range)
      | PipePair(lambda n: n**3, axis=1)
      | PipeMap(set)
      | Pipe(list)
  ) == [{(0, 0), (1, 1)}, {(0, 0), (1, 1), (2, 8)}]