__init__.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. # switchbot-mqtt - MQTT client controlling SwitchBot button automators,
  2. # compatible with home-assistant.io's MQTT Switch platform
  3. #
  4. # Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  18. import argparse
  19. import enum
  20. import logging
  21. import pathlib
  22. import re
  23. import typing
  24. import paho.mqtt.client
  25. import switchbot
  26. _LOGGER = logging.getLogger(__name__)
  27. class _SwitchbotAction(enum.Enum):
  28. ON = 1
  29. OFF = 2
  30. class _SwitchbotState(enum.Enum):
  31. ON = 1
  32. OFF = 2
  33. # https://www.home-assistant.io/docs/mqtt/discovery/#switches
  34. _MQTT_TOPIC_PREFIX_LEVELS = ["homeassistant", "switch", "switchbot"]
  35. _MQTT_TOPIC_MAC_ADDRESS_PLACEHOLDER = "{mac_address}"
  36. _MQTT_SET_TOPIC_LEVELS = _MQTT_TOPIC_PREFIX_LEVELS + [
  37. _MQTT_TOPIC_MAC_ADDRESS_PLACEHOLDER,
  38. "set",
  39. ]
  40. _MQTT_SET_TOPIC = "/".join(_MQTT_SET_TOPIC_LEVELS).replace(
  41. _MQTT_TOPIC_MAC_ADDRESS_PLACEHOLDER, "+"
  42. )
  43. _MQTT_STATE_TOPIC = "/".join(
  44. _MQTT_TOPIC_PREFIX_LEVELS + [_MQTT_TOPIC_MAC_ADDRESS_PLACEHOLDER, "state"]
  45. )
  46. # https://www.home-assistant.io/integrations/switch.mqtt/#state_off
  47. _MQTT_STATE_PAYLOAD_MAPPING = {_SwitchbotState.ON: b"ON", _SwitchbotState.OFF: b"OFF"}
  48. _MAC_ADDRESS_REGEX = re.compile(r"^[0-9a-f]{2}(:[0-9a-f]{2}){5}$")
  49. def _mac_address_valid(mac_address: str) -> bool:
  50. return _MAC_ADDRESS_REGEX.match(mac_address.lower()) is not None
  51. def _mqtt_on_connect(
  52. mqtt_client: paho.mqtt.client.Client,
  53. user_data: typing.Any,
  54. flags: typing.Dict,
  55. return_code: int,
  56. ) -> None:
  57. # pylint: disable=unused-argument; callback
  58. # https://github.com/eclipse/paho.mqtt.python/blob/v1.5.0/src/paho/mqtt/client.py#L441
  59. assert return_code == 0, return_code # connection accepted
  60. mqtt_broker_host, mqtt_broker_port = mqtt_client.socket().getpeername()
  61. _LOGGER.debug("connected to MQTT broker %s:%d", mqtt_broker_host, mqtt_broker_port)
  62. # https://www.home-assistant.io/docs/mqtt/discovery/#discovery_prefix
  63. mqtt_client.subscribe(_MQTT_SET_TOPIC)
  64. def _report_state(
  65. mqtt_client: paho.mqtt.client.Client,
  66. switchbot_mac_address: str,
  67. switchbot_state: _SwitchbotState,
  68. ) -> None:
  69. # https://pypi.org/project/paho-mqtt/#publishing
  70. topic = _MQTT_STATE_TOPIC.replace(
  71. _MQTT_TOPIC_MAC_ADDRESS_PLACEHOLDER, switchbot_mac_address
  72. )
  73. payload = _MQTT_STATE_PAYLOAD_MAPPING[switchbot_state]
  74. _LOGGER.debug("publishing topic=%s payload=%r", topic, payload)
  75. message_info = mqtt_client.publish(
  76. topic=topic, payload=payload, retain=True
  77. ) # type: paho.mqtt.client.MQTTMessageInfo
  78. if message_info.rc != paho.mqtt.client.MQTT_ERR_SUCCESS:
  79. _LOGGER.error("failed to publish state (rc=%d)", message_info.rc)
  80. def _send_command(
  81. mqtt_client: paho.mqtt.client.Client,
  82. switchbot_mac_address: str,
  83. action: _SwitchbotAction,
  84. ) -> None:
  85. switchbot_device = switchbot.Switchbot(mac=switchbot_mac_address)
  86. # pySwitchbot catches & logs bluetooth exceptions (see `test__send_command_bluetooth_error`)
  87. if action == _SwitchbotAction.ON:
  88. if not switchbot_device.turn_on():
  89. _LOGGER.error("failed to turn on switchbot %s", switchbot_mac_address)
  90. else:
  91. _LOGGER.info("switchbot %s turned on", switchbot_mac_address)
  92. _report_state(
  93. mqtt_client=mqtt_client,
  94. switchbot_mac_address=switchbot_mac_address,
  95. switchbot_state=_SwitchbotState.ON,
  96. )
  97. else:
  98. assert action == _SwitchbotAction.OFF, action
  99. if not switchbot_device.turn_off():
  100. _LOGGER.error("failed to turn off switchbot %s", switchbot_mac_address)
  101. else:
  102. _LOGGER.info("switchbot %s turned off", switchbot_mac_address)
  103. _report_state(
  104. mqtt_client=mqtt_client,
  105. switchbot_mac_address=switchbot_mac_address,
  106. switchbot_state=_SwitchbotState.OFF,
  107. )
  108. def _mqtt_on_message(
  109. mqtt_client: paho.mqtt.client.Client,
  110. user_data: typing.Any,
  111. message: paho.mqtt.client.MQTTMessage,
  112. ) -> None:
  113. # pylint: disable=unused-argument; callback
  114. # https://github.com/eclipse/paho.mqtt.python/blob/v1.5.0/src/paho/mqtt/client.py#L469
  115. _LOGGER.debug("received topic=%s payload=%r", message.topic, message.payload)
  116. if message.retain:
  117. _LOGGER.info("ignoring retained message")
  118. return
  119. topic_split = message.topic.split("/")
  120. if len(topic_split) != len(_MQTT_SET_TOPIC_LEVELS):
  121. _LOGGER.warning("unexpected topic %s", message.topic)
  122. return
  123. switchbot_mac_address = None
  124. for given_part, expected_part in zip(topic_split, _MQTT_SET_TOPIC_LEVELS):
  125. if expected_part == _MQTT_TOPIC_MAC_ADDRESS_PLACEHOLDER:
  126. switchbot_mac_address = given_part
  127. elif expected_part != given_part:
  128. _LOGGER.warning("unexpected topic %s", message.topic)
  129. return
  130. assert switchbot_mac_address
  131. if not _mac_address_valid(switchbot_mac_address):
  132. _LOGGER.warning("invalid mac address %s", switchbot_mac_address)
  133. return
  134. # https://www.home-assistant.io/integrations/switch.mqtt/#payload_off
  135. if message.payload.lower() == b"on":
  136. action = _SwitchbotAction.ON
  137. elif message.payload.lower() == b"off":
  138. action = _SwitchbotAction.OFF
  139. else:
  140. _LOGGER.warning("unexpected payload %r", message.payload)
  141. return
  142. _send_command(
  143. mqtt_client=mqtt_client,
  144. switchbot_mac_address=switchbot_mac_address,
  145. action=action,
  146. )
  147. def _run(
  148. mqtt_host: str,
  149. mqtt_port: int,
  150. mqtt_username: typing.Optional[str],
  151. mqtt_password: typing.Optional[str],
  152. ) -> None:
  153. # https://pypi.org/project/paho-mqtt/
  154. mqtt_client = paho.mqtt.client.Client()
  155. mqtt_client.on_connect = _mqtt_on_connect
  156. mqtt_client.on_message = _mqtt_on_message
  157. _LOGGER.info("connecting to MQTT broker %s:%d", mqtt_host, mqtt_port)
  158. if mqtt_username:
  159. mqtt_client.username_pw_set(username=mqtt_username, password=mqtt_password)
  160. elif mqtt_password:
  161. raise ValueError("Missing MQTT username")
  162. mqtt_client.connect(host=mqtt_host, port=mqtt_port)
  163. mqtt_client.loop_forever()
  164. def _main() -> None:
  165. logging.basicConfig(
  166. level=logging.DEBUG,
  167. format="%(asctime)s:%(levelname)s:%(message)s",
  168. datefmt="%Y-%m-%dT%H:%M:%S%z",
  169. )
  170. argparser = argparse.ArgumentParser(
  171. description="MQTT client controlling SwitchBot button automators, "
  172. "compatible with home-assistant.io's MQTT Switch platform"
  173. )
  174. argparser.add_argument("--mqtt-host", type=str, required=True)
  175. argparser.add_argument("--mqtt-port", type=int, default=1883)
  176. argparser.add_argument("--mqtt-username", type=str)
  177. password_argument_group = argparser.add_mutually_exclusive_group()
  178. password_argument_group.add_argument("--mqtt-password", type=str)
  179. password_argument_group.add_argument(
  180. "--mqtt-password-file",
  181. type=pathlib.Path,
  182. metavar="PATH",
  183. dest="mqtt_password_path",
  184. help="stripping trailing newline",
  185. )
  186. args = argparser.parse_args()
  187. if args.mqtt_password_path:
  188. # .read_text() replaces \r\n with \n
  189. mqtt_password = args.mqtt_password_path.read_bytes().decode()
  190. if mqtt_password.endswith("\r\n"):
  191. mqtt_password = mqtt_password[:-2]
  192. elif mqtt_password.endswith("\n"):
  193. mqtt_password = mqtt_password[:-1]
  194. else:
  195. mqtt_password = args.mqtt_password
  196. _run(
  197. mqtt_host=args.mqtt_host,
  198. mqtt_port=args.mqtt_port,
  199. mqtt_username=args.mqtt_username,
  200. mqtt_password=mqtt_password,
  201. )