serial_source_block.py 1023 B

123456789101112131415161718192021222324252627
  1. import gnuradio
  2. import numpy
  3. import serial
  4. class SerialSourceBlock(gnuradio.gr.sync_block):
  5. def __init__(
  6. self, port="/dev/ttyUSB0", baud_rate=115200, buffer_max_length=10, timeout=2
  7. ):
  8. gnuradio.gr.sync_block.__init__(
  9. self, name="Serial Source Block", in_sig=None, out_sig=[numpy.int16]
  10. )
  11. self._serial_port = serial.Serial(
  12. port=port, baudrate=baud_rate, bytesize=serial.EIGHTBITS, timeout=timeout
  13. )
  14. self._buffer_max_length = buffer_max_length
  15. def work(self, input_items, output_items):
  16. # pylint: disable=unused-argument
  17. buffer = self._serial_port.read(
  18. 2 * min(len(output_items[0]), self._buffer_max_length)
  19. )
  20. assert len(buffer) % 2 == 0
  21. buffer_samples_count = len(buffer) // 2
  22. output_items[0][:buffer_samples_count] = numpy.frombuffer(buffer, dtype=">u2")
  23. assert output_items[0][:buffer_samples_count].max() < 1024, "lost sync?"
  24. return buffer_samples_count