1
0

_base.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. # switchbot-mqtt - MQTT client controlling SwitchBot button & curtain automators,
  2. # compatible with home-assistant.io's MQTT Switch & Cover platform
  3. #
  4. # Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  18. from __future__ import annotations # PEP563 (default in python>=3.10)
  19. import abc
  20. import collections.abc
  21. import dataclasses
  22. import logging
  23. import queue
  24. import shlex
  25. import typing
  26. import bluepy.btle
  27. import paho.mqtt.client
  28. import switchbot
  29. from switchbot_mqtt._utils import (
  30. _join_mqtt_topic_levels,
  31. _mac_address_valid,
  32. _MQTTTopicLevel,
  33. _MQTTTopicPlaceholder,
  34. _parse_mqtt_topic,
  35. _QueueLogHandler,
  36. )
  37. _LOGGER = logging.getLogger(__name__)
  38. @dataclasses.dataclass
  39. class _MQTTCallbackUserdata:
  40. retry_count: int
  41. device_passwords: typing.Dict[str, str]
  42. fetch_device_info: bool
  43. class _MQTTControlledActor(abc.ABC):
  44. MQTT_COMMAND_TOPIC_LEVELS: typing.List[_MQTTTopicLevel] = NotImplemented
  45. _MQTT_UPDATE_DEVICE_INFO_TOPIC_LEVELS: typing.List[_MQTTTopicLevel] = NotImplemented
  46. MQTT_STATE_TOPIC_LEVELS: typing.List[_MQTTTopicLevel] = NotImplemented
  47. _MQTT_BATTERY_PERCENTAGE_TOPIC_LEVELS: typing.List[_MQTTTopicLevel] = NotImplemented
  48. @classmethod
  49. def get_mqtt_update_device_info_topic(cls, mac_address: str) -> str:
  50. return _join_mqtt_topic_levels(
  51. topic_levels=cls._MQTT_UPDATE_DEVICE_INFO_TOPIC_LEVELS,
  52. mac_address=mac_address,
  53. )
  54. @classmethod
  55. def get_mqtt_battery_percentage_topic(cls, mac_address: str) -> str:
  56. return _join_mqtt_topic_levels(
  57. topic_levels=cls._MQTT_BATTERY_PERCENTAGE_TOPIC_LEVELS,
  58. mac_address=mac_address,
  59. )
  60. @abc.abstractmethod
  61. def __init__(
  62. self, *, mac_address: str, retry_count: int, password: typing.Optional[str]
  63. ) -> None:
  64. # alternative: pySwitchbot >=0.10.0 provides SwitchbotDevice.get_mac()
  65. self._mac_address = mac_address
  66. @abc.abstractmethod
  67. def _get_device(self) -> switchbot.SwitchbotDevice:
  68. raise NotImplementedError()
  69. def _update_device_info(self) -> None:
  70. log_queue: queue.Queue[logging.LogRecord] = queue.Queue(maxsize=0)
  71. logging.getLogger("switchbot").addHandler(_QueueLogHandler(log_queue))
  72. try:
  73. self._get_device().update()
  74. # pySwitchbot>=v0.10.1 catches bluepy.btle.BTLEManagementError :(
  75. # https://github.com/Danielhiversen/pySwitchbot/blob/0.10.1/switchbot/__init__.py#L141
  76. # pySwitchbot<0.11.0 WARNING, >=0.11.0 ERROR
  77. while not log_queue.empty():
  78. log_record = log_queue.get()
  79. if log_record.exc_info:
  80. exc: typing.Optional[BaseException] = log_record.exc_info[1]
  81. if (
  82. isinstance(exc, bluepy.btle.BTLEManagementError)
  83. and exc.emsg == "Permission Denied"
  84. ):
  85. raise exc
  86. except bluepy.btle.BTLEManagementError as exc:
  87. if (
  88. exc.emsg == "Permission Denied"
  89. and exc.message == "Failed to execute management command 'le on'"
  90. ):
  91. raise PermissionError(
  92. "bluepy-helper failed to enable low energy mode"
  93. " due to insufficient permissions."
  94. "\nSee https://github.com/IanHarvey/bluepy/issues/313#issuecomment-428324639"
  95. ", https://github.com/fphammerle/switchbot-mqtt/pull/31#issuecomment-846383603"
  96. ", and https://github.com/IanHarvey/bluepy/blob/v/1.3.0/bluepy"
  97. "/bluepy-helper.c#L1260."
  98. "\nInsecure workaround:"
  99. "\n1. sudo apt-get install --no-install-recommends libcap2-bin"
  100. f"\n2. sudo setcap cap_net_admin+ep {shlex.quote(bluepy.btle.helperExe)}"
  101. "\n3. restart switchbot-mqtt"
  102. "\nIn docker-based setups, you could use"
  103. " `sudo docker run --cap-drop ALL --cap-add NET_ADMIN --user 0 …`"
  104. " (seriously insecure)."
  105. ) from exc
  106. raise
  107. def _report_battery_level(self, mqtt_client: paho.mqtt.client.Client) -> None:
  108. # > battery: Percentage of battery that is left.
  109. # https://www.home-assistant.io/integrations/sensor/#device-class
  110. self._mqtt_publish(
  111. topic_levels=self._MQTT_BATTERY_PERCENTAGE_TOPIC_LEVELS,
  112. payload=str(self._get_device().get_battery_percent()).encode(),
  113. mqtt_client=mqtt_client,
  114. )
  115. def _update_and_report_device_info(
  116. self, mqtt_client: paho.mqtt.client.Client
  117. ) -> None:
  118. self._update_device_info()
  119. self._report_battery_level(mqtt_client=mqtt_client)
  120. @classmethod
  121. def _init_from_topic(
  122. cls,
  123. userdata: _MQTTCallbackUserdata,
  124. topic: str,
  125. expected_topic_levels: collections.abc.Collection[_MQTTTopicLevel],
  126. ) -> typing.Optional[_MQTTControlledActor]:
  127. try:
  128. mac_address = _parse_mqtt_topic(
  129. topic=topic, expected_levels=expected_topic_levels
  130. )[_MQTTTopicPlaceholder.MAC_ADDRESS]
  131. except ValueError as exc:
  132. _LOGGER.warning(str(exc), exc_info=False)
  133. return None
  134. if not _mac_address_valid(mac_address):
  135. _LOGGER.warning("invalid mac address %s", mac_address)
  136. return None
  137. return cls(
  138. mac_address=mac_address,
  139. retry_count=userdata.retry_count,
  140. password=userdata.device_passwords.get(mac_address, None),
  141. )
  142. @classmethod
  143. def _mqtt_update_device_info_callback(
  144. cls,
  145. mqtt_client: paho.mqtt.client.Client,
  146. userdata: _MQTTCallbackUserdata,
  147. message: paho.mqtt.client.MQTTMessage,
  148. ) -> None:
  149. # pylint: disable=unused-argument; callback
  150. # https://github.com/eclipse/paho.mqtt.python/blob/v1.5.0/src/paho/mqtt/client.py#L469
  151. _LOGGER.debug("received topic=%s payload=%r", message.topic, message.payload)
  152. if message.retain:
  153. _LOGGER.info("ignoring retained message")
  154. return
  155. actor = cls._init_from_topic(
  156. userdata=userdata,
  157. topic=message.topic,
  158. expected_topic_levels=cls._MQTT_UPDATE_DEVICE_INFO_TOPIC_LEVELS,
  159. )
  160. if actor:
  161. # pylint: disable=protected-access; own instance
  162. actor._update_and_report_device_info(mqtt_client)
  163. @abc.abstractmethod
  164. def execute_command(
  165. self,
  166. mqtt_message_payload: bytes,
  167. mqtt_client: paho.mqtt.client.Client,
  168. update_device_info: bool,
  169. ) -> None:
  170. raise NotImplementedError()
  171. @classmethod
  172. def _mqtt_command_callback(
  173. cls,
  174. mqtt_client: paho.mqtt.client.Client,
  175. userdata: _MQTTCallbackUserdata,
  176. message: paho.mqtt.client.MQTTMessage,
  177. ) -> None:
  178. # pylint: disable=unused-argument; callback
  179. # https://github.com/eclipse/paho.mqtt.python/blob/v1.5.0/src/paho/mqtt/client.py#L469
  180. _LOGGER.debug("received topic=%s payload=%r", message.topic, message.payload)
  181. if message.retain:
  182. _LOGGER.info("ignoring retained message")
  183. return
  184. actor = cls._init_from_topic(
  185. userdata=userdata,
  186. topic=message.topic,
  187. expected_topic_levels=cls.MQTT_COMMAND_TOPIC_LEVELS,
  188. )
  189. if actor:
  190. actor.execute_command(
  191. mqtt_message_payload=message.payload,
  192. mqtt_client=mqtt_client,
  193. update_device_info=userdata.fetch_device_info,
  194. )
  195. @classmethod
  196. def _get_mqtt_message_callbacks(
  197. cls,
  198. *,
  199. enable_device_info_update_topic: bool,
  200. ) -> typing.Dict[typing.Tuple[_MQTTTopicLevel, ...], typing.Callable]:
  201. # returning dict because `paho.mqtt.client.Client.message_callback_add` overwrites
  202. # callbacks with same topic pattern
  203. # https://github.com/eclipse/paho.mqtt.python/blob/v1.6.1/src/paho/mqtt/client.py#L2304
  204. # https://github.com/eclipse/paho.mqtt.python/blob/v1.6.1/src/paho/mqtt/matcher.py#L19
  205. callbacks = {tuple(cls.MQTT_COMMAND_TOPIC_LEVELS): cls._mqtt_command_callback}
  206. if enable_device_info_update_topic:
  207. callbacks[
  208. tuple(cls._MQTT_UPDATE_DEVICE_INFO_TOPIC_LEVELS)
  209. ] = cls._mqtt_update_device_info_callback
  210. return callbacks
  211. @classmethod
  212. def mqtt_subscribe(
  213. cls,
  214. mqtt_client: paho.mqtt.client.Client,
  215. *,
  216. enable_device_info_update_topic: bool,
  217. ) -> None:
  218. for topic_levels, callback in cls._get_mqtt_message_callbacks(
  219. enable_device_info_update_topic=enable_device_info_update_topic
  220. ).items():
  221. topic = _join_mqtt_topic_levels(topic_levels, mac_address="+")
  222. _LOGGER.info("subscribing to MQTT topic %r", topic)
  223. mqtt_client.subscribe(topic)
  224. mqtt_client.message_callback_add(sub=topic, callback=callback)
  225. def _mqtt_publish(
  226. self,
  227. *,
  228. topic_levels: typing.List[_MQTTTopicLevel],
  229. payload: bytes,
  230. mqtt_client: paho.mqtt.client.Client,
  231. ) -> None:
  232. topic = _join_mqtt_topic_levels(
  233. topic_levels=topic_levels, mac_address=self._mac_address
  234. )
  235. # https://pypi.org/project/paho-mqtt/#publishing
  236. _LOGGER.debug("publishing topic=%s payload=%r", topic, payload)
  237. message_info: paho.mqtt.client.MQTTMessageInfo = mqtt_client.publish(
  238. topic=topic, payload=payload, retain=True
  239. )
  240. # wait before checking status?
  241. if message_info.rc != paho.mqtt.client.MQTT_ERR_SUCCESS:
  242. _LOGGER.error(
  243. "Failed to publish MQTT message on topic %s (rc=%d)",
  244. topic,
  245. message_info.rc,
  246. )
  247. def report_state(self, state: bytes, mqtt_client: paho.mqtt.client.Client) -> None:
  248. self._mqtt_publish(
  249. topic_levels=self.MQTT_STATE_TOPIC_LEVELS,
  250. payload=state,
  251. mqtt_client=mqtt_client,
  252. )