options: parameters: author: '' category: '[GRC Hier Blocks]' cmake_opt: '' comment: '' copyright: '' description: '' gen_cmake: 'On' gen_linking: dynamic generate_options: qt_gui hier_block_src_path: '.:' id: transmit max_nouts: '0' output_language: python placement: (0,0) qt_qss_theme: '' realtime_scheduling: '' run: 'True' run_command: '{python} -u {filename}' run_options: prompt sizing_mode: fixed thread_safe_setters: '' title: Arduino Serial Transmitter window_size: '' states: bus_sink: false bus_source: false bus_structure: null coordinate: [70, 37] rotation: 0 state: enabled blocks: - name: sample_rate_hz id: variable parameters: comment: '' value: '1195' states: bus_sink: false bus_source: false bus_structure: null coordinate: [670, 313] rotation: 0 state: enabled - name: blocks_throttle_0 id: blocks_throttle parameters: affinity: '' alias: '' comment: '' ignoretag: 'True' maxoutbuf: '0' minoutbuf: '0' samples_per_second: sample_rate_hz type: byte vlen: '1' states: bus_sink: false bus_source: false bus_structure: null coordinate: [649, 177] rotation: 0 state: bypassed - name: message_generator_block id: epy_block parameters: _source_code: "import json\nimport pathlib\n\nimport gnuradio.gr\nimport numpy\n\ \n\nclass MessageGeneratorBlock(gnuradio.gr.basic_block):\n def __init__(self):\n\ \ gnuradio.gr.basic_block.__init__(\n self, name=\"Message\ \ Generator\", in_sig=None, out_sig=[numpy.uint8]\n )\n # __file__\ \ is not available\n self._messages = json.loads(\n pathlib.Path().resolve().parent.joinpath(\"\ messages.json\").read_text()\n )\n\n def general_work(self, input_items,\ \ output_items) -> int:\n # pylint: disable=unused-argument\n \ \ message = self._messages[\"ON0\"] + [0] * 128\n if len(output_items[0])\ \ < len(message):\n print(\"output buffer too short\", len(output_items[0]))\n\ \ output_items[0][:] = 0\n return len(output_items[0])\n\ \ output_items[0][: len(message)] = message\n return len(message)\n" affinity: '' alias: '' comment: '' maxoutbuf: '0' minoutbuf: '0' states: _io_cache: ('Message Generator', 'MessageGeneratorBlock', [], [], [('0', 'byte', 1)], '', []) bus_sink: false bus_source: false bus_structure: null coordinate: [286, 318] rotation: 0 state: true - name: qtgui_time_raster_sink_x_0 id: qtgui_time_raster_sink_x parameters: affinity: '' alias: '' alpha1: '1.0' alpha10: '1.0' alpha2: '1.0' alpha3: '1.0' alpha4: '1.0' alpha5: '1.0' alpha6: '1.0' alpha7: '1.0' alpha8: '1.0' alpha9: '1.0' axislabels: 'True' color1: '2' color10: '0' color2: '0' color3: '0' color4: '0' color5: '0' color6: '0' color7: '0' color8: '0' color9: '0' comment: '' grid: 'False' gui_hint: '' label1: '' label10: '' label2: '' label3: '' label4: '' label5: '' label6: '' label7: '' label8: '' label9: '' mult: '[]' name: '""' ncols: int(sample_rate_hz) nconnections: '1' nrows: '60' offset: '[]' samp_rate: sample_rate_hz type: byte update_time: '0.10' zmax: '1' zmin: '0' states: bus_sink: false bus_source: false bus_structure: null coordinate: [1091, 145] rotation: 0 state: true - name: serial_sink_block id: epy_block parameters: _source_code: "import ctypes\nimport ctypes.util\nimport time\n\nimport gnuradio.gr\n\ import numpy\nimport serial\n\n_LIBC_NAME = ctypes.util.find_library(\"c\")\n\ if not _LIBC_NAME:\n raise Exception(\"couldn't find libc\")\n_LIBC = ctypes.CDLL(_LIBC_NAME)\n\ \n\nclass _Timespec(ctypes.Structure):\n # pylint: disable=too-few-public-methods\n\ \ \"\"\"\n > struct timespec {\n > time_t tv_sec; /* seconds\ \ */\n > long tv_nsec; /* nanoseconds */\n > };\n\n > The\ \ value of the nanoseconds field must be in the range 0 to 999999999.\n \"\ \"\"\n _fields_ = [(\"tv_sec\", ctypes.c_uint32), (\"tv_nsec\", ctypes.c_long)]\n\ \n\n_NANOSLEEP = _LIBC.nanosleep\n_NANOSLEEP.argtypes = [ctypes.POINTER(_Timespec),\ \ ctypes.POINTER(_Timespec)]\n_NANOSLEEP.restype = ctypes.c_int\n\n\nclass SerialSinkBlock(gnuradio.gr.basic_block):\n\ \ _SLEEP_THRESHOLD_NS = 10000\n\n def __init__(self, port=\"/dev/ttyUSB0\"\ , baud_rate=115200, sample_rate_hz=1000):\n gnuradio.gr.basic_block.__init__(\n\ \ self, name=\"Serial Sink Block\", in_sig=[numpy.uint8], out_sig=None\n\ \ )\n self._serial_port = serial.Serial(\n port=port,\ \ baudrate=baud_rate, bytesize=serial.EIGHTBITS\n )\n self._tick_interval_ns\ \ = int(1e9 / sample_rate_hz)\n # periodically return from .general_work()\ \ to handle signals\n self._work_max_samples = max(int(sample_rate_hz\ \ * 2), 1)\n self._next_tick_time_ns = None\n self._libc = ctypes.CDLL(ctypes.util.find_library(\"\ c\"))\n\n def general_work(self, input_items, output_items) -> int:\n \ \ # pylint: disable=unused-argument\n if not self._next_tick_time_ns:\n\ \ self._next_tick_time_ns = time.perf_counter_ns()\n for byte\ \ in input_items[0][: self._work_max_samples]:\n while (\n \ \ self._next_tick_time_ns - time.perf_counter_ns()\n ) >\ \ self._SLEEP_THRESHOLD_NS:\n _NANOSLEEP(\n \ \ _Timespec(\n 0,\n # negative\ \ values make nanosleep return -1\n min(\n \ \ self._next_tick_time_ns\n - time.perf_counter_ns()\n\ \ - self._SLEEP_THRESHOLD_NS,\n \ \ 999999999,\n ),\n ),\n\ \ None,\n )\n while (self._next_tick_time_ns\ \ - time.perf_counter_ns()) > 0:\n pass\n self._serial_port.write(b\"\ \\x01\" if byte else b\"\\0\")\n self._next_tick_time_ns += self._tick_interval_ns\n\ \ self.consume(0, min(len(input_items[0]), self._work_max_samples))\n\ \ return 0\n" affinity: '' alias: '' baud_rate: '115200' comment: '' maxoutbuf: '0' minoutbuf: '0' port: '"/dev/ttyUSB0"' sample_rate_hz: sample_rate_hz states: _io_cache: ('Serial Sink Block', 'SerialSinkBlock', [('port', "'/dev/ttyUSB0'"), ('baud_rate', '115200'), ('sample_rate_hz', '1000')], [('0', 'byte', 1)], [], '', []) bus_sink: false bus_source: false bus_structure: null coordinate: [625, 417] rotation: 0 state: enabled connections: - [blocks_throttle_0, '0', qtgui_time_raster_sink_x_0, '0'] - [message_generator_block, '0', blocks_throttle_0, '0'] - [message_generator_block, '0', serial_sink_block, '0'] metadata: file_format: 1