_base.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. # switchbot-mqtt - MQTT client controlling SwitchBot button & curtain automators,
  2. # compatible with home-assistant.io's MQTT Switch & Cover platform
  3. #
  4. # Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  18. import abc
  19. import logging
  20. import queue
  21. import shlex
  22. import typing
  23. import bluepy.btle
  24. import paho.mqtt.client
  25. import switchbot
  26. from switchbot_mqtt._utils import (
  27. _join_mqtt_topic_levels,
  28. _mac_address_valid,
  29. _MQTTTopicLevel,
  30. _MQTTTopicPlaceholder,
  31. _QueueLogHandler,
  32. )
  33. _LOGGER = logging.getLogger(__name__)
  34. class _MQTTCallbackUserdata:
  35. # pylint: disable=too-few-public-methods; @dataclasses.dataclass when python_requires>=3.7
  36. def __init__(
  37. self,
  38. *,
  39. retry_count: int,
  40. device_passwords: typing.Dict[str, str],
  41. fetch_device_info: bool,
  42. ) -> None:
  43. self.retry_count = retry_count
  44. self.device_passwords = device_passwords
  45. self.fetch_device_info = fetch_device_info
  46. def __eq__(self, other: object) -> bool:
  47. return isinstance(other, type(self)) and vars(self) == vars(other)
  48. class _MQTTControlledActor(abc.ABC):
  49. MQTT_COMMAND_TOPIC_LEVELS: typing.List[_MQTTTopicLevel] = NotImplemented
  50. MQTT_STATE_TOPIC_LEVELS: typing.List[_MQTTTopicLevel] = NotImplemented
  51. _MQTT_BATTERY_PERCENTAGE_TOPIC_LEVELS: typing.List[_MQTTTopicLevel] = NotImplemented
  52. @classmethod
  53. def get_mqtt_battery_percentage_topic(cls, mac_address: str) -> str:
  54. return _join_mqtt_topic_levels(
  55. topic_levels=cls._MQTT_BATTERY_PERCENTAGE_TOPIC_LEVELS,
  56. mac_address=mac_address,
  57. )
  58. @abc.abstractmethod
  59. def __init__(
  60. self, *, mac_address: str, retry_count: int, password: typing.Optional[str]
  61. ) -> None:
  62. # alternative: pySwitchbot >=0.10.0 provides SwitchbotDevice.get_mac()
  63. self._mac_address = mac_address
  64. @abc.abstractmethod
  65. def _get_device(self) -> switchbot.SwitchbotDevice:
  66. raise NotImplementedError()
  67. def _update_device_info(self) -> None:
  68. log_queue: queue.Queue[logging.LogRecord] = queue.Queue(maxsize=0)
  69. logging.getLogger("switchbot").addHandler(_QueueLogHandler(log_queue))
  70. try:
  71. self._get_device().update()
  72. # pySwitchbot>=v0.10.1 catches bluepy.btle.BTLEManagementError :(
  73. # https://github.com/Danielhiversen/pySwitchbot/blob/0.10.1/switchbot/__init__.py#L141
  74. while not log_queue.empty():
  75. log_record = log_queue.get()
  76. if log_record.exc_info:
  77. exc: typing.Optional[BaseException] = log_record.exc_info[1]
  78. if (
  79. isinstance(exc, bluepy.btle.BTLEManagementError)
  80. and exc.emsg == "Permission Denied"
  81. ):
  82. raise exc
  83. except bluepy.btle.BTLEManagementError as exc:
  84. if (
  85. exc.emsg == "Permission Denied"
  86. and exc.message == "Failed to execute management command 'le on'"
  87. ):
  88. raise PermissionError(
  89. "bluepy-helper failed to enable low energy mode"
  90. " due to insufficient permissions."
  91. "\nSee https://github.com/IanHarvey/bluepy/issues/313#issuecomment-428324639"
  92. ", https://github.com/fphammerle/switchbot-mqtt/pull/31#issuecomment-846383603"
  93. ", and https://github.com/IanHarvey/bluepy/blob/v/1.3.0/bluepy"
  94. "/bluepy-helper.c#L1260."
  95. "\nInsecure workaround:"
  96. "\n1. sudo apt-get install --no-install-recommends libcap2-bin"
  97. f"\n2. sudo setcap cap_net_admin+ep {shlex.quote(bluepy.btle.helperExe)}"
  98. "\n3. restart switchbot-mqtt"
  99. "\nIn docker-based setups, you could use"
  100. " `sudo docker run --cap-drop ALL --cap-add NET_ADMIN --user 0 …`"
  101. " (seriously insecure)."
  102. ) from exc
  103. raise
  104. def _report_battery_level(self, mqtt_client: paho.mqtt.client.Client) -> None:
  105. # > battery: Percentage of battery that is left.
  106. # https://www.home-assistant.io/integrations/sensor/#device-class
  107. self._mqtt_publish(
  108. topic_levels=self._MQTT_BATTERY_PERCENTAGE_TOPIC_LEVELS,
  109. payload=str(self._get_device().get_battery_percent()).encode(),
  110. mqtt_client=mqtt_client,
  111. )
  112. def _update_and_report_device_info(
  113. self, mqtt_client: paho.mqtt.client.Client
  114. ) -> None:
  115. self._update_device_info()
  116. self._report_battery_level(mqtt_client=mqtt_client)
  117. @abc.abstractmethod
  118. def execute_command(
  119. self,
  120. mqtt_message_payload: bytes,
  121. mqtt_client: paho.mqtt.client.Client,
  122. update_device_info: bool,
  123. ) -> None:
  124. raise NotImplementedError()
  125. @classmethod
  126. def _mqtt_command_callback(
  127. cls,
  128. mqtt_client: paho.mqtt.client.Client,
  129. userdata: _MQTTCallbackUserdata,
  130. message: paho.mqtt.client.MQTTMessage,
  131. ) -> None:
  132. # pylint: disable=unused-argument; callback
  133. # https://github.com/eclipse/paho.mqtt.python/blob/v1.5.0/src/paho/mqtt/client.py#L469
  134. _LOGGER.debug("received topic=%s payload=%r", message.topic, message.payload)
  135. if message.retain:
  136. _LOGGER.info("ignoring retained message")
  137. return
  138. topic_split = message.topic.split("/")
  139. if len(topic_split) != len(cls.MQTT_COMMAND_TOPIC_LEVELS):
  140. _LOGGER.warning("unexpected topic %s", message.topic)
  141. return
  142. mac_address = None
  143. for given_part, expected_part in zip(
  144. topic_split, cls.MQTT_COMMAND_TOPIC_LEVELS
  145. ):
  146. if expected_part == _MQTTTopicPlaceholder.MAC_ADDRESS:
  147. mac_address = given_part
  148. elif expected_part != given_part:
  149. _LOGGER.warning("unexpected topic %s", message.topic)
  150. return
  151. assert mac_address
  152. if not _mac_address_valid(mac_address):
  153. _LOGGER.warning("invalid mac address %s", mac_address)
  154. return
  155. actor = cls(
  156. mac_address=mac_address,
  157. retry_count=userdata.retry_count,
  158. password=userdata.device_passwords.get(mac_address, None),
  159. )
  160. actor.execute_command(
  161. mqtt_message_payload=message.payload,
  162. mqtt_client=mqtt_client,
  163. # consider calling update+report method directly when adding support for battery levels
  164. update_device_info=userdata.fetch_device_info,
  165. )
  166. @classmethod
  167. def mqtt_subscribe(cls, mqtt_client: paho.mqtt.client.Client) -> None:
  168. command_topic = "/".join(
  169. "+" if isinstance(l, _MQTTTopicPlaceholder) else l
  170. for l in cls.MQTT_COMMAND_TOPIC_LEVELS
  171. )
  172. _LOGGER.info("subscribing to MQTT topic %r", command_topic)
  173. mqtt_client.subscribe(command_topic)
  174. mqtt_client.message_callback_add(
  175. sub=command_topic,
  176. callback=cls._mqtt_command_callback,
  177. )
  178. def _mqtt_publish(
  179. self,
  180. *,
  181. topic_levels: typing.List[_MQTTTopicLevel],
  182. payload: bytes,
  183. mqtt_client: paho.mqtt.client.Client,
  184. ) -> None:
  185. topic = _join_mqtt_topic_levels(
  186. topic_levels=topic_levels, mac_address=self._mac_address
  187. )
  188. # https://pypi.org/project/paho-mqtt/#publishing
  189. _LOGGER.debug("publishing topic=%s payload=%r", topic, payload)
  190. message_info: paho.mqtt.client.MQTTMessageInfo = mqtt_client.publish(
  191. topic=topic, payload=payload, retain=True
  192. )
  193. # wait before checking status?
  194. if message_info.rc != paho.mqtt.client.MQTT_ERR_SUCCESS:
  195. _LOGGER.error(
  196. "Failed to publish MQTT message on topic %s (rc=%d)",
  197. topic,
  198. message_info.rc,
  199. )
  200. def report_state(self, state: bytes, mqtt_client: paho.mqtt.client.Client) -> None:
  201. self._mqtt_publish(
  202. topic_levels=self.MQTT_STATE_TOPIC_LEVELS,
  203. payload=state,
  204. mqtt_client=mqtt_client,
  205. )