Browse Source

added method `.unlock_spi_device` to release the lock manually

Fabian Peter Hammerle 3 years ago
parent
commit
2d2c1ebaba
4 changed files with 101 additions and 16 deletions
  1. 1 1
      .github/workflows/python.yml
  2. 2 1
      CHANGELOG.md
  3. 41 0
      cc1101/__init__.py
  4. 57 14
      tests/test_lock.py

+ 1 - 1
.github/workflows/python.yml

@@ -50,7 +50,7 @@ jobs:
       env:
         PYTHON_VERSION: ${{ matrix.python-version }}
     - run: pipenv graph
-    - run: pipenv run pytest --cov=cc1101 --cov-report=term-missing --cov-fail-under=84
+    - run: pipenv run pytest --cov=cc1101 --cov-report=term-missing --cov-fail-under=85
     - run: pipenv run pylint --load-plugins=pylint_import_requirements cc1101
     # https://github.com/PyCQA/pylint/issues/352
     - run: pipenv run pylint tests/*

+ 2 - 1
CHANGELOG.md

@@ -7,7 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ## [Unreleased]
 ### Added
 - option `CC1101(lock_spi_device=True)` sets an advisory, exclusive,
-  non-blocking `flock` on `/dev/spidev?.?`
+  non-blocking `flock` on the SPI device file `/dev/spidev?.?`
+- method `.unlock_spi_device()` to unlock manually
 
 ## [2.2.0] - 2020-12-08
 ### Added

+ 41 - 0
cc1101/__init__.py

@@ -141,6 +141,24 @@ class CC1101:
     def __init__(
         self, spi_bus: int = 0, spi_chip_select: int = 0, lock_spi_device: bool = False
     ) -> None:
+        """
+        lock_spi_device:
+            When True, an advisory, exclusive lock will be set on the SPI device file
+            non-blockingly via flock upon entering the context.
+            If the SPI device file is already locked (e.g., by a different process),
+            a BlockingIOError will be raised.
+            The lock will be removed automatically, when leaving the context.
+            The lock can optionally be released earlier by calling .unlock_spi_device().
+            >>> transceiver = cc1101.CC1101(lock_spi_device=True)
+            >>> # not locked
+            >>> with transceiver:
+            >>>     # locked
+            >>> # lock removed
+            >>> with transceiver:
+            >>>     # locked
+            >>>     transceiver.unlock_spi_device()
+            >>>     # lock removed
+        """
         self._spi = spidev.SpiDev()
         self._spi_bus = int(spi_bus)
         # > The BCM2835 core common to all Raspberry Pi devices has 3 SPI Controllers:
@@ -505,6 +523,29 @@ class CC1101:
         self._spi.close()
         return False
 
+    def unlock_spi_device(self) -> None:
+        """
+        Manually release the lock set on the SPI device file.
+
+        Alternatively, the lock will be released automatically,
+        when leaving the context.
+
+        Method fails silently, if the SPI device file is not locked.
+
+        >>> transceiver = cc1101.CC1101(lock_spi_device=True)
+        >>> # not locked
+        >>> with transceiver:
+        >>>     # locked
+        >>> # lock removed
+        >>> with transceiver:
+        >>>     # locked
+        >>>     transceiver.unlock_spi_device()
+        >>>     # lock removed
+        """
+        fileno = self._spi.fileno()
+        if fileno != -1:
+            fcntl.flock(fileno, fcntl.LOCK_UN)
+
     def get_main_radio_control_state_machine_state(
         self,
     ) -> MainRadioControlStateMachineState:

+ 57 - 14
tests/test_lock.py

@@ -15,6 +15,7 @@
 # You should have received a copy of the GNU General Public License
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
+import contextlib
 import unittest.mock
 
 import pytest
@@ -36,10 +37,12 @@ def spidev_mock(tmp_path):
             self._file = path.open("w+")
 
         def fileno(self):
-            return self._file.fileno()
+            # mimic behaviour of spidev.SpiDev.fileno()
+            return self._file.fileno() if self._file else -1
 
         def close(self):
             self._file.close()
+            self._file = None  # for fileno
 
     return _SpiDevMock
 
@@ -47,7 +50,8 @@ def spidev_mock(tmp_path):
 # pylint: disable=redefined-outer-name; using fixture
 
 
-def test___enter__locked(spidev_mock):
+@contextlib.contextmanager
+def _mock_hardware_access():
     with unittest.mock.patch.object(
         cc1101.CC1101, "_reset"
     ), unittest.mock.patch.object(
@@ -59,6 +63,11 @@ def test___enter__locked(spidev_mock):
         "get_main_radio_control_state_machine_state",
         return_value=cc1101.MainRadioControlStateMachineState.IDLE,
     ):
+        yield
+
+
+def test_context_lock(spidev_mock):
+    with _mock_hardware_access():
         with unittest.mock.patch("spidev.SpiDev", spidev_mock):
             transceiver = cc1101.CC1101(lock_spi_device=True)
         with transceiver:
@@ -75,18 +84,8 @@ def test___enter__locked(spidev_mock):
             pass
 
 
-def test___enter__not_locked(spidev_mock):
-    with unittest.mock.patch.object(
-        cc1101.CC1101, "_reset"
-    ), unittest.mock.patch.object(
-        cc1101.CC1101, "_verify_chip"
-    ), unittest.mock.patch.object(
-        cc1101.CC1101, "_configure_defaults"
-    ), unittest.mock.patch.object(
-        cc1101.CC1101,
-        "get_main_radio_control_state_machine_state",
-        return_value=cc1101.MainRadioControlStateMachineState.IDLE,
-    ):
+def test_context_no_lock(spidev_mock):
+    with _mock_hardware_access():
         with unittest.mock.patch("spidev.SpiDev", spidev_mock):
             transceiver = cc1101.CC1101(lock_spi_device=False)
         with transceiver:
@@ -94,3 +93,47 @@ def test___enter__not_locked(spidev_mock):
                 transceiver2 = cc1101.CC1101(lock_spi_device=True)
             with transceiver2:
                 pass
+
+
+def test_unlock_spi_device(spidev_mock):
+    with _mock_hardware_access():
+        with unittest.mock.patch("spidev.SpiDev", spidev_mock):
+            transceiver = cc1101.CC1101(lock_spi_device=True)
+            transceiver2 = cc1101.CC1101(lock_spi_device=True)
+        with transceiver:  # acquire lock
+            with pytest.raises(BlockingIOError):
+                with transceiver2:
+                    pass
+            transceiver.unlock_spi_device()
+            with transceiver2:
+                pass
+
+
+def test_unlock_spi_device_double(spidev_mock):
+    with _mock_hardware_access():
+        with unittest.mock.patch("spidev.SpiDev", spidev_mock):
+            transceiver = cc1101.CC1101(lock_spi_device=True)
+        # verify no error occurs
+        with transceiver:  # acquire lock
+            transceiver.unlock_spi_device()
+            transceiver.unlock_spi_device()
+
+
+def test_unlock_spi_device_outside_context(spidev_mock):
+    with _mock_hardware_access():
+        with unittest.mock.patch("spidev.SpiDev", spidev_mock):
+            transceiver = cc1101.CC1101(lock_spi_device=True)
+        # verify no error occurs
+        transceiver.unlock_spi_device()
+        with transceiver:  # acquire lock
+            pass
+        transceiver.unlock_spi_device()
+
+
+def test_unlock_spi_device_no_lock(spidev_mock):
+    with _mock_hardware_access():
+        with unittest.mock.patch("spidev.SpiDev", spidev_mock):
+            transceiver = cc1101.CC1101(lock_spi_device=False)
+        with transceiver:  # no lock acquired
+            # verify no error occurs
+            transceiver.unlock_spi_device()