init.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. import itertools
  2. import os
  3. import pathlib
  4. import typing
  5. import warnings
  6. import numpy
  7. import pandas
  8. import pgpdump
  9. import pyperclip
  10. import scipy.io.wavfile
  11. import sympy
  12. import yaml
  13. from matplotlib import pyplot # pylint: disable=unused-import; frequently used in shell
# Import-time configuration for interactive use of this module.

# Raise the pandas display row limit to 200.
# https://pandas.pydata.org/pandas-docs/stable/user_guide/options.html
pandas.options.display.max_rows = 200

# Only when WAYLAND_DISPLAY is set: select pyperclip's "wl-clipboard" backend.
if os.environ.get("WAYLAND_DISPLAY"):
    # with default "gi" in python3-pyperclip=1.8.2-2 & python3-gi=3.42.2-3+b1
    # pyperclip.paste() always returned empty string
    pyperclip.set_clipboard("wl-clipboard")

# Enable sympy pretty printing.
# https://docs.sympy.org/latest/modules/interactive.html#module-sympy.interactive.printing
sympy.init_printing(pretty_print=True)
  22. def join_pgp_packets(
  23. packets: typing.Iterator[typing.Union[bytearray, pgpdump.packet.Packet]],
  24. ) -> bytes:
  25. return b"".join(
  26. p.data if isinstance(p, pgpdump.packet.Packet) else p for p in packets
  27. )
  28. def numpy_array_from_file(
  29. path: typing.Union[str, pathlib.Path], dtype
  30. ) -> numpy.ndarray:
  31. if isinstance(path, str):
  32. path = pathlib.Path(path)
  33. return numpy.frombuffer(path.read_bytes(), dtype=dtype)
def split_pgp_file(
    path: pathlib.Path,
) -> typing.Iterator[typing.Union[bytearray, pgpdump.packet.Packet]]:
    """
    Split a PGP file into alternating raw byte chunks and parsed packets.

    https://datatracker.ietf.org/doc/html/rfc4880#section-4

    The file content is parsed with `pgpdump.AsciiData` when it starts with
    b"-----BEGIN" and with `pgpdump.BinaryData` otherwise.  For every packet
    returned by `bundle.packets()` the generator first yields the bytes that
    precede the packet's data in the not-yet-consumed part of `bundle.data`
    (presumably the packet header -- TODO confirm), then the packet itself.

    When a packet's data is not found as one contiguous run, it is located
    in two pieces instead and the bytes between the pieces are dropped with
    a warning; in that case feeding the output to join_pgp_packets does not
    reproduce the input.

    NOTE: consistency checks use `assert` and are skipped under `python -O`.
    """
    bundle_bytes = path.read_bytes()
    # ASCII-armored input starts with a "-----BEGIN ..." line.
    if bundle_bytes.startswith(b"-----BEGIN"):
        bundle = pgpdump.AsciiData(bundle_bytes)
    else:
        bundle = pgpdump.BinaryData(bundle_bytes)
    # Bytes of the bundle not yet attributed to a yielded chunk or packet.
    remaining_bytes = bundle.data
    for packet in bundle.packets():
        try:
            # Unpacking raises ValueError when packet.data does not occur in
            # remaining_bytes (split() then returns a single element).
            prefix, remaining_bytes = remaining_bytes.split(packet.data, maxsplit=1)
        except ValueError:
            assert len(packet.data) > 596 # actual threshold might be higher
            # Fallback: look for the first 2 ** 9 bytes of the data and for
            # the rest of the data separately.
            split_index = 2 ** 9
            prefix, remaining_bytes = remaining_bytes.split(
                packet.data[:split_index], maxsplit=1
            )
            separator, remaining_bytes = remaining_bytes.split(
                packet.data[split_index:], maxsplit=1
            )
            # The byte values between the two pieces must add up to the
            # length of the second piece (presumably a partial body length
            # header, RFC 4880 section 4.2.2.4 -- TODO confirm).
            assert sum(separator) == len(packet.data) - split_index
            warnings.warn(
                "ignoring separator; output of join_pgp_packets will be invalid"
            )
        yield prefix
        yield packet
    # Every byte of the bundle must have been consumed.
    assert not remaining_bytes
  65. def split_sequence_by_delimiter(
  66. sequence: typing.Sequence, delimiter: typing.Any, delimiter_min_length: int = 1
  67. ) -> typing.Iterator[typing.Sequence]:
  68. slice_start_index, slice_length = 0, 0
  69. for is_delimiter, group in itertools.groupby(
  70. sequence, key=lambda item: item == delimiter
  71. ):
  72. group_length = sum(1 for _ in group)
  73. if is_delimiter and group_length >= delimiter_min_length:
  74. if slice_length > 0:
  75. yield sequence[slice_start_index : slice_start_index + slice_length]
  76. slice_start_index += slice_length + group_length
  77. slice_length = 0
  78. else:
  79. slice_length += group_length
  80. if slice_length > 0:
  81. yield sequence[slice_start_index : slice_start_index + slice_length]
  82. def trim_where(
  83. # https://docs.python.org/3.8/library/collections.abc.html#collections-abstract-base-classes
  84. sequence: typing.Sequence,
  85. condition: typing.Sequence[bool],
  86. ) -> typing.Sequence:
  87. start = 0
  88. for item_condition in condition:
  89. if item_condition:
  90. start += 1
  91. else:
  92. break
  93. stop = len(sequence)
  94. assert stop == len(condition)
  95. for item_condition in condition[::-1]:
  96. if item_condition:
  97. stop -= 1
  98. else:
  99. break
  100. return sequence[start:stop]
  101. def wavfile_read_mono(
  102. path: typing.Union[pathlib.Path, str]
  103. ) -> typing.Tuple[int, numpy.ndarray]:
  104. # https://docs.scipy.org/doc/scipy/reference/generated/scipy.io.wavfile.read.html
  105. rate, data = scipy.io.wavfile.read(path)
  106. if len(data.shape) == 1:
  107. return rate, data
  108. data_first_channel = data[:, 0]
  109. for channel_index in range(1, data.shape[1]):
  110. assert (data_first_channel == data[:, channel_index]).all()
  111. return rate, data_first_channel
  112. def yaml_dump(path: typing.Union[pathlib.Path, str], data: typing.Any) -> None:
  113. with pathlib.Path(path).open("w") as stream:
  114. yaml.safe_dump(data, stream)
  115. def yaml_load(path: typing.Union[pathlib.Path, str]) -> typing.Any:
  116. with pathlib.Path(path).open("r") as stream:
  117. return yaml.safe_load(stream)