serial_source_block.py 1.1 KB

12345678910111213141516171819202122232425262728293031323334
  1. import gnuradio
  2. import numpy
  3. import serial
  4. class SerialSourceBlock(gnuradio.gr.sync_block):
  5. def __init__(
  6. self,
  7. port="/dev/ttyUSB0",
  8. baud_rate=115200,
  9. buffer_max_length=10,
  10. timeout=2,
  11. invert=False,
  12. ):
  13. # pylint: disable=too-many-arguments
  14. gnuradio.gr.sync_block.__init__(
  15. self, name="Serial Source Block", in_sig=None, out_sig=[numpy.uint8]
  16. )
  17. self._serial_port = serial.Serial(
  18. port=port, baudrate=baud_rate, bytesize=serial.EIGHTBITS, timeout=timeout
  19. )
  20. self._buffer_max_length = buffer_max_length
  21. self._invert = invert
  22. def work(self, input_items, output_items):
  23. # pylint: disable=unused-argument
  24. buffer = self._serial_port.read(
  25. min(len(output_items[0]), self._buffer_max_length)
  26. )
  27. output_items[0][: len(buffer)] = numpy.frombuffer(buffer, dtype="u1")
  28. if self._invert:
  29. output_items[0] ^= 1
  30. assert output_items[0][: len(buffer)].max() <= 1, "invalid value"
  31. return len(buffer)