123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- import itertools
- import pathlib
- import typing
- import numpy
- import pandas
- import pgpdump
- import scipy.io.wavfile
- import sympy
- import yaml
- from matplotlib import pyplot
# Module-level display configuration; these statements run when the module is
# imported.
# Raise the pandas row-display limit to 200 rows.
- pandas.options.display.max_rows = 200
# Enable sympy's pretty printing of expressions.
- sympy.init_printing(pretty_print=True)
def numpy_array_from_file(
    path: typing.Union[str, pathlib.Path], dtype
) -> numpy.ndarray:
    """Load the raw bytes of a file as a one-dimensional numpy array.

    Args:
        path: Location of the file. Anything ``pathlib.Path`` accepts works
            (``str``, ``pathlib.Path`` or another ``os.PathLike`` object).
        dtype: Element type forwarded unchanged to ``numpy.frombuffer``.

    Returns:
        An array interpreting the file content as consecutive ``dtype``
        items. The array is backed by an immutable ``bytes`` object, so it
        is read-only; copy it before modifying.

    Raises:
        OSError: If the file cannot be read.
        ValueError: Raised by ``numpy.frombuffer`` when the file size is not
            a multiple of the ``dtype`` item size.
    """
    # Normalise unconditionally (as yaml_dump/yaml_load do) instead of
    # special-casing ``str``; this also admits other path-like objects.
    return numpy.frombuffer(pathlib.Path(path).read_bytes(), dtype=dtype)
def split_pgp_file(
    path: pathlib.Path,
) -> typing.Iterator[typing.Union[bytearray, pgpdump.packet.Packet]]:
    """Walk a PGP file, alternating between leading bytes and parsed packets.

    For every packet reported by ``pgpdump.BinaryData.packets()`` the
    generator first yields the bytes found in front of that packet's
    ``data`` and then the packet object itself.

    Args:
        path: File to read; its whole content is loaded into memory.

    Yields:
        Alternately the bytes preceding a packet's data (presumably a
        ``bytearray``, per the return annotation -- confirm against pgpdump)
        and the packet.

    Raises:
        ValueError: If a packet's ``data`` does not occur in the bytes that
            are still unconsumed (the split then yields a single piece).
        AssertionError: If bytes remain after the last packet's data.
    """
    binary = pgpdump.BinaryData(path.read_bytes())
    unconsumed = binary.data
    for packet in binary.packets():
        # Cut at the first occurrence of the packet payload: what precedes
        # it is emitted, what follows is kept for the next packet.
        pieces = unconsumed.split(packet.data, maxsplit=1)
        leading, unconsumed = pieces
        yield leading
        yield packet
    assert not unconsumed
def split_sequence_by_delimiter(
    sequence: typing.Sequence, delimiter: typing.Any, delimiter_min_length: int = 1
) -> typing.Iterator[typing.Sequence]:
    """Yield the non-empty slices of ``sequence`` between delimiter runs.

    A run of consecutive items equal to ``delimiter`` separates two slices
    only when it is at least ``delimiter_min_length`` items long; shorter
    runs stay inside the surrounding slice.

    Args:
        sequence: Sliceable sequence to split.
        delimiter: Value compared with ``==`` against every item.
        delimiter_min_length: Minimum run length that counts as a separator.

    Yields:
        Slices of ``sequence`` (obtained by slicing, so of the sequence's own
        slice type), in order, never empty.
    """

    def matches_delimiter(item):
        return item == delimiter

    chunk_start = 0  # index where the slice being accumulated begins
    position = 0  # index of the first item of the current run
    for is_separator_run, run in itertools.groupby(sequence, key=matches_delimiter):
        run_end = position + sum(1 for _ in run)
        if is_separator_run and run_end - position >= delimiter_min_length:
            if position > chunk_start:
                yield sequence[chunk_start:position]
            # The next slice starts right after this separator run.
            chunk_start = run_end
        position = run_end
    if position > chunk_start:
        yield sequence[chunk_start:position]
def trim_where(
    sequence: typing.Sequence,
    condition: typing.Sequence[bool],
) -> typing.Sequence:
    """Strip the leading and trailing items whose condition flag is truthy.

    Args:
        sequence: Sequence to trim.
        condition: One flag per item of ``sequence``; truthy flags at either
            end mark items to drop. Flags in the interior are ignored.

    Returns:
        ``sequence[start:stop]`` where ``start`` skips the leading truthy
        flags and ``stop`` excludes the trailing ones. The slice is empty
        when every flag is truthy.

    Raises:
        AssertionError: If ``sequence`` and ``condition`` differ in length.
    """
    leading = sum(1 for _ in itertools.takewhile(bool, condition))
    total = len(sequence)
    assert total == len(condition)
    trailing = sum(1 for _ in itertools.takewhile(bool, condition[::-1]))
    return sequence[leading : total - trailing]
def wavfile_read_mono(
    path: typing.Union[pathlib.Path, str]
) -> typing.Tuple[int, numpy.ndarray]:
    """Read a WAV file whose channels all carry the same signal.

    Args:
        path: File handed to ``scipy.io.wavfile.read``.

    Returns:
        ``(rate, samples)`` as returned by scipy for one-dimensional data;
        for two-dimensional data, the rate and the first column.

    Raises:
        AssertionError: If any further channel differs from the first one.
    """
    sample_rate, samples = scipy.io.wavfile.read(path)
    if samples.ndim == 1:
        return sample_rate, samples
    reference = samples[:, 0]
    # Compare every remaining channel against the first in one broadcast.
    assert (samples[:, 1:] == reference[:, numpy.newaxis]).all()
    return sample_rate, reference
def yaml_dump(path: typing.Union[pathlib.Path, str], data: typing.Any) -> None:
    """Serialise ``data`` with ``yaml.safe_dump`` into the file at ``path``.

    Args:
        path: Destination file; opened in text write mode, so existing
            content is replaced.
        data: Object to serialise.
    """
    destination = pathlib.Path(path)
    with destination.open("w") as handle:
        yaml.safe_dump(data, stream=handle)
def yaml_load(path: typing.Union[pathlib.Path, str]) -> typing.Any:
    """Parse the YAML document stored at ``path`` with ``yaml.safe_load``.

    Args:
        path: File to read.

    Returns:
        The Python object ``yaml.safe_load`` builds from the file content.

    Raises:
        OSError: If the file cannot be opened.
        yaml.YAMLError: If the content is not valid YAML.
    """
    # Binary mode on purpose: PyYAML then decodes the bytes itself (UTF-8, or
    # UTF-16 when a byte-order mark is present) instead of the platform's
    # locale encoding being applied, which misreads non-ASCII documents on
    # systems whose default encoding is not UTF-8.
    with pathlib.Path(path).open("rb") as stream:
        return yaml.safe_load(stream)
|