6 Commits 283dc66a0d ... 9137985007

Author SHA1 Message Date
  Fabian Peter Hammerle 9137985007 release v2.6.0 1 week ago
  Fabian Peter Hammerle 15e2949290 added command cc1101-export-config to export / inspect register values after configuration via args/options 1 week ago
  Fabian Peter Hammerle 855c8ddf8e refactor cli: split configuration into separate function to prepare for addition of entry points 1 week ago
  Fabian Peter Hammerle 873c98a2f9 cli: refactor to prepare for addition of entry points 1 week ago
  Fabian Peter Hammerle dff95838f7 cc1101-transmit: flock SPI device file to avoid race conditions with other processes 2 weeks ago
  Fabian Peter Hammerle 6908fb3511 transmit command: configure transceiver after reading from stdin to avoid delay between configuration and transmission, if pipe is slow 2 weeks ago
5 changed files with 210 additions and 30 deletions
  1. 14 1
      CHANGELOG.md
  2. 80 27
      cc1101/_cli.py
  3. 6 1
      setup.py
  4. 109 0
      tests/test_cli_export_config.py
  5. 1 1
      tests/test_cli_transmit.py

+ 14 - 1
CHANGELOG.md

@@ -6,6 +6,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+## [2.6.0] - 2021-01-04
+### Added
+- command `cc1101-export-config` to export / inspect config register values
+  after configuration via command-line arguments & options matching `cc1101-transmit`
+
+### Fixed
+- `cc1101-transmit` command:
+  - set `flock` on SPI device file to avoid race conditions,
+    if other processes attempt to use the same transceiver
+  - configure transceiver after reading from stdin
+    to avoid delay between configuration and transmission, if pipe is slow
+
 ## [2.5.0] - 2020-12-30
 ### Added
 - added command `cc1101-transmit` to transmit via command-line interface
@@ -68,7 +80,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - disabled data whitening
 - automatic calibration
 
-[Unreleased]: https://github.com/fphammerle/python-cc1101/compare/v2.5.0...HEAD
+[Unreleased]: https://github.com/fphammerle/python-cc1101/compare/v2.6.0...HEAD
+[2.6.0]: https://github.com/fphammerle/python-cc1101/compare/v2.5.0...v2.6.0
 [2.5.0]: https://github.com/fphammerle/python-cc1101/compare/v2.4.0...v2.5.0
 [2.4.0]: https://github.com/fphammerle/python-cc1101/compare/v2.3.0...v2.4.0
 [2.3.0]: https://github.com/fphammerle/python-cc1101/compare/v2.2.0...v2.3.0

+ 80 - 27
cc1101/_cli.py

@@ -18,6 +18,7 @@
 import argparse
 import logging
 import sys
+import typing
 
 import cc1101
 import cc1101.options
@@ -25,12 +26,7 @@ import cc1101.options
 _LOGGER = logging.getLogger(__name__)
 
 
-def _transmit():
-    argparser = argparse.ArgumentParser(
-        description="Transmits the payload provided via standard input (stdin)"
-        " OOK-modulated in big-endian bit order.",
-        allow_abbrev=False,
-    )
+def _add_common_args(argparser: argparse.ArgumentParser) -> None:
     argparser.add_argument("-f", "--base-frequency-hertz", type=int)
     argparser.add_argument("-r", "--symbol-rate-baud", type=int)
     argparser.add_argument(
@@ -47,7 +43,9 @@ def _transmit():
     )
     argparser.add_argument("--disable-checksum", action="store_true")
     argparser.add_argument("-d", "--debug", action="store_true")
-    args = argparser.parse_args()
+
+
+def _init_logging(args: argparse.Namespace) -> None:
     logging.basicConfig(
         level=logging.DEBUG if args.debug else logging.INFO,
         format="%(asctime)s:%(levelname)s:%(name)s:%(funcName)s:%(message)s"
@@ -55,27 +53,82 @@ def _transmit():
         else "%(message)s",
         datefmt="%Y-%m-%dT%H:%M:%S%z",
     )
+
+
+def _configure_via_args(
+    transceiver: cc1101.CC1101,
+    args: argparse.Namespace,
+    packet_length_if_fixed: typing.Optional[int],
+) -> None:
+    if args.base_frequency_hertz:
+        transceiver.set_base_frequency_hertz(args.base_frequency_hertz)
+    if args.symbol_rate_baud:
+        transceiver.set_symbol_rate_baud(args.symbol_rate_baud)
+    if args.sync_mode:
+        transceiver.set_sync_mode(
+            cc1101.options.SyncMode[args.sync_mode.upper().replace("-", "_")]
+        )
+    if args.packet_length_mode:
+        packet_length_mode = cc1101.options.PacketLengthMode[
+            args.packet_length_mode.upper()
+        ]
+        # default: variable length
+        transceiver.set_packet_length_mode(packet_length_mode)
+        # default: 255 (maximum)
+        if (
+            packet_length_if_fixed is not None
+            and packet_length_mode == cc1101.options.PacketLengthMode.FIXED
+        ):
+            transceiver.set_packet_length_bytes(packet_length_if_fixed)
+    if args.disable_checksum:
+        transceiver.disable_checksum()
+
+
+def _export_config():
+    argparser = argparse.ArgumentParser(
+        description="Export values in CC1101's configuration registers"
+        " after applying settings specified via command-line arguments & options",
+        allow_abbrev=False,
+    )
+    _add_common_args(argparser)
+    argparser.add_argument("--format", choices=["python-list"], default="python-list")
+    args = argparser.parse_args()
+    _init_logging(args)
     _LOGGER.debug("args=%r", args)
-    with cc1101.CC1101() as transceiver:
-        if args.base_frequency_hertz:
-            transceiver.set_base_frequency_hertz(args.base_frequency_hertz)
-        if args.symbol_rate_baud:
-            transceiver.set_symbol_rate_baud(args.symbol_rate_baud)
-        if args.sync_mode:
-            transceiver.set_sync_mode(
-                cc1101.options.SyncMode[args.sync_mode.upper().replace("-", "_")]
+    with cc1101.CC1101(lock_spi_device=True) as transceiver:
+        _configure_via_args(
+            transceiver=transceiver, args=args, packet_length_if_fixed=None
+        )
+        _LOGGER.info("%s", transceiver)
+        print("[")
+        for register_index, (register, value) in enumerate(
+            transceiver.get_configuration_register_values().items()
+        ):
+            assert register_index == register.value
+            print(
+                "0b{value:08b}, # 0x{value:02x} {register_name}".format(
+                    value=value, register_name=register.name
+                )
             )
-        payload = sys.stdin.buffer.read()
-        if args.packet_length_mode:
-            packet_length_mode = cc1101.options.PacketLengthMode[
-                args.packet_length_mode.upper()
-            ]
-            # default: variable length
-            transceiver.set_packet_length_mode(packet_length_mode)
-            # default: 255 (maximum)
-            if packet_length_mode == cc1101.options.PacketLengthMode.FIXED:
-                transceiver.set_packet_length_bytes(len(payload))
-        if args.disable_checksum:
-            transceiver.disable_checksum()
+        print("]")
+
+
+def _transmit():
+    argparser = argparse.ArgumentParser(
+        description="Transmits the payload provided via standard input (stdin)"
+        " OOK-modulated in big-endian bit order.",
+        allow_abbrev=False,
+    )
+    _add_common_args(argparser)
+    args = argparser.parse_args()
+    _init_logging(args)
+    _LOGGER.debug("args=%r", args)
+    payload = sys.stdin.buffer.read()
+    # configure transceiver after reading from stdin
+    # to avoid delay between configuration and transmission if pipe is slow
+    with cc1101.CC1101(lock_spi_device=True) as transceiver:
+        _configure_via_args(
+            transceiver=transceiver, args=args, packet_length_if_fixed=len(payload)
+        )
         _LOGGER.info("%s", transceiver)
         transceiver.transmit(payload)

+ 6 - 1
setup.py

@@ -57,7 +57,12 @@ setuptools.setup(
         "Topic :: Home Automation",
         "Topic :: Communications",
     ],
-    entry_points={"console_scripts": ["cc1101-transmit = cc1101._cli:_transmit"]},
+    entry_points={
+        "console_scripts": [
+            "cc1101-export-config = cc1101._cli:_export_config",
+            "cc1101-transmit = cc1101._cli:_transmit",
+        ]
+    },
     # apt install python3-spidev
     # https://github.com/doceme/py-spidev
     install_requires=["spidev"],

+ 109 - 0
tests/test_cli_export_config.py

@@ -0,0 +1,109 @@
+# python-cc1101 - Python Library to Transmit RF Signals via C1101 Transceivers
+#
+# Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+import logging
+import unittest.mock
+
+import pytest
+
+import cc1101._cli
+from cc1101.options import PacketLengthMode
+
+# pylint: disable=protected-access
+
+
+@pytest.mark.parametrize(
+    ("args", "packet_length_mode", "checksum_disabled"),
+    (
+        ([""], None, False),
+        (["", "--packet-length-mode", "variable"], PacketLengthMode.VARIABLE, False),
+        (["", "--disable-checksum"], None, True),
+    ),
+)
+def test_configure_device(args, packet_length_mode, checksum_disabled):
+    # pylint: disable=too-many-arguments
+    with unittest.mock.patch("cc1101.CC1101") as transceiver_class_mock:
+        with unittest.mock.patch("sys.argv", args):
+            cc1101._cli._export_config()
+    transceiver_class_mock.assert_called_once_with(lock_spi_device=True)
+    transceiver_mock = transceiver_class_mock().__enter__()
+    if packet_length_mode is None:
+        transceiver_mock.__enter__().set_packet_length_mode.assert_not_called()
+    else:
+        transceiver_mock.set_packet_length_mode.assert_called_once_with(
+            packet_length_mode
+        )
+    transceiver_mock.set_packet_length_bytes.assert_not_called()
+    if checksum_disabled:
+        transceiver_mock.disable_checksum.assert_called_once_with()
+    else:
+        transceiver_mock.disable_checksum.assert_not_called()
+
+
+def test_export_python_list(capsys, caplog):
+    with unittest.mock.patch("cc1101.CC1101") as transceiver_mock:
+        transceiver_mock().__enter__().get_configuration_register_values.return_value = {
+            cc1101.addresses.ConfigurationRegisterAddress.IOCFG2: 0x29,
+            cc1101.addresses.ConfigurationRegisterAddress.IOCFG1: 0x2E,
+        }
+        with unittest.mock.patch("sys.argv", [""]):
+            with caplog.at_level(logging.INFO):
+                cc1101._cli._export_config()
+    assert caplog.record_tuples == [
+        ("cc1101._cli", 20, str(transceiver_mock().__enter__()))
+    ]
+    out, err = capsys.readouterr()
+    assert not err
+    assert out == "[\n0b00101001, # 0x29 IOCFG2\n0b00101110, # 0x2e IOCFG1\n]\n"
+
+
+@pytest.mark.parametrize(
+    ("args", "root_log_level", "log_format"),
+    (
+        ([], logging.INFO, "%(message)s"),
+        (
+            ["--debug"],
+            logging.DEBUG,
+            "%(asctime)s:%(levelname)s:%(name)s:%(funcName)s:" + "%(message)s",
+        ),
+    ),
+)
+def test_root_log_level(args, root_log_level, log_format):
+    with unittest.mock.patch("cc1101.CC1101"), unittest.mock.patch(
+        "sys.argv", [""] + args
+    ), unittest.mock.patch("logging.basicConfig") as logging_basic_config_mock:
+        cc1101._cli._export_config()
+    assert logging_basic_config_mock.call_count == 1
+    assert logging_basic_config_mock.call_args[1]["level"] == root_log_level
+    assert logging_basic_config_mock.call_args[1]["format"] == log_format
+
+
+def test_logging(caplog):
+    with unittest.mock.patch("sys.argv", [""]), unittest.mock.patch(
+        "cc1101.CC1101"
+    ) as transceiver_mock, caplog.at_level(logging.DEBUG):
+        transceiver_mock().__enter__().__str__.return_value = "dummystr"
+        cc1101._cli._export_config()
+    assert caplog.record_tuples == [
+        (
+            "cc1101._cli",
+            logging.DEBUG,
+            "args=Namespace(base_frequency_hertz=None, debug=False, disable_checksum=False, "
+            "format='python-list', packet_length_mode=None, symbol_rate_baud=None, sync_mode=None)",
+        ),
+        ("cc1101._cli", logging.INFO, "dummystr"),
+    ]

+ 1 - 1
tests/test_cli_transmit.py

@@ -110,7 +110,7 @@ def test_configure_device(
         with unittest.mock.patch("sys.stdin", stdin_mock):
             with unittest.mock.patch("sys.argv", args):
                 cc1101._cli._transmit()
-    transceiver_mock.assert_called_once_with()
+    transceiver_mock.assert_called_once_with(lock_spi_device=True)
     if base_frequency_hertz is None:
         transceiver_mock().__enter__().set_base_frequency_hertz.assert_not_called()
     else: