123456789101112131415161718192021222324252627282930313233343536373839404142 |
- import time
- import gnuradio
- import numpy
- import serial
class ArduinoAnalogReadings(gnuradio.gr.sync_block):
    """GNU Radio source block that streams 16-bit readings from a serial port.

    The block has no inputs and a single ``numpy.int16`` output.  Each sample
    is read from the serial port as two bytes in big-endian order.  Every
    decoded value must be below 1024; a larger value is treated as a sign
    that the byte stream is misaligned ("lost sync?").  The bound presumably
    reflects a 10-bit ADC on the sending device -- confirm against the
    firmware.

    Roughly every 8 seconds the measured sample rate is printed to stdout.
    """

    # Minimum number of seconds between two sample-rate reports.
    _REPORT_INTERVAL_SECONDS = 8
    # Exclusive upper bound for a valid reading.
    _READING_LIMIT = 1024
    # Each sample is transmitted as two bytes.
    _BYTES_PER_SAMPLE = 2

    def __init__(
        self, port="/dev/ttyUSB0", baud_rate=115200, buffer_max_length=10, timeout=2
    ):
        """Open the serial port and register the block with GNU Radio.

        Args:
            port: Device name handed to ``serial.Serial``.
            baud_rate: Baud rate handed to ``serial.Serial``.
            buffer_max_length: Upper bound on the number of samples requested
                from the serial port per ``work`` call.  Must be at least 1.
            timeout: Read timeout in seconds handed to ``serial.Serial``.

        Raises:
            ValueError: If ``buffer_max_length`` is smaller than 1.
        """
        # Validate before opening the port so a bad argument does not leave
        # an open serial handle behind.
        if buffer_max_length < 1:
            raise ValueError(
                f"buffer_max_length must be at least 1, got {buffer_max_length!r}"
            )
        gnuradio.gr.sync_block.__init__(
            self, name="Arduino Analog Readings", in_sig=None, out_sig=[numpy.int16]
        )
        self._serial_port = serial.Serial(
            port=port, baudrate=baud_rate, bytesize=serial.EIGHTBITS, timeout=timeout
        )
        self._buffer_max_length = buffer_max_length
        # First byte of a sample whose second byte has not arrived yet.
        self._pending_bytes = b""
        self._samples_counter = 0
        self._sample_rate_report_timestamp = None

    def work(self, input_items, output_items):
        """Read up to ``buffer_max_length`` samples into ``output_items[0]``.

        Args:
            input_items: Unused; the block has no inputs.
            output_items: Sequence whose first element is the output buffer.

        Returns:
            The number of samples written, which is 0 when the read timed out
            before a complete sample arrived.

        Raises:
            AssertionError: If a decoded reading is 1024 or larger.
        """
        # pylint: disable=unused-argument
        out = output_items[0]
        max_samples = min(len(out), self._buffer_max_length)
        if max_samples <= 0:
            return 0

        wanted_bytes = self._BYTES_PER_SAMPLE * max_samples
        raw = self._pending_bytes + self._serial_port.read(
            wanted_bytes - len(self._pending_bytes)
        )
        # A timeout may cut the stream in the middle of a sample.  Keep the
        # dangling byte for the next call instead of failing, so that sample
        # alignment is preserved.
        usable_length = len(raw) - len(raw) % self._BYTES_PER_SAMPLE
        self._pending_bytes = raw[usable_length:]

        readings = numpy.frombuffer(raw[:usable_length], dtype=">u2")
        samples_count = len(readings)
        if samples_count:
            # Validate the unsigned values before they are stored as int16:
            # after the cast, words >= 0x8000 wrap to negative numbers and
            # would slip past an upper-bound check.
            if readings.max() >= self._READING_LIMIT:
                # Raised explicitly (not via ``assert``) so the check also
                # runs under ``python -O`` while keeping the exception type.
                raise AssertionError("lost sync?")
            out[:samples_count] = readings

        self._samples_counter += samples_count
        self._report_sample_rate()
        return samples_count

    def _report_sample_rate(self):
        """Print the average sample rate once the report interval elapsed."""
        # A monotonic clock keeps the interval immune to wall-clock changes.
        now = time.monotonic()
        if self._sample_rate_report_timestamp is None:
            self._sample_rate_report_timestamp = now
            return
        elapsed = now - self._sample_rate_report_timestamp
        if elapsed > self._REPORT_INTERVAL_SECONDS:
            sample_rate_hz = self._samples_counter / elapsed
            print(f"{sample_rate_hz:.01f} Hz")
            self._samples_counter = 0
            self._sample_rate_report_timestamp = now
|