serial_source_block.py 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738
  1. import gnuradio
  2. import numpy
  3. import serial
  4. class SerialSourceBlock(gnuradio.gr.sync_block):
  5. def __init__(
  6. self,
  7. port="/dev/ttyUSB1",
  8. baud_rate=115200,
  9. buffer_max_length=10,
  10. timeout=2,
  11. invert=False,
  12. ):
  13. # pylint: disable=too-many-arguments
  14. gnuradio.gr.sync_block.__init__(
  15. self, name="Serial Source Block", in_sig=None, out_sig=[numpy.uint8]
  16. )
  17. self._serial_port = serial.Serial(
  18. port=port, baudrate=baud_rate, bytesize=serial.EIGHTBITS, timeout=timeout
  19. )
  20. # remove first bytes that may have unexpected values > 1, unclear why
  21. # seen [0b110000000, 0] and [0, 0b11111110] so far
  22. self._serial_port.read(2)
  23. self._buffer_max_length = buffer_max_length
  24. self._invert = invert
  25. def work(self, input_items, output_items):
  26. # pylint: disable=unused-argument
  27. buffer = self._serial_port.read(
  28. min(len(output_items[0]), self._buffer_max_length)
  29. )
  30. output_items[0][: len(buffer)] = numpy.frombuffer(buffer, dtype="u1")
  31. if output_items[0][: len(buffer)].max() > 1:
  32. raise ValueError(output_items[0][: len(buffer)])
  33. if self._invert:
  34. output_items[0] ^= 1
  35. return len(buffer)