serial_sink_block.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. import ctypes
  2. import ctypes.util
  3. import time
  4. import gnuradio.gr
  5. import numpy
  6. import serial
  7. _LIBC_NAME = ctypes.util.find_library("c")
  8. if not _LIBC_NAME:
  9. raise Exception("couldn't find libc")
  10. _LIBC = ctypes.CDLL(_LIBC_NAME)
  11. class _Timespec(ctypes.Structure):
  12. # pylint: disable=too-few-public-methods
  13. """
  14. > struct timespec {
  15. > time_t tv_sec; /* seconds */
  16. > long tv_nsec; /* nanoseconds */
  17. > };
  18. > The value of the nanoseconds field must be in the range 0 to 999999999.
  19. """
  20. _fields_ = [("tv_sec", ctypes.c_uint32), ("tv_nsec", ctypes.c_long)]
  21. _NANOSLEEP = _LIBC.nanosleep
  22. _NANOSLEEP.argtypes = [ctypes.POINTER(_Timespec), ctypes.POINTER(_Timespec)]
  23. _NANOSLEEP.restype = ctypes.c_int
  24. class SerialSinkBlock(gnuradio.gr.basic_block):
  25. _SLEEP_THRESHOLD_NS = 10000
  26. def __init__(self, port="/dev/ttyUSB0", baud_rate=115200, sample_rate_hz=1000):
  27. gnuradio.gr.basic_block.__init__(
  28. self, name="Serial Sink Block", in_sig=[numpy.uint8], out_sig=None
  29. )
  30. self._serial_port = serial.Serial(
  31. port=port, baudrate=baud_rate, bytesize=serial.EIGHTBITS
  32. )
  33. self._tick_interval_ns = int(1e9 / sample_rate_hz)
  34. # periodically return from .general_work() to handle signals
  35. self._work_max_samples = max(int(sample_rate_hz * 2), 1)
  36. self._next_tick_time_ns = None
  37. self._libc = ctypes.CDLL(ctypes.util.find_library("c"))
  38. def general_work(self, input_items, output_items) -> int:
  39. # pylint: disable=unused-argument
  40. if not self._next_tick_time_ns:
  41. self._next_tick_time_ns = time.perf_counter_ns()
  42. for byte in input_items[0][: self._work_max_samples]:
  43. while (
  44. self._next_tick_time_ns - time.perf_counter_ns()
  45. ) > self._SLEEP_THRESHOLD_NS:
  46. _NANOSLEEP(
  47. _Timespec(
  48. 0,
  49. # negative values make nanosleep return -1
  50. min(
  51. self._next_tick_time_ns
  52. - time.perf_counter_ns()
  53. - self._SLEEP_THRESHOLD_NS,
  54. 999999999,
  55. ),
  56. ),
  57. None,
  58. )
  59. while (self._next_tick_time_ns - time.perf_counter_ns()) > 0:
  60. pass
  61. self._serial_port.write(b"\x01" if byte else b"\0")
  62. self._next_tick_time_ns += self._tick_interval_ns
  63. self.consume(0, min(len(input_items[0]), self._work_max_samples))
  64. return 0