123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475 |
- import ctypes
- import ctypes.util
- import time
- import gnuradio.gr
- import numpy
- import serial
- _LIBC_NAME = ctypes.util.find_library("c")
- if not _LIBC_NAME:
- raise Exception("couldn't find libc")
- _LIBC = ctypes.CDLL(_LIBC_NAME)
- class _Timespec(ctypes.Structure):
- # pylint: disable=too-few-public-methods
- """
- > struct timespec {
- > time_t tv_sec; /* seconds */
- > long tv_nsec; /* nanoseconds */
- > };
- > The value of the nanoseconds field must be in the range 0 to 999999999.
- """
- _fields_ = [("tv_sec", ctypes.c_uint32), ("tv_nsec", ctypes.c_long)]
- _NANOSLEEP = _LIBC.nanosleep
- _NANOSLEEP.argtypes = [ctypes.POINTER(_Timespec), ctypes.POINTER(_Timespec)]
- _NANOSLEEP.restype = ctypes.c_int
- class SerialSinkBlock(gnuradio.gr.basic_block):
- _SLEEP_THRESHOLD_NS = 10000
- def __init__(self, port="/dev/ttyUSB0", baud_rate=115200, sample_rate_hz=1000):
- gnuradio.gr.basic_block.__init__(
- self, name="Serial Sink Block", in_sig=[numpy.uint8], out_sig=None
- )
- self._serial_port = serial.Serial(
- port=port, baudrate=baud_rate, bytesize=serial.EIGHTBITS
- )
- self._tick_interval_ns = int(1e9 / sample_rate_hz)
- # periodically return from .general_work() to handle signals
- self._work_max_samples = max(int(sample_rate_hz * 2), 1)
- self._next_tick_time_ns = None
- self._libc = ctypes.CDLL(ctypes.util.find_library("c"))
- def general_work(self, input_items, output_items) -> int:
- # pylint: disable=unused-argument
- if not self._next_tick_time_ns:
- self._next_tick_time_ns = time.perf_counter_ns()
- for byte in input_items[0][: self._work_max_samples]:
- while (
- self._next_tick_time_ns - time.perf_counter_ns()
- ) > self._SLEEP_THRESHOLD_NS:
- _NANOSLEEP(
- _Timespec(
- 0,
- # negative values make nanosleep return -1
- min(
- self._next_tick_time_ns
- - time.perf_counter_ns()
- - self._SLEEP_THRESHOLD_NS,
- 999999999,
- ),
- ),
- None,
- )
- while (self._next_tick_time_ns - time.perf_counter_ns()) > 0:
- pass
- self._serial_port.write(b"\x01" if byte else b"\0")
- self._next_tick_time_ns += self._tick_interval_ns
- self.consume(0, min(len(input_items[0]), self._work_max_samples))
- return 0
|