Browse Source

replace hhk7734/python3-gpiod with direct calls to libgpiod

Fabian Peter Hammerle 3 years ago
parent
commit
6922ce120b
9 changed files with 220 additions and 191 deletions
  1. 0 3
      .github/workflows/python.yml
  2. 1 5
      Pipfile
  3. 1 8
      Pipfile.lock
  4. 8 18
      cc1101/__init__.py
  5. 98 36
      cc1101/_gpio.py
  6. 0 3
      setup.py
  7. 8 1
      tests/conftest.py
  8. 91 90
      tests/test_gpio.py
  9. 13 27
      tests/test_receive.py

+ 0 - 3
.github/workflows/python.yml

@@ -46,9 +46,6 @@ jobs:
     - uses: actions/setup-python@v1
       with:
         python-version: ${{ matrix.python-version }}
-    - run: sudo apt-get update
-    # https://github.com/hhk7734/python3-gpiod/blob/v1.2.1/py_src/gpiod/libgpiod/__init__.py#L54
-    - run: sudo apt-get install --yes --no-install-recommends libgpiod2
     - run: pip install --upgrade pipenv==2020.8.13
     - run: pipenv install --python "$PYTHON_VERSION" --deploy --dev
       env:

+ 1 - 5
Pipfile

@@ -12,11 +12,7 @@ cc1101 = {editable = true,path = "."}
 # > $ python3 -c 'import spidev; print(spidev.__version__)'
 # > 3.4
 # https://pypi.org/project/spidev/3.4/
-spidev = {version = "3.4"}
-# python<3.7 compatibility
-# https://github.com/hhk7734/python3-gpiod/commit/19023307eb5c3dca54fb9113af71e172c1058f0a#diff-cc49f982a99ed077a61688cc7e4c08c6d7058a729cc6fe28f520d9a69a3e752cR793
-# https://github.com/hhk7734/python3-gpiod/commit/e3c69a37a98783ec60194c7da32f7df57e421df4#diff-4411023b37c4dce2d8ffab26c4679ac8e0981a0f7f35364f4de8fbdfd87eec8fR24
-gpiod = "<1.2.2"
+spidev = "==3.4"
 
 [dev-packages]
 # black requires python>=3.6

+ 1 - 8
Pipfile.lock

@@ -1,7 +1,7 @@
 {
     "_meta": {
         "hash": {
-            "sha256": "4e59af67df70f9d30795c6bc94668084e92b5ce31982ee3add26002e5f47bc72"
+            "sha256": "faa17db8123067958e66a14c65b1258ae6b92b58c606b1402341e864fb00ce6a"
         },
         "pipfile-spec": 6,
         "requires": {
@@ -20,13 +20,6 @@
             "editable": true,
             "path": "."
         },
-        "gpiod": {
-            "hashes": [
-                "sha256:667a156f707f60f3a68300c43b1392b6c8bf8104a9f42dcced2d9957c2a86dd0"
-            ],
-            "index": "pypi",
-            "version": "==1.2.1"
-        },
         "spidev": {
             "hashes": [
                 "sha256:4314e52f573d95233c907f307558893313a8a606e197e77bb711526b0e179e80"

+ 8 - 18
cc1101/__init__.py

@@ -16,7 +16,6 @@
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
 import contextlib
-import datetime
 import enum
 import fcntl
 import logging
@@ -969,31 +968,22 @@ class CC1101:
 
     def _wait_for_packet(  # unstable
         self,
-        timeout: datetime.timedelta,
-        # https://github.com/hhk7734/python3-gpiod/blob/v1.5.0/py_src/gpiod/libgpiodcxx/__init__.py#L83
-        gdo0_chip: cc1101._gpio.ChipSelector = 0,
-        gdo0_line_name: str = "GPIO24",  # recommended in README.md
+        timeout_seconds: int,
+        gdo0_gpio_line_name: bytes = b"GPIO24",  # recommended in README.md
     ) -> typing.Optional[_ReceivedPacket]:
         """
         depends on IOCFG0 == 0b00000001 (see _configure_defaults)
         """
         # pylint: disable=protected-access
-        gdo0_line = cc1101._gpio.get_line(
-            chip_selector=gdo0_chip, line_name=gdo0_line_name
-        )
-        import gpiod  # pylint: disable=import-outside-toplevel; see get_line()
-
-        # https://github.com/hhk7734/python3-gpiod/blob/v1.2.1/py_src/gpiod/test/button.py#L33
-        gdo0_line_request = gpiod.line_request()
-        gdo0_line_request.consumer = "CC1101:GDO0"
-        gdo0_line_request.request_type = gpiod.line_request.EVENT_RISING_EDGE
-        gdo0_line.request(gdo0_line_request)
+        gdo0 = cc1101._gpio.GPIOLine.find(name=gdo0_gpio_line_name)
         self._enable_receive_mode()
-        if not gdo0_line.event_wait(timeout=timeout):
+        if not gdo0.wait_for_rising_edge(
+            consumer=b"CC1101:GDO0", timeout_seconds=timeout_seconds
+        ):
             self._command_strobe(StrobeAddress.SIDLE)
             _LOGGER.debug(
-                "reached timeout of %f seconds while waiting for packet",
-                timeout.total_seconds(),
+                "reached timeout of %d seconds while waiting for packet",
+                timeout_seconds,
             )
             return None  # timeout
         return self._get_received_packet()

+ 98 - 36
cc1101/_gpio.py

@@ -15,46 +15,108 @@
 # You should have received a copy of the GNU General Public License
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
-import pathlib
-import typing
+import ctypes
+import ctypes.util
+import errno
+import functools
 
-ChipSelector = typing.Union[pathlib.Path, str, int]
+# could not find Debian's python3-libgpiod on pypi.org
+# https://salsa.debian.org/debian/libgpiod does not provide setup.py or setup.cfg
 
 
-def _format_chip_selector(selector: ChipSelector) -> str:
-    if isinstance(selector, int):
-        return "/dev/gpiochip{}".format(selector)
-    return str(selector)
+@functools.lru_cache(maxsize=1)
+def _load_libgpiod() -> ctypes.CDLL:
+    filename = ctypes.util.find_library("gpiod")
+    if not filename:
+        raise FileNotFoundError(
+            "Failed to find libgpiod."
+            "\nOn Debian-based systems, like Raspberry Pi OS / Raspbian,"
+            " libgpiod can be installed via"
+            "\n\tsudo apt-get install --no-install-recommends libgpiod2"
+        )
+    return ctypes.CDLL(filename, use_errno=True)
+
+
+class _c_timespec(ctypes.Structure):
+    """
+    struct timespec {
+        time_t tv_sec;
+        long tv_nsec;
+    };
+    """
+
+    # pylint: disable=too-few-public-methods,invalid-name; struct
+
+    _fields_ = [("tv_sec", ctypes.c_long), ("tv_nsec", ctypes.c_long)]
 
 
-def get_line(chip_selector: ChipSelector, line_name: str) -> "gpiod.line":  # type: ignore
-    # lazy import to protect stable API against incompatilibities in hhk7734/python3-gpiod
-    # e.g., incompatibility of v1.5.0 with python3.5&3.6 (python_requires not set)
-    import gpiod  # pylint: disable=import-outside-toplevel
+class GPIOLine:
+    def __init__(self, pointer: ctypes.c_void_p) -> None:
+        assert pointer != 0
+        self._pointer = pointer
 
-    try:
-        chip = gpiod.chip(chip_selector)
-    except PermissionError as exc:
-        raise PermissionError(
-            "Failed to access GPIO chip {}.".format(
-                _format_chip_selector(chip_selector)
+    @classmethod
+    def find(cls, name: bytes) -> "GPIOLine":
+        # > If this routine succeeds, the user must manually close the GPIO chip
+        # > owning this line to avoid memory leaks.
+        pointer = _load_libgpiod().gpiod_line_find(name)  # type: int
+        # > If the line could not be found, this functions sets errno to ENOENT.
+        if pointer == 0:
+            err = ctypes.get_errno()
+            if err == errno.EACCES:
+                # > [PermissionError] corresponds to errno EACCES and EPERM.
+                raise PermissionError(
+                    "Failed to access GPIO line {!r}.".format(name.decode())
+                    + "\nVerify that the current user has read and write access for /dev/gpiochip*."
+                    + "\nOn some systems, like Raspberry Pi OS / Raspbian,"
+                    + "\n\tsudo usermod -a -G gpio $USER"
+                    + "\nfollowed by a re-login grants sufficient permissions."
+                )
+            if err == errno.ENOENT:
+                # > [FileNotFoundError] corresponds to errno ENOENT.
+                # https://docs.python.org/3/library/exceptions.html#FileNotFoundError
+                raise FileNotFoundError(
+                    "GPIO line {!r} does not exist.".format(name.decode())
+                    + "\nRun command `gpioinfo` to get a list of all available GPIO lines."
+                )
+            raise OSError(
+                "Failed to open GPIO line {!r}: {}".format(
+                    name.decode(),
+                    errno.errorcode[err],
+                )
             )
-            + "\nVerify that the current user has read and write access."
-            + "\nOn some systems, like Raspberry Pi OS / Raspbian,"
-            + "\n\tsudo usermod -a -G gpio $USER"
-            + "\nfollowed by a re-login grants sufficient permissions."
-        ) from exc
-    except (FileNotFoundError, OSError, TypeError) as exc:
-        raise FileNotFoundError(
-            "Failed to find GPIO chip {}.".format(_format_chip_selector(chip_selector))
-            + "\nRun command `gpiodetect` or `gpioinfo` to view available GPIO chips."
-        ) from exc
-    line = chip.find_line(name=line_name)
-    try:
-        line.name
-    except RuntimeError as exc:
-        raise ValueError(
-            "Failed to find GPIO line with name {!r}.".format(line_name)
-            + "\nRun command `gpioinfo` to view the names of all available GPIO lines."
-        ) from exc
-    return line
+        return cls(pointer=ctypes.c_void_p(pointer))
+
+    def __del__(self):
+        # > Close a GPIO chip owning this line and release all resources.
+        # > After this function returns, the line must no longer be used.
+        if self._pointer:
+            _load_libgpiod().gpiod_line_close_chip(self._pointer)
+        # might make debugging easier in case someone calls __del__ twice
+        self._pointer = None
+
+    def wait_for_rising_edge(self, consumer: bytes, timeout_seconds: int) -> bool:
+        """
+        Return True, if an event occured; False on timeout.
+        """
+        if (
+            _load_libgpiod().gpiod_line_request_rising_edge_events(
+                self._pointer, consumer
+            )
+            != 0
+        ):
+            err = ctypes.get_errno()
+            raise OSError(
+                "Request for rising edge event notifications failed ({}).".format(
+                    errno.errorcode[err]
+                )
+                + ("\nBlocked by another process?" if err == errno.EBUSY else "")
+            )
+        timeout = _c_timespec(timeout_seconds, 0)
+        result = _load_libgpiod().gpiod_line_event_wait(
+            self._pointer, ctypes.pointer(timeout)
+        )  # type: int
+        _load_libgpiod().gpiod_line_release(self._pointer)
+        if result == -1:
+            raise OSError("Failed to wait for rising edge event notification.")
+        return result == 1

+ 0 - 3
setup.py

@@ -65,9 +65,6 @@ setuptools.setup(
         ]
     },
     install_requires=[
-        # potential alternative: https://salsa.debian.org/debian/libgpiod (python3-libgpiod)
-        # https://github.com/hhk7734/python3-gpiod
-        "gpiod",
         # apt install python3-spidev
         # https://github.com/doceme/py-spidev
         "spidev",

+ 8 - 1
tests/conftest.py

@@ -5,7 +5,14 @@ import pytest
 import cc1101
 
 
-@pytest.fixture
+@pytest.fixture(scope="function")
 def transceiver():
     with unittest.mock.patch("spidev.SpiDev"):
         return cc1101.CC1101()
+
+
+@pytest.fixture(scope="function")
+def libgpiod_mock():
+    mock = unittest.mock.MagicMock()
+    with unittest.mock.patch("cc1101._gpio._load_libgpiod", return_value=mock):
+        yield mock

+ 91 - 90
tests/test_gpio.py

@@ -15,7 +15,9 @@
 # You should have received a copy of the GNU General Public License
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
-import pathlib
+import ctypes
+import ctypes.util
+import errno
 import re
 import unittest.mock
 
@@ -26,101 +28,100 @@ import cc1101._gpio
 # pylint: disable=protected-access
 
 
-@pytest.mark.parametrize(
-    ("chip_selector", "chip_selector_formatted"),
-    (
-        (0, "/dev/gpiochip0"),
-        (1, "/dev/gpiochip1"),
-        ("/dev/gpiochip0", "/dev/gpiochip0"),
-        (pathlib.Path("/dev/gpiochip1"), "/dev/gpiochip1"),
-    ),
-)
-def test_get_line_permission_error(chip_selector, chip_selector_formatted):
+def test__load_libgpiod():
     with unittest.mock.patch(
-        "gpiod.chip",
-        side_effect=PermissionError("[Errno 13] Permission denied: '/dev/gpiochip0'"),
+        "ctypes.util.find_library", return_value=ctypes.util.find_library("c")
+    ) as find_library_mock:
+        assert isinstance(cc1101._gpio._load_libgpiod(), ctypes.CDLL)
+    find_library_mock.assert_called_once_with("gpiod")
+
+
+@pytest.mark.parametrize("name", ("GPIO24", "GPIO25"))
+def test_line_find_permission_denied(libgpiod_mock, name):
+    libgpiod_mock.gpiod_line_find.return_value = 0
+    ctypes.set_errno(errno.EACCES)
+    with pytest.raises(
+        PermissionError,
+        match=r"^Failed to access GPIO line {!r}\.\n".format(re.escape(name)),
     ):
-        with pytest.raises(
-            PermissionError,
-            match=r"^Failed to access GPIO chip {}\.".format(
-                re.escape(chip_selector_formatted)
-            ),
-        ):
-            cc1101._gpio.get_line(chip_selector=chip_selector, line_name="GPIO24")
+        cc1101._gpio.GPIOLine.find(name.encode())
 
 
-def test_get_line_file_not_found():
-    with unittest.mock.patch(
-        "gpiod.chip",
-        side_effect=FileNotFoundError(
-            "[Errno 2] No such file or directory: 'cannot open GPIO device /dev/gpiochip21'"
-        ),
+@pytest.mark.parametrize("name", ("GPIO24", "GPIO25"))
+def test_line_find_non_existing(libgpiod_mock, name):
+    libgpiod_mock.gpiod_line_find.return_value = 0
+    ctypes.set_errno(errno.ENOENT)
+    with pytest.raises(
+        FileNotFoundError,
+        match=r"^GPIO line {!r} does not exist\.\n".format(re.escape(name)),
     ):
-        with pytest.raises(
-            FileNotFoundError, match=r"^Failed to find GPIO chip /dev/gpiochip21\."
-        ):
-            cc1101._gpio.get_line(chip_selector=21, line_name="GPIO24")
+        cc1101._gpio.GPIOLine.find(name.encode())
 
 
-def test_get_line_cannot_open():
-    with unittest.mock.patch(
-        "gpiod.chip",
-        side_effect=OSError("[Errno 0] Success: 'cannot open GPIO device 42'"),
+@pytest.mark.parametrize("name", ("GPIO24", "GPIO25"))
+def test_line_find_unknown_error(libgpiod_mock, name):
+    libgpiod_mock.gpiod_line_find.return_value = 0
+    ctypes.set_errno(errno.ENOANO)
+    with pytest.raises(
+        OSError,
+        match=r"^Failed to open GPIO line {!r}: ENOANO$".format(re.escape(name)),
     ):
-        with pytest.raises(
-            FileNotFoundError, match=r"^Failed to find GPIO chip /dev/gpiochip42\."
-        ):
-            cc1101._gpio.get_line(chip_selector=42, line_name="GPIO24")
-
-
-def test_get_line_type_error():
-    with unittest.mock.patch(
-        "gpiod.chip",
-        side_effect=TypeError("iter() returned non-iterator of type 'NoneType'"),
+        cc1101._gpio.GPIOLine.find(name.encode())
+
+
+def test_line_find(libgpiod_mock):
+    libgpiod_mock.gpiod_line_find.return_value = 21
+    line = cc1101._gpio.GPIOLine.find(b"GPIO24")
+    libgpiod_mock.gpiod_line_find.assert_called_once_with(b"GPIO24")
+    assert isinstance(line, cc1101._gpio.GPIOLine)
+    assert line._pointer.value == 21
+
+
+def test_line_release(libgpiod_mock):
+    line = cc1101._gpio.GPIOLine(42)
+    del line
+    libgpiod_mock.gpiod_line_close_chip.assert_called_once_with(42)
+
+
+@pytest.mark.parametrize("consumer", (b"CC1101 GDO0", b"test"))
+@pytest.mark.parametrize("timeout_seconds", (21, 42))
+@pytest.mark.parametrize("reached_timeout", (False, True))
+def test_line_wait_for_rising_edge(
+    libgpiod_mock, consumer: bytes, timeout_seconds: int, reached_timeout: bool
+):
+    pointer = ctypes.c_void_p(1234)
+    line = cc1101._gpio.GPIOLine(pointer=pointer)
+    libgpiod_mock.gpiod_line_request_rising_edge_events.return_value = 0
+    libgpiod_mock.gpiod_line_event_wait.return_value = 0 if reached_timeout else 1
+    event_occured = line.wait_for_rising_edge(
+        consumer=consumer, timeout_seconds=timeout_seconds
+    )
+    assert event_occured is not reached_timeout
+    libgpiod_mock.gpiod_line_request_rising_edge_events.assert_called_once_with(
+        pointer, consumer
+    )
+    assert libgpiod_mock.gpiod_line_event_wait.call_count == 1
+    wait_args, wait_kwargs = libgpiod_mock.gpiod_line_event_wait.call_args
+    assert wait_args[0] == pointer
+    assert wait_args[1].contents.tv_nsec == 0
+    assert wait_args[1].contents.tv_sec == timeout_seconds
+    assert not wait_args[2:]
+    assert not wait_kwargs
+    libgpiod_mock.gpiod_line_release.assert_called_once_with(pointer)
+
+
+@pytest.mark.parametrize("consumer", (b"CC1101 GDO0",))
+@pytest.mark.parametrize("timeout_seconds", (21,))
+def test_line_wait_for_rising_edge_busy(
+    libgpiod_mock, consumer: bytes, timeout_seconds: int
+):
+    pointer = ctypes.c_void_p(1234)
+    line = cc1101._gpio.GPIOLine(pointer=pointer)
+    libgpiod_mock.gpiod_line_request_rising_edge_events.return_value = -1
+    ctypes.set_errno(errno.EBUSY)
+    with pytest.raises(
+        OSError,
+        match=r"^Request for rising edge event notifications failed \(EBUSY\)."
+        r"\nBlocked by another process\?$",
     ):
-        with pytest.raises(
-            FileNotFoundError, match=r"^Failed to find GPIO chip /dev/gpiochip815\."
-        ):
-            cc1101._gpio.get_line(chip_selector="/dev/gpiochip815", line_name="GPIO24")
-
-
-class _LineMock:
-
-    # pylint: disable=too-few-public-methods
-
-    def __init__(self, holding: bool):
-        self._holding = holding
-
-    @property
-    def name(self) -> str:
-        if not self._holding:
-            raise RuntimeError("object not holding a GPIO line handle")
-        return "dummy"
-
-
-@pytest.mark.parametrize("chip_selector", ("/dev/gpiochip0",))
-@pytest.mark.parametrize("line_name", ("GPIO24", "GPIO25"))
-def test_get_line(chip_selector, line_name):
-    with unittest.mock.patch("gpiod.chip") as chip_mock:
-        chip_mock().find_line.return_value = _LineMock(holding=True)
-        chip_mock.reset_mock()
-        assert isinstance(
-            cc1101._gpio.get_line(chip_selector=chip_selector, line_name=line_name),
-            _LineMock,
-        )
-        chip_mock.assert_called_once_with(chip_selector)
-        chip_mock().find_line.assert_called_once_with(name=line_name)
-
-
-@pytest.mark.parametrize("chip_selector", ("/dev/gpiochip0",))
-@pytest.mark.parametrize("line_name", ("GPIO24", "GPIO25"))
-def test_get_line_unknown_line(chip_selector, line_name):
-    with unittest.mock.patch("gpiod.chip") as chip_mock:
-        chip_mock().find_line.return_value = _LineMock(holding=False)
-        with pytest.raises(
-            ValueError,
-            match=r"Failed to find GPIO line with name {}\.".format(
-                re.escape(repr(line_name))
-            ),
-        ):
-            cc1101._gpio.get_line(chip_selector=chip_selector, line_name=line_name)
+        line.wait_for_rising_edge(consumer=consumer, timeout_seconds=timeout_seconds)

+ 13 - 27
tests/test_receive.py

@@ -15,10 +15,8 @@
 # You should have received a copy of the GNU General Public License
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
-import datetime
 import unittest.mock
 
-import gpiod
 import pytest
 
 # pylint: disable=protected-access
@@ -49,44 +47,32 @@ def test__get_received_packet(transceiver, payload):
     assert received_packet.link_quality_indicator == 42
 
 
-@pytest.mark.parametrize("gdo0_chip_selector", (0, "/dev/gpiochip1"))
-@pytest.mark.parametrize("gdo0_line_name", ("GPIO24", "GPIO25"))
+@pytest.mark.parametrize("gdo0_gpio_line_name", (b"GPIO24", b"GPIO25"))
 @pytest.mark.parametrize("reached_timeout", (True, False))
-@pytest.mark.parametrize("timeout", (datetime.timedelta(seconds=1),))
+@pytest.mark.parametrize("timeout_seconds", (2,))
 def test__wait_for_packet(
-    transceiver, gdo0_chip_selector, gdo0_line_name, timeout, reached_timeout
+    transceiver, gdo0_gpio_line_name, timeout_seconds, reached_timeout
 ):
     line_mock = unittest.mock.MagicMock()
-    line_mock.event_wait.return_value = not reached_timeout
+    line_mock.wait_for_rising_edge.return_value = not reached_timeout
     with unittest.mock.patch(
-        "cc1101._gpio.get_line"
-    ) as get_line_mock, unittest.mock.patch.object(
-        transceiver, "_get_received_packet"
+        "cc1101._gpio.GPIOLine.find", return_value=line_mock
+    ) as find_line_mock, unittest.mock.patch.object(
+        transceiver, "_get_received_packet", return_value="packet-dummy"
     ) as get_received_packet_mock, unittest.mock.patch.object(
         transceiver, "_enable_receive_mode"
     ) as enable_receive_mode_mock, unittest.mock.patch.object(
         transceiver, "_command_strobe"
     ) as command_strobe_mock:
-        get_line_mock.return_value = line_mock
-        get_received_packet_mock.return_value = "packet-dummy"
         packet = transceiver._wait_for_packet(
-            timeout=timeout,
-            gdo0_chip=gdo0_chip_selector,
-            gdo0_line_name=gdo0_line_name,
+            timeout_seconds=timeout_seconds,
+            gdo0_gpio_line_name=gdo0_gpio_line_name,
         )
-    get_line_mock.assert_called_once_with(
-        chip_selector=gdo0_chip_selector, line_name=gdo0_line_name
-    )
-    assert line_mock.request.call_count == 1
-    (line_request,) = line_mock.request.call_args[0]
-    assert vars(line_request) == {
-        "consumer": "CC1101:GDO0",
-        "flags": 0,
-        "request_type": gpiod.line_request.EVENT_RISING_EDGE,
-    }
-    assert not line_mock.request.call_args[1]
+    find_line_mock.assert_called_once_with(name=gdo0_gpio_line_name)
     enable_receive_mode_mock.assert_called_once_with()
-    line_mock.event_wait.assert_called_once_with(timeout=timeout)
+    line_mock.wait_for_rising_edge.assert_called_once_with(
+        consumer=b"CC1101:GDO0", timeout_seconds=timeout_seconds
+    )
     if reached_timeout:
         assert packet is None
         command_strobe_mock.assert_called_once_with(0x36)  # SIDLE