import struct
import time

import gnuradio
import numpy
import serial


class ArduinoAnalogReadings(gnuradio.gr.sync_block):
    """GNU Radio source block that emits 16-bit readings read from a serial port.

    Each sample is received as two bytes, decoded as a big-endian unsigned
    16-bit integer (``">H"``), and written to the single ``numpy.int16``
    output stream. Every 1000 samples the average sample rate since the
    first ``work`` call is printed.

    NOTE(review): the wire format is unsigned while the output stream is
    signed 16-bit; values above 32767 would not fit. Presumably the sender
    only emits small ADC values -- confirm against the firmware.
    """

    def __init__(self, port="/dev/ttyUSB0", baud_rate=115200, timeout=2):
        """Open the serial port and initialise the sample-rate bookkeeping.

        Args:
            port: Serial device to open.
            baud_rate: Baud rate passed to ``serial.Serial``.
            timeout: Read timeout passed to ``serial.Serial``.
        """
        gnuradio.gr.sync_block.__init__(
            self, name="Arduino Analog Readings", in_sig=[], out_sig=[numpy.int16]
        )
        self._serial_port = serial.Serial(port, baud_rate, timeout=timeout)
        self._readings_counter = 0
        self._start_timestamp = None
        # Bytes of a sample received so far; kept across work() calls so a
        # read that times out mid-sample does not misalign the byte stream.
        self._pending = b""

    def work(self, input_items, output_items):  # pylint: disable=unused-argument
        """Produce at most one sample per call.

        Returns:
            1 when a complete two-byte sample was read and written to
            ``output_items[0][0]``; 0 when the serial read timed out before
            a full sample arrived (nothing is written in that case).
        """
        if self._start_timestamp is None:
            self._start_timestamp = time.time()

        # read() may return fewer bytes than requested when the timeout
        # expires; only ask for what is still missing from this sample.
        self._pending += self._serial_port.read(2 - len(self._pending))
        if len(self._pending) < 2:
            return 0

        output_items[0][0] = struct.unpack(">H", self._pending)[0]
        self._pending = b""

        self._readings_counter += 1
        if self._readings_counter % 1000 == 0:
            elapsed = time.time() - self._start_timestamp
            # Guard against a zero interval (coarse or adjusted system clock).
            if elapsed > 0:
                print(self._readings_counter / elapsed, "Hz")
        return 1