1234567891011121314151617181920212223242526272829303132333435363738 |
- import gnuradio
- import numpy
- import serial
- class SerialSourceBlock(gnuradio.gr.sync_block):
- def __init__(
- self,
- port="/dev/ttyUSB1",
- baud_rate=115200,
- buffer_max_length=10,
- timeout=2,
- invert=False,
- ):
- # pylint: disable=too-many-arguments
- gnuradio.gr.sync_block.__init__(
- self, name="Serial Source Block", in_sig=None, out_sig=[numpy.uint8]
- )
- self._serial_port = serial.Serial(
- port=port, baudrate=baud_rate, bytesize=serial.EIGHTBITS, timeout=timeout
- )
- # remove first bytes that may have unexpected values > 1, unclear why
- # seen [0b110000000, 0] and [0, 0b11111110] so far
- self._serial_port.read(2)
- self._buffer_max_length = buffer_max_length
- self._invert = invert
- def work(self, input_items, output_items):
- # pylint: disable=unused-argument
- buffer = self._serial_port.read(
- min(len(output_items[0]), self._buffer_max_length)
- )
- output_items[0][: len(buffer)] = numpy.frombuffer(buffer, dtype="u1")
- if output_items[0][: len(buffer)].max() > 1:
- raise ValueError(output_items[0][: len(buffer)])
- if self._invert:
- output_items[0] ^= 1
- return len(buffer)
|