Browse Source

refactor _wait_for_packet & wait_for_rising_edge: specify timeout as datetime.timedelta instead of int

Fabian Peter Hammerle 3 years ago
parent
commit
48e30cbe84
4 changed files with 30 additions and 23 deletions
  1. 5 6
      cc1101/__init__.py
  2. 8 3
      cc1101/_gpio.py
  3. 12 8
      tests/test_gpio.py
  4. 5 6
      tests/test_receive.py

+ 5 - 6
cc1101/__init__.py

@@ -16,6 +16,7 @@
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
 import contextlib
+import datetime
 import enum
 import fcntl
 import logging
@@ -968,7 +969,7 @@ class CC1101:
 
     def _wait_for_packet(  # unstable
         self,
-        timeout_seconds: int,
+        timeout: datetime.timedelta,
         gdo0_gpio_line_name: bytes = b"GPIO24",  # recommended in README.md
     ) -> typing.Optional[_ReceivedPacket]:
         """
@@ -977,13 +978,11 @@ class CC1101:
         # pylint: disable=protected-access
         gdo0 = cc1101._gpio.GPIOLine.find(name=gdo0_gpio_line_name)
         self._enable_receive_mode()
-        if not gdo0.wait_for_rising_edge(
-            consumer=b"CC1101:GDO0", timeout_seconds=timeout_seconds
-        ):
+        if not gdo0.wait_for_rising_edge(consumer=b"CC1101:GDO0", timeout=timeout):
             self._command_strobe(StrobeAddress.SIDLE)
             _LOGGER.debug(
-                "reached timeout of %d seconds while waiting for packet",
-                timeout_seconds,
+                "reached timeout of %.02f seconds while waiting for packet",
+                timeout.total_seconds(),
             )
             return None  # timeout
         return self._get_received_packet()

+ 8 - 3
cc1101/_gpio.py

@@ -17,6 +17,7 @@
 
 import ctypes
 import ctypes.util
+import datetime
 import errno
 import functools
 
@@ -95,7 +96,9 @@ class GPIOLine:
         # might make debugging easier in case someone calls __del__ twice
         self._pointer = None
 
-    def wait_for_rising_edge(self, consumer: bytes, timeout_seconds: int) -> bool:
+    def wait_for_rising_edge(
+        self, consumer: bytes, timeout: datetime.timedelta
+    ) -> bool:
         """
         Return True, if an event occured; False on timeout.
         """
@@ -112,9 +115,11 @@ class GPIOLine:
                 )
                 + ("\nBlocked by another process?" if err == errno.EBUSY else "")
             )
-        timeout = _c_timespec(timeout_seconds, 0)
+        timeout_timespec = _c_timespec(
+            int(timeout.total_seconds()), timeout.microseconds * 1000
+        )
         result = _load_libgpiod().gpiod_line_event_wait(
-            self._pointer, ctypes.pointer(timeout)
+            self._pointer, ctypes.pointer(timeout_timespec)
         )  # type: int
         _load_libgpiod().gpiod_line_release(self._pointer)
         if result == -1:

+ 12 - 8
tests/test_gpio.py

@@ -17,6 +17,7 @@
 
 import ctypes
 import ctypes.util
+import datetime
 import errno
 import re
 import unittest.mock
@@ -84,17 +85,17 @@ def test_line_release(libgpiod_mock):
 
 
 @pytest.mark.parametrize("consumer", (b"CC1101 GDO0", b"test"))
-@pytest.mark.parametrize("timeout_seconds", (21, 42))
+@pytest.mark.parametrize("timeout_seconds", (21.42, 0.1234))
 @pytest.mark.parametrize("reached_timeout", (False, True))
 def test_line_wait_for_rising_edge(
-    libgpiod_mock, consumer: bytes, timeout_seconds: int, reached_timeout: bool
+    libgpiod_mock, consumer: bytes, timeout_seconds: float, reached_timeout: bool
 ):
     pointer = ctypes.c_void_p(1234)
     line = cc1101._gpio.GPIOLine(pointer=pointer)
     libgpiod_mock.gpiod_line_request_rising_edge_events.return_value = 0
     libgpiod_mock.gpiod_line_event_wait.return_value = 0 if reached_timeout else 1
     event_occured = line.wait_for_rising_edge(
-        consumer=consumer, timeout_seconds=timeout_seconds
+        consumer=consumer, timeout=datetime.timedelta(seconds=timeout_seconds)
     )
     assert event_occured is not reached_timeout
     libgpiod_mock.gpiod_line_request_rising_edge_events.assert_called_once_with(
@@ -103,17 +104,18 @@ def test_line_wait_for_rising_edge(
     assert libgpiod_mock.gpiod_line_event_wait.call_count == 1
     wait_args, wait_kwargs = libgpiod_mock.gpiod_line_event_wait.call_args
     assert wait_args[0] == pointer
-    assert wait_args[1].contents.tv_nsec == 0
-    assert wait_args[1].contents.tv_sec == timeout_seconds
+    assert (
+        wait_args[1].contents.tv_sec + wait_args[1].contents.tv_nsec / 10 ** 9
+    ) == pytest.approx(timeout_seconds, abs=1e-10)
     assert not wait_args[2:]
     assert not wait_kwargs
     libgpiod_mock.gpiod_line_release.assert_called_once_with(pointer)
 
 
 @pytest.mark.parametrize("consumer", (b"CC1101 GDO0",))
-@pytest.mark.parametrize("timeout_seconds", (21,))
+@pytest.mark.parametrize("timeout_seconds", (21.42,))
 def test_line_wait_for_rising_edge_busy(
-    libgpiod_mock, consumer: bytes, timeout_seconds: int
+    libgpiod_mock, consumer: bytes, timeout_seconds: float
 ):
     pointer = ctypes.c_void_p(1234)
     line = cc1101._gpio.GPIOLine(pointer=pointer)
@@ -124,4 +126,6 @@ def test_line_wait_for_rising_edge_busy(
         match=r"^Request for rising edge event notifications failed \(EBUSY\)."
         r"\nBlocked by another process\?$",
     ):
-        line.wait_for_rising_edge(consumer=consumer, timeout_seconds=timeout_seconds)
+        line.wait_for_rising_edge(
+            consumer=consumer, timeout=datetime.timedelta(seconds=timeout_seconds)
+        )

+ 5 - 6
tests/test_receive.py

@@ -15,6 +15,7 @@
 # You should have received a copy of the GNU General Public License
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
+import datetime
 import unittest.mock
 
 import pytest
@@ -49,10 +50,8 @@ def test__get_received_packet(transceiver, payload):
 
 @pytest.mark.parametrize("gdo0_gpio_line_name", (b"GPIO24", b"GPIO25"))
 @pytest.mark.parametrize("reached_timeout", (True, False))
-@pytest.mark.parametrize("timeout_seconds", (2,))
-def test__wait_for_packet(
-    transceiver, gdo0_gpio_line_name, timeout_seconds, reached_timeout
-):
+@pytest.mark.parametrize("timeout", (datetime.timedelta(seconds=4),))
+def test__wait_for_packet(transceiver, gdo0_gpio_line_name, timeout, reached_timeout):
     line_mock = unittest.mock.MagicMock()
     line_mock.wait_for_rising_edge.return_value = not reached_timeout
     with unittest.mock.patch(
@@ -65,13 +64,13 @@ def test__wait_for_packet(
         transceiver, "_command_strobe"
     ) as command_strobe_mock:
         packet = transceiver._wait_for_packet(
-            timeout_seconds=timeout_seconds,
+            timeout=timeout,
             gdo0_gpio_line_name=gdo0_gpio_line_name,
         )
     find_line_mock.assert_called_once_with(name=gdo0_gpio_line_name)
     enable_receive_mode_mock.assert_called_once_with()
     line_mock.wait_for_rising_edge.assert_called_once_with(
-        consumer=b"CC1101:GDO0", timeout_seconds=timeout_seconds
+        consumer=b"CC1101:GDO0", timeout=timeout
     )
     if reached_timeout:
         assert packet is None