_gpio.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. # python-cc1101 - Python Library to Transmit RF Signals via CC1101 Transceivers
  2. #
  3. # Copyright (C) 2021 Fabian Peter Hammerle <fabian@hammerle.me>
  4. #
  5. # This program is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. from __future__ import annotations
  18. import ctypes
  19. import ctypes.util
  20. import datetime
  21. import errno
  22. import functools
  23. # could not find Debian's python3-libgpiod on pypi.org
  24. # https://salsa.debian.org/debian/libgpiod does not provide setup.py or setup.cfg
  25. @functools.lru_cache(maxsize=1)
  26. def _load_libgpiod() -> ctypes.CDLL:
  27. filename = ctypes.util.find_library("gpiod")
  28. if not filename:
  29. raise FileNotFoundError(
  30. "Failed to find libgpiod."
  31. "\nOn Debian-based systems, like Raspberry Pi OS / Raspbian,"
  32. " libgpiod can be installed via"
  33. "\n\tsudo apt-get install --no-install-recommends libgpiod2"
  34. )
  35. return ctypes.CDLL(filename, use_errno=True)
  36. class _c_timespec(ctypes.Structure):
  37. """
  38. struct timespec {
  39. time_t tv_sec;
  40. long tv_nsec;
  41. };
  42. """
  43. # pylint: disable=too-few-public-methods,invalid-name; struct
  44. _fields_ = [("tv_sec", ctypes.c_long), ("tv_nsec", ctypes.c_long)]
  45. class GPIOLine:
  46. def __init__(self, pointer: ctypes.c_void_p) -> None:
  47. assert pointer != 0
  48. self._pointer = pointer
  49. @classmethod
  50. def find(cls, name: bytes) -> GPIOLine:
  51. # > If this routine succeeds, the user must manually close the GPIO chip
  52. # > owning this line to avoid memory leaks.
  53. pointer: int = _load_libgpiod().gpiod_line_find(name)
  54. # > If the line could not be found, this functions sets errno to ENOENT.
  55. if pointer == 0:
  56. err = ctypes.get_errno()
  57. if err == errno.EACCES:
  58. # > [PermissionError] corresponds to errno EACCES and EPERM.
  59. raise PermissionError(
  60. f"Failed to access GPIO line {name.decode()!r}."
  61. "\nVerify that the current user has read and write access for /dev/gpiochip*."
  62. "\nOn some systems, like Raspberry Pi OS / Raspbian,"
  63. "\n\tsudo usermod -a -G gpio $USER"
  64. "\nfollowed by a re-login grants sufficient permissions."
  65. )
  66. if err == errno.ENOENT:
  67. # > [FileNotFoundError] corresponds to errno ENOENT.
  68. # https://docs.python.org/3/library/exceptions.html#FileNotFoundError
  69. raise FileNotFoundError(
  70. f"GPIO line {name.decode()!r} does not exist."
  71. "\nRun command `gpioinfo` to get a list of all available GPIO lines."
  72. )
  73. raise OSError(
  74. f"Failed to open GPIO line {name.decode()!r}: {errno.errorcode[err]}"
  75. )
  76. return cls(pointer=ctypes.c_void_p(pointer))
  77. def __del__(self):
  78. # > Close a GPIO chip owning this line and release all resources.
  79. # > After this function returns, the line must no longer be used.
  80. if self._pointer:
  81. _load_libgpiod().gpiod_line_close_chip(self._pointer)
  82. # might make debugging easier in case someone calls __del__ twice
  83. self._pointer = None
  84. def wait_for_rising_edge(
  85. self, *, consumer: bytes, timeout: datetime.timedelta
  86. ) -> bool:
  87. """
  88. Return True, if an event occured; False on timeout.
  89. """
  90. if (
  91. _load_libgpiod().gpiod_line_request_rising_edge_events(
  92. self._pointer, consumer
  93. )
  94. != 0
  95. ):
  96. err = ctypes.get_errno()
  97. raise OSError(
  98. f"Request for rising edge event notifications failed ({errno.errorcode[err]})."
  99. + ("\nBlocked by another process?" if err == errno.EBUSY else "")
  100. )
  101. timeout_timespec = _c_timespec(
  102. int(timeout.total_seconds()), timeout.microseconds * 1000
  103. )
  104. result: int = _load_libgpiod().gpiod_line_event_wait(
  105. self._pointer, ctypes.pointer(timeout_timespec)
  106. )
  107. _load_libgpiod().gpiod_line_release(self._pointer)
  108. if result == -1:
  109. raise OSError("Failed to wait for rising edge event notification.")
  110. return result == 1