epy_block_0.py 924 B

1234567891011121314151617181920212223242526
  1. import struct
  2. import time
  3. import gnuradio
  4. import numpy
  5. import serial
  6. class ArduinoAnalogReadings(gnuradio.gr.sync_block):
  7. def __init__(self, port="/dev/ttyUSB0", baud_rate=115200, timeout=2):
  8. gnuradio.gr.sync_block.__init__(
  9. self, name="Arduino Analog Readings", in_sig=[], out_sig=[numpy.int16]
  10. )
  11. self._serial_port = serial.Serial(port, baud_rate, timeout=timeout)
  12. self._readings_counter = 0
  13. self._start_timestamp = None
  14. def work(self, input_items, output_items):
  15. # pylint: disable=unused-argument
  16. if not self._start_timestamp:
  17. self._start_timestamp = time.time()
  18. output_items[0][0] = struct.unpack(">H", self._serial_port.read(2))[0]
  19. self._readings_counter += 1
  20. if self._readings_counter % 1000 == 0:
  21. print(self._readings_counter / (time.time() - self._start_timestamp), "Hz")
  22. return 1