import ctypes import ctypes.util import time import gnuradio.gr import numpy import serial _LIBC_NAME = ctypes.util.find_library("c") if not _LIBC_NAME: raise Exception("couldn't find libc") _LIBC = ctypes.CDLL(_LIBC_NAME) class _Timespec(ctypes.Structure): # pylint: disable=too-few-public-methods """ > struct timespec { > time_t tv_sec; /* seconds */ > long tv_nsec; /* nanoseconds */ > }; > The value of the nanoseconds field must be in the range 0 to 999999999. """ _fields_ = [("tv_sec", ctypes.c_uint32), ("tv_nsec", ctypes.c_long)] _NANOSLEEP = _LIBC.nanosleep _NANOSLEEP.argtypes = [ctypes.POINTER(_Timespec), ctypes.POINTER(_Timespec)] _NANOSLEEP.restype = ctypes.c_int class SerialSinkBlock(gnuradio.gr.basic_block): _SLEEP_THRESHOLD_NS = 10000 def __init__(self, port="/dev/ttyUSB0", baud_rate=115200, sample_rate_hz=1000): gnuradio.gr.basic_block.__init__( self, name="Serial Sink Block", in_sig=[numpy.uint8], out_sig=None ) self._serial_port = serial.Serial( port=port, baudrate=baud_rate, bytesize=serial.EIGHTBITS ) self._tick_interval_ns = int(1e9 / sample_rate_hz) # periodically return from .general_work() to handle signals self._work_max_samples = max(int(sample_rate_hz * 2), 1) self._next_tick_time_ns = None self._libc = ctypes.CDLL(ctypes.util.find_library("c")) def general_work(self, input_items, output_items) -> int: # pylint: disable=unused-argument if not self._next_tick_time_ns: self._next_tick_time_ns = time.perf_counter_ns() for byte in input_items[0][: self._work_max_samples]: while ( self._next_tick_time_ns - time.perf_counter_ns() ) > self._SLEEP_THRESHOLD_NS: _NANOSLEEP( _Timespec( 0, # negative values make nanosleep return -1 min( self._next_tick_time_ns - time.perf_counter_ns() - self._SLEEP_THRESHOLD_NS, 999999999, ), ), None, ) while (self._next_tick_time_ns - time.perf_counter_ns()) > 0: pass self._serial_port.write(b"\x01" if byte else b"\0") self._next_tick_time_ns += self._tick_interval_ns self.consume(0, min(len(input_items[0]), self._work_max_samples)) return 0