123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213 |
- options:
- parameters:
- author: ''
- category: '[GRC Hier Blocks]'
- cmake_opt: ''
- comment: ''
- copyright: ''
- description: ''
- gen_cmake: 'On'
- gen_linking: dynamic
- generate_options: qt_gui
- hier_block_src_path: '.:'
- id: transmit
- max_nouts: '0'
- output_language: python
- placement: (0,0)
- qt_qss_theme: ''
- realtime_scheduling: ''
- run: 'True'
- run_command: '{python} -u {filename}'
- run_options: prompt
- sizing_mode: fixed
- thread_safe_setters: ''
- title: Arduino Serial Transmitter
- window_size: ''
- states:
- bus_sink: false
- bus_source: false
- bus_structure: null
- coordinate: [70, 37]
- rotation: 0
- state: enabled
- blocks:
- - name: sample_rate_hz
- id: variable
- parameters:
- comment: ''
- value: '1195'
- states:
- bus_sink: false
- bus_source: false
- bus_structure: null
- coordinate: [670, 313]
- rotation: 0
- state: enabled
- - name: blocks_throttle_0
- id: blocks_throttle
- parameters:
- affinity: ''
- alias: ''
- comment: ''
- ignoretag: 'True'
- maxoutbuf: '0'
- minoutbuf: '0'
- samples_per_second: sample_rate_hz
- type: byte
- vlen: '1'
- states:
- bus_sink: false
- bus_source: false
- bus_structure: null
- coordinate: [649, 177]
- rotation: 0
- state: bypassed
- - name: message_generator_block
- id: epy_block
- parameters:
- _source_code: "import json\nimport pathlib\n\nimport gnuradio.gr\nimport numpy\n\
- \n\nclass MessageGeneratorBlock(gnuradio.gr.basic_block):\n def __init__(self):\n\
- \ gnuradio.gr.basic_block.__init__(\n self, name=\"Message\
- \ Generator\", in_sig=None, out_sig=[numpy.uint8]\n )\n # __file__\
- \ is not available\n self._messages = json.loads(\n pathlib.Path().resolve().parent.joinpath(\"\
- messages.json\").read_text()\n )\n\n def general_work(self, input_items,\
- \ output_items) -> int:\n # pylint: disable=unused-argument\n \
- \ message = self._messages[\"ON0\"] + [0] * 128\n if len(output_items[0])\
- \ < len(message):\n print(\"output buffer too short\", len(output_items[0]))\n\
- \ output_items[0][:] = 0\n return len(output_items[0])\n\
- \ output_items[0][: len(message)] = message\n return len(message)\n"
- affinity: ''
- alias: ''
- comment: ''
- maxoutbuf: '0'
- minoutbuf: '0'
- states:
- _io_cache: ('Message Generator', 'MessageGeneratorBlock', [], [], [('0', 'byte',
- 1)], '', [])
- bus_sink: false
- bus_source: false
- bus_structure: null
- coordinate: [286, 318]
- rotation: 0
- state: true
- - name: qtgui_time_raster_sink_x_0
- id: qtgui_time_raster_sink_x
- parameters:
- affinity: ''
- alias: ''
- alpha1: '1.0'
- alpha10: '1.0'
- alpha2: '1.0'
- alpha3: '1.0'
- alpha4: '1.0'
- alpha5: '1.0'
- alpha6: '1.0'
- alpha7: '1.0'
- alpha8: '1.0'
- alpha9: '1.0'
- axislabels: 'True'
- color1: '2'
- color10: '0'
- color2: '0'
- color3: '0'
- color4: '0'
- color5: '0'
- color6: '0'
- color7: '0'
- color8: '0'
- color9: '0'
- comment: ''
- grid: 'False'
- gui_hint: ''
- label1: ''
- label10: ''
- label2: ''
- label3: ''
- label4: ''
- label5: ''
- label6: ''
- label7: ''
- label8: ''
- label9: ''
- mult: '[]'
- name: '""'
- ncols: int(sample_rate_hz)
- nconnections: '1'
- nrows: '60'
- offset: '[]'
- samp_rate: sample_rate_hz
- type: byte
- update_time: '0.10'
- zmax: '1'
- zmin: '0'
- states:
- bus_sink: false
- bus_source: false
- bus_structure: null
- coordinate: [1091, 145]
- rotation: 0
- state: true
- - name: serial_sink_block
- id: epy_block
- parameters:
- _source_code: "import ctypes\nimport ctypes.util\nimport time\n\nimport gnuradio.gr\n\
- import numpy\nimport serial\n\n_LIBC_NAME = ctypes.util.find_library(\"c\")\n\
- if not _LIBC_NAME:\n raise Exception(\"couldn't find libc\")\n_LIBC = ctypes.CDLL(_LIBC_NAME)\n\
- \n\nclass _Timespec(ctypes.Structure):\n # pylint: disable=too-few-public-methods\n\
- \ \"\"\"\n > struct timespec {\n > time_t tv_sec; /* seconds\
- \ */\n > long tv_nsec; /* nanoseconds */\n > };\n\n > The\
- \ value of the nanoseconds field must be in the range 0 to 999999999.\n \"\
- \"\"\n _fields_ = [(\"tv_sec\", ctypes.c_uint32), (\"tv_nsec\", ctypes.c_long)]\n\
- \n\n_NANOSLEEP = _LIBC.nanosleep\n_NANOSLEEP.argtypes = [ctypes.POINTER(_Timespec),\
- \ ctypes.POINTER(_Timespec)]\n_NANOSLEEP.restype = ctypes.c_int\n\n\nclass SerialSinkBlock(gnuradio.gr.basic_block):\n\
- \ _SLEEP_THRESHOLD_NS = 10000\n\n def __init__(self, port=\"/dev/ttyUSB0\"\
- , baud_rate=115200, sample_rate_hz=1000):\n gnuradio.gr.basic_block.__init__(\n\
- \ self, name=\"Serial Sink Block\", in_sig=[numpy.uint8], out_sig=None\n\
- \ )\n self._serial_port = serial.Serial(\n port=port,\
- \ baudrate=baud_rate, bytesize=serial.EIGHTBITS\n )\n self._tick_interval_ns\
- \ = int(1e9 / sample_rate_hz)\n # periodically return from .general_work()\
- \ to handle signals\n self._work_max_samples = max(int(sample_rate_hz\
- \ * 2), 1)\n self._next_tick_time_ns = None\n self._libc = ctypes.CDLL(ctypes.util.find_library(\"\
- c\"))\n\n def general_work(self, input_items, output_items) -> int:\n \
- \ # pylint: disable=unused-argument\n if not self._next_tick_time_ns:\n\
- \ self._next_tick_time_ns = time.perf_counter_ns()\n for byte\
- \ in input_items[0][: self._work_max_samples]:\n while (\n \
- \ self._next_tick_time_ns - time.perf_counter_ns()\n ) >\
- \ self._SLEEP_THRESHOLD_NS:\n _NANOSLEEP(\n \
- \ _Timespec(\n 0,\n # negative\
- \ values make nanosleep return -1\n min(\n \
- \ self._next_tick_time_ns\n - time.perf_counter_ns()\n\
- \ - self._SLEEP_THRESHOLD_NS,\n \
- \ 999999999,\n ),\n ),\n\
- \ None,\n )\n while (self._next_tick_time_ns\
- \ - time.perf_counter_ns()) > 0:\n pass\n self._serial_port.write(b\"\
- \\x01\" if byte else b\"\\0\")\n self._next_tick_time_ns += self._tick_interval_ns\n\
- \ self.consume(0, min(len(input_items[0]), self._work_max_samples))\n\
- \ return 0\n"
- affinity: ''
- alias: ''
- baud_rate: '115200'
- comment: ''
- maxoutbuf: '0'
- minoutbuf: '0'
- port: '"/dev/ttyUSB0"'
- sample_rate_hz: sample_rate_hz
- states:
- _io_cache: ('Serial Sink Block', 'SerialSinkBlock', [('port', "'/dev/ttyUSB0'"),
- ('baud_rate', '115200'), ('sample_rate_hz', '1000')], [('0', 'byte', 1)], [],
- '', [])
- bus_sink: false
- bus_source: false
- bus_structure: null
- coordinate: [625, 417]
- rotation: 0
- state: enabled
- connections:
- - [blocks_throttle_0, '0', qtgui_time_raster_sink_x_0, '0']
- - [message_generator_block, '0', blocks_throttle_0, '0']
- - [message_generator_block, '0', serial_sink_block, '0']
- metadata:
- file_format: 1
|