epy_block_0.py 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. import time
  2. import gnuradio
  3. import numpy
  4. import serial
  5. class ArduinoAnalogReadings(gnuradio.gr.sync_block):
  6. def __init__(
  7. self, port="/dev/ttyUSB0", baud_rate=115200, buffer_max_length=10, timeout=2
  8. ):
  9. gnuradio.gr.sync_block.__init__(
  10. self, name="Arduino Analog Readings", in_sig=None, out_sig=[numpy.int16]
  11. )
  12. self._serial_port = serial.Serial(
  13. port=port, baudrate=baud_rate, bytesize=serial.EIGHTBITS, timeout=timeout
  14. )
  15. self._buffer_max_length = buffer_max_length
  16. self._samples_counter = 0
  17. self._sample_rate_report_timestamp = None
  18. def work(self, input_items, output_items):
  19. # pylint: disable=unused-argument
  20. buffer = self._serial_port.read(
  21. 2 * min(len(output_items[0]), self._buffer_max_length)
  22. )
  23. assert len(buffer) % 2 == 0
  24. buffer_samples_count = len(buffer) // 2
  25. output_items[0][:buffer_samples_count] = numpy.frombuffer(buffer, dtype=">u2")
  26. assert output_items[0][:buffer_samples_count].max() < 1024, "lost sync?"
  27. self._samples_counter += buffer_samples_count
  28. current_timestamp = time.time()
  29. if not self._sample_rate_report_timestamp:
  30. self._sample_rate_report_timestamp = current_timestamp
  31. elif (current_timestamp - self._sample_rate_report_timestamp) > 8:
  32. sample_rate_hz = self._samples_counter / (
  33. current_timestamp - self._sample_rate_report_timestamp
  34. )
  35. print(f"{sample_rate_hz:.01f} Hz")
  36. self._samples_counter = 0
  37. self._sample_rate_report_timestamp = current_timestamp
  38. return buffer_samples_count