import time

import gnuradio.gr  # import the submodule explicitly; also binds ``gnuradio``
import numpy
import serial


class ArduinoAnalogReadings(gnuradio.gr.sync_block):
    """GNU Radio source block that streams samples read from a serial port.

    Each sample is expected on the wire as a big-endian unsigned 16-bit
    integer. Samples are emitted on a single ``int16`` output. Roughly every
    eight seconds the measured sample rate is printed to stdout.

    Args:
        port: Serial device to open.
        baud_rate: Baud rate passed to ``serial.Serial``.
        buffer_max_length: Maximum number of samples requested from the
            serial port per ``work`` call. Must be at least 1.
        timeout: Read timeout passed to ``serial.Serial``.

    Raises:
        ValueError: If ``buffer_max_length`` is smaller than 1.
    """

    _BYTES_PER_SAMPLE = 2
    # Valid readings are strictly below this bound
    # (presumably a 10-bit ADC -- TODO confirm against the firmware).
    _READING_UPPER_BOUND = 1024
    _RATE_REPORT_INTERVAL_SECONDS = 8

    def __init__(
        self, port="/dev/ttyUSB0", baud_rate=115200, buffer_max_length=10, timeout=2
    ):
        # Validate before touching the hardware so a bad argument does not
        # leave an opened serial port behind.
        if buffer_max_length < 1:
            raise ValueError("buffer_max_length must be at least 1")
        gnuradio.gr.sync_block.__init__(
            self, name="Arduino Analog Readings", in_sig=None, out_sig=[numpy.int16]
        )
        self._serial_port = serial.Serial(
            port=port, baudrate=baud_rate, bytesize=serial.EIGHTBITS, timeout=timeout
        )
        self._buffer_max_length = buffer_max_length
        # Leading byte(s) of a sample whose remainder had not arrived when the
        # previous read timed out.
        self._pending_bytes = b""
        self._samples_counter = 0
        self._sample_rate_report_timestamp = None

    def work(self, input_items, output_items):  # pylint: disable=unused-argument
        """Read available samples from the serial port into the output buffer.

        Args:
            input_items: Unused; the block has no inputs.
            output_items: Sequence whose first element is the ``int16``
                output buffer to fill.

        Returns:
            The number of samples written to ``output_items[0]``. This is 0
            when the read timed out before a complete sample arrived.

        Raises:
            AssertionError: If a decoded reading is out of range, which
                indicates that byte alignment with the sender was lost.
        """
        out = output_items[0]
        max_samples = min(len(out), self._buffer_max_length)
        if max_samples <= 0:
            return 0

        # _pending_bytes is always shorter than one sample, so at least one
        # byte is requested here.
        wanted = self._BYTES_PER_SAMPLE * max_samples - len(self._pending_bytes)
        raw = self._pending_bytes + self._serial_port.read(wanted)

        # A timeout may cut a sample in half; keep the dangling byte for the
        # next call instead of failing or shifting the byte alignment.
        usable = len(raw) - len(raw) % self._BYTES_PER_SAMPLE
        self._pending_bytes = raw[usable:]
        samples = numpy.frombuffer(raw[:usable], dtype=">u2")

        # Check the unsigned values *before* they are stored as int16: values
        # >= 0x8000 would wrap negative after the cast and slip through.
        # Raised explicitly (not via ``assert``) so it survives ``python -O``.
        if samples.size and samples.max() >= self._READING_UPPER_BOUND:
            raise AssertionError("lost sync?")

        sample_count = int(samples.size)
        out[:sample_count] = samples
        self._report_sample_rate(sample_count)
        return sample_count

    def _report_sample_rate(self, new_samples):
        """Accumulate ``new_samples`` and periodically print the sample rate."""
        self._samples_counter += new_samples
        # Monotonic clock: interval measurement must not be affected by
        # wall-clock adjustments.
        now = time.monotonic()
        if self._sample_rate_report_timestamp is None:
            self._sample_rate_report_timestamp = now
            return
        elapsed = now - self._sample_rate_report_timestamp
        if elapsed > self._RATE_REPORT_INTERVAL_SECONDS:
            sample_rate_hz = self._samples_counter / elapsed
            print(f"{sample_rate_hz:.01f} Hz")
            self._samples_counter = 0
            self._sample_rate_report_timestamp = now