import gnuradio
import numpy
import serial


class SerialSourceBlock(gnuradio.gr.sync_block):
    """GNU Radio source block that emits bytes read from a serial port.

    Every byte received is expected to be either 0 or 1; any larger value
    makes ``work`` raise ``ValueError``.
    """

    def __init__(
        self,
        port="/dev/ttyUSB0",
        baud_rate=115200,
        buffer_max_length=10,
        timeout=2,
        invert=False,
    ):
        """Open the serial port and register the block with GNU Radio.

        Args:
            port: Device name handed to ``serial.Serial``.
            baud_rate: Baud rate handed to ``serial.Serial``.
            buffer_max_length: Upper bound on the number of bytes requested
                from the serial port per ``work`` call.
            timeout: Read timeout handed to ``serial.Serial``.
            invert: When true, every emitted item is XOR-ed with 1.
        """
        # pylint: disable=too-many-arguments
        gnuradio.gr.sync_block.__init__(
            self, name="Serial Source Block", in_sig=None, out_sig=[numpy.uint8]
        )
        self._serial_port = serial.Serial(
            port=port, baudrate=baud_rate, bytesize=serial.EIGHTBITS, timeout=timeout
        )
        # remove first bytes that may have unexpected values > 1, unclear why
        # seen [0b110000000, 0] and [0, 0b11111110] so far
        self._serial_port.read(2)
        self._buffer_max_length = buffer_max_length
        self._invert = invert

    def work(self, input_items, output_items):  # pylint: disable=unused-argument
        """Read available bytes from the serial port into the output buffer.

        Args:
            input_items: Unused; the block is created with ``in_sig=None``.
            output_items: Sequence whose first element is the uint8 output
                buffer to fill.

        Returns:
            The number of items written to ``output_items[0]``; 0 when the
            serial read returned no data.

        Raises:
            ValueError: If any received byte is greater than 1.
        """
        out = output_items[0]
        buffer = self._serial_port.read(min(len(out), self._buffer_max_length))
        if not buffer:
            # Nothing was read (e.g. the read timed out). Calling max() on an
            # empty slice would raise, so report zero produced items instead.
            return 0
        produced = out[: len(buffer)]
        produced[:] = numpy.frombuffer(buffer, dtype="u1")
        if produced.max() > 1:
            raise ValueError(produced)
        if self._invert:
            # Only flip the items produced in this call; the remainder of the
            # output buffer does not belong to this call's result.
            produced ^= 1
        return len(buffer)