Browse Source

constructor: added option `lock_spi_device` to set a `flock` on the selected spi device

Fabian Peter Hammerle 3 years ago
parent
commit
b41be144dc
3 changed files with 108 additions and 1 deletions
  1. 3 0
      CHANGELOG.md
  2. 9 1
      cc1101/__init__.py
  3. 96 0
      tests/test_lock.py

+ 3 - 0
CHANGELOG.md

@@ -5,6 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
 ## [Unreleased]
+### Added
+- option `CC1101(lock_spi_device=True)` sets an advisory, exclusive,
+  non-blocking `flock` on `/dev/spidev?.?`
 
 ## [2.2.0] - 2020-12-08
 ### Added

+ 9 - 1
cc1101/__init__.py

@@ -17,6 +17,7 @@
 
 import contextlib
 import enum
+import fcntl
 import logging
 import math
 import typing
@@ -137,7 +138,9 @@ class CC1101:
     # > f_carrier = f_XOSC / 2**16 * (FREQ + CHAN * ((256 + CHANSPC_M) * 2**CHANSPC_E-2))
     _FREQUENCY_CONTROL_WORD_HERTZ_FACTOR = _CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ / 2 ** 16
 
-    def __init__(self, spi_bus: int = 0, spi_chip_select: int = 0) -> None:
+    def __init__(
+        self, spi_bus: int = 0, spi_chip_select: int = 0, lock_spi_device: bool = False
+    ) -> None:
         self._spi = spidev.SpiDev()
         self._spi_bus = int(spi_bus)
         # > The BCM2835 core common to all Raspberry Pi devices has 3 SPI Controllers:
@@ -147,6 +150,7 @@ class CC1101:
         # https://github.com/raspberrypi/documentation/blob/d41d69f8efa3667b1a8b01a669238b8bd113edc1/hardware/raspberrypi/spi/README.md#hardware
         # https://www.raspberrypi.org/documentation/hardware/raspberrypi/spi/README.md
         self._spi_chip_select = int(spi_chip_select)
+        self._lock_spi_device = lock_spi_device
 
     @property
     def _spi_device_path(self) -> str:
@@ -483,6 +487,10 @@ class CC1101:
                 + "\n\tsudo usermod -a -G spi $USER"
                 + "\nfollowed by a re-login grants sufficient permissions."
             ) from exc
+        if self._lock_spi_device:
+            # advisory, exclusive, non-blocking
+            # lock removed in __exit__ by SpiDev.close()
+            fcntl.flock(self._spi.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
         self._spi.max_speed_hz = 55700  # empirical
         self._reset()
         self._verify_chip()

+ 96 - 0
tests/test_lock.py

@@ -0,0 +1,96 @@
+# python-cc1101 - Python Library to Transmit RF Signals via C1101 Transceivers
+#
+# Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+import unittest.mock
+
+import pytest
+
+import cc1101
+import cc1101.addresses
+
+# pylint: disable=protected-access
+
+
+@pytest.fixture(scope="function")
+def spidev_mock(tmp_path):
+    class _SpiDevMock:
+        def __init__(self):
+            self._file = None
+
+        def open(self, bus, select):
+            path = tmp_path.joinpath("spidev{}.{}~".format(bus, select))
+            self._file = path.open("w+")
+
+        def fileno(self):
+            return self._file.fileno()
+
+        def close(self):
+            self._file.close()
+
+    return _SpiDevMock
+
+
+# pylint: disable=redefined-outer-name; using fixture
+
+
+def test___enter__locked(spidev_mock):
+    with unittest.mock.patch.object(
+        cc1101.CC1101, "_reset"
+    ), unittest.mock.patch.object(
+        cc1101.CC1101, "_verify_chip"
+    ), unittest.mock.patch.object(
+        cc1101.CC1101, "_configure_defaults"
+    ), unittest.mock.patch.object(
+        cc1101.CC1101,
+        "get_main_radio_control_state_machine_state",
+        return_value=cc1101.MainRadioControlStateMachineState.IDLE,
+    ):
+        with unittest.mock.patch("spidev.SpiDev", spidev_mock):
+            transceiver = cc1101.CC1101(lock_spi_device=True)
+        with transceiver:
+            with unittest.mock.patch("spidev.SpiDev", spidev_mock):
+                transceiver2 = cc1101.CC1101(lock_spi_device=True)
+            with pytest.raises(BlockingIOError):
+                with transceiver2:
+                    pass
+            with unittest.mock.patch("spidev.SpiDev", spidev_mock):
+                transceiver3 = cc1101.CC1101(lock_spi_device=False)
+            with transceiver3:
+                pass
+        with transceiver2:  # closing unlocks
+            pass
+
+
+def test___enter__not_locked(spidev_mock):
+    with unittest.mock.patch.object(
+        cc1101.CC1101, "_reset"
+    ), unittest.mock.patch.object(
+        cc1101.CC1101, "_verify_chip"
+    ), unittest.mock.patch.object(
+        cc1101.CC1101, "_configure_defaults"
+    ), unittest.mock.patch.object(
+        cc1101.CC1101,
+        "get_main_radio_control_state_machine_state",
+        return_value=cc1101.MainRadioControlStateMachineState.IDLE,
+    ):
+        with unittest.mock.patch("spidev.SpiDev", spidev_mock):
+            transceiver = cc1101.CC1101(lock_spi_device=False)
+        with transceiver:
+            with unittest.mock.patch("spidev.SpiDev", spidev_mock):
+                transceiver2 = cc1101.CC1101(lock_spi_device=True)
+            with transceiver2:
+                pass