1234567891011121314151617181920212223242526 |
- import struct
- import time
- import gnuradio
- import numpy
- import serial
class ArduinoAnalogReadings(gnuradio.gr.sync_block):
    """GNU Radio source block emitting 16-bit readings received over serial.

    Each reading is expected on the wire as two bytes forming a big-endian
    unsigned 16-bit integer.  Readings are written to the single ``int16``
    output stream, one per ``work`` call.

    NOTE(review): the output type is signed 16-bit while the wire format is
    unsigned; this assumes readings never exceed 32767 (e.g. 10-bit ADC
    values) -- confirm against the sender's firmware.
    """

    # Size in bytes of one reading on the wire (">H").
    _READING_SIZE = 2

    def __init__(self, port="/dev/ttyUSB0", baud_rate=115200, timeout=2):
        """Open the serial port and initialise the rate statistics.

        Args:
            port: Serial device passed to ``serial.Serial``.
            baud_rate: Baud rate passed to ``serial.Serial``.
            timeout: Read timeout passed to ``serial.Serial``.
        """
        gnuradio.gr.sync_block.__init__(
            self, name="Arduino Analog Readings", in_sig=[], out_sig=[numpy.int16]
        )
        self._serial_port = serial.Serial(port, baud_rate, timeout=timeout)
        self._readings_counter = 0
        self._start_timestamp = None
        # Bytes of an incomplete reading left over from a read that timed out.
        self._pending = b""

    def work(self, input_items, output_items):
        """Read one reading from the serial port into the output buffer.

        Returns:
            1 when a reading was written to ``output_items[0][0]``, or 0 when
            the serial read timed out before a complete reading arrived.
        """
        # pylint: disable=unused-argument
        if self._start_timestamp is None:
            self._start_timestamp = time.monotonic()
        # A read may return fewer bytes than requested when the timeout
        # expires.  Keep whatever arrived so the stream stays aligned on
        # 2-byte boundaries instead of failing in struct.unpack.
        missing = self._READING_SIZE - len(self._pending)
        raw = self._pending + self._serial_port.read(missing)
        if len(raw) < self._READING_SIZE:
            self._pending = raw
            return 0
        self._pending = b""
        output_items[0][0] = struct.unpack(">H", raw)[0]
        self._readings_counter += 1
        if self._readings_counter % 1000 == 0:
            elapsed = time.monotonic() - self._start_timestamp
            # Skip the report rather than divide by zero on a coarse clock.
            if elapsed > 0:
                print(self._readings_counter / elapsed, "Hz")
        return 1
|