Browse Source

fix: close & unlock SPI device file when error occurs while entering context

Fabian Peter Hammerle 3 years ago
parent
commit
f72c934511
3 changed files with 42 additions and 13 deletions
  1. 2 0
      CHANGELOG.md
  2. 13 9
      cc1101/__init__.py
  3. 27 4
      tests/test_lock.py

+ 2 - 0
CHANGELOG.md

@@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
 ## [Unreleased]
+### Fixed
+- close & unlock SPI device file when error occurs while entering context
 
 ## [2.7.0] - 2021-01-24
 ### Added

+ 13 - 9
cc1101/__init__.py

@@ -549,15 +549,19 @@ class CC1101:
             # advisory, exclusive, non-blocking
             # lock removed in __exit__ by SpiDev.close()
             fcntl.flock(self._spi.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
-        self._spi.max_speed_hz = 55700  # empirical
-        self._reset()
-        self._verify_chip()
-        self._configure_defaults()
-        marcstate = self.get_main_radio_control_state_machine_state()
-        if marcstate != MainRadioControlStateMachineState.IDLE:
-            raise ValueError(
-                "expected marcstate idle (actual: {})".format(marcstate.name)
-            )
+        try:
+            self._spi.max_speed_hz = 55700  # empirical
+            self._reset()
+            self._verify_chip()
+            self._configure_defaults()
+            marcstate = self.get_main_radio_control_state_machine_state()
+            if marcstate != MainRadioControlStateMachineState.IDLE:
+                raise ValueError(
+                    "expected marcstate idle (actual: {})".format(marcstate.name)
+                )
+        except:
+            self._spi.close()
+            raise
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):  # -> typing.Literal[False]

+ 27 - 4
tests/test_lock.py

@@ -139,11 +139,11 @@ def test_unlock_spi_device_no_lock(spidev_mock):
             transceiver.unlock_spi_device()
 
 
-def test_unlock_on_exception(spidev_mock):
+def test_unlock_on_exception_within_context(spidev_mock):
+    with unittest.mock.patch("spidev.SpiDev", spidev_mock):
+        transceiver1 = cc1101.CC1101(lock_spi_device=True)
+        transceiver2 = cc1101.CC1101(lock_spi_device=True)
     with _mock_hardware_access():
-        with unittest.mock.patch("spidev.SpiDev", spidev_mock):
-            transceiver1 = cc1101.CC1101(lock_spi_device=True)
-            transceiver2 = cc1101.CC1101(lock_spi_device=True)
         with pytest.raises(ValueError, match=r"^test$"):
             with transceiver1:
                 raise ValueError("test")
@@ -158,3 +158,26 @@ def test_unlock_on_exception(spidev_mock):
             with pytest.raises(BlockingIOError):
                 with transceiver1:
                     pass
+
+
+def test_unlock_on_exception_when_entering_context(spidev_mock):
+    with unittest.mock.patch("spidev.SpiDev", spidev_mock):
+        transceiver1 = cc1101.CC1101(lock_spi_device=True)
+        transceiver2 = cc1101.CC1101(lock_spi_device=True)
+    with _mock_hardware_access():
+        with unittest.mock.patch.object(
+            cc1101.CC1101, "_verify_chip", side_effect=ValueError("test")
+        ), pytest.raises(ValueError, match=r"^test$"):
+            with transceiver1:
+                pass  # never reached
+        with transceiver2:
+            pass  # no BlockingIOError
+        # does locking still work after exception?
+        with transceiver1:
+            with pytest.raises(BlockingIOError):
+                with transceiver2:
+                    pass
+        with transceiver2:
+            with pytest.raises(BlockingIOError):
+                with transceiver1:
+                    pass