__init__.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. # switchbot-mqtt - MQTT client controlling SwitchBot button automators,
  2. # compatible with home-assistant.io's MQTT Switch platform
  3. #
  4. # Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  18. import argparse
  19. import enum
  20. import logging
  21. import re
  22. import typing
  23. import paho.mqtt.client
  24. import switchbot
  25. _LOGGER = logging.getLogger(__name__)
  26. _MQTT_TOPIC_MAC_ADDRESS_PLACEHOLDER = "{mac_address}"
  27. _MQTT_SET_TOPIC_PATTERN = [
  28. "homeassistant",
  29. "switch",
  30. "switchbot",
  31. _MQTT_TOPIC_MAC_ADDRESS_PLACEHOLDER,
  32. "set",
  33. ]
  34. _MQTT_SET_TOPIC = "/".join(_MQTT_SET_TOPIC_PATTERN).replace(
  35. _MQTT_TOPIC_MAC_ADDRESS_PLACEHOLDER, "+"
  36. )
  37. _MAC_ADDRESS_REGEX = re.compile(r"^[0-9a-f]{2}(:[0-9a-f]{2}){5}$")
  38. class _SwitchbotAction(enum.Enum):
  39. ON = 1
  40. OFF = 2
  41. def _mac_address_valid(mac_address: str) -> bool:
  42. return _MAC_ADDRESS_REGEX.match(mac_address.lower()) is not None
  43. def _mqtt_on_connect(
  44. mqtt_client: paho.mqtt.client.Client,
  45. user_data: typing.Any,
  46. flags: typing.Dict,
  47. return_code: int,
  48. ) -> None:
  49. # pylint: disable=unused-argument; callback
  50. # https://github.com/eclipse/paho.mqtt.python/blob/v1.5.0/src/paho/mqtt/client.py#L441
  51. assert return_code == 0, return_code # connection accepted
  52. mqtt_broker_host, mqtt_broker_port = mqtt_client.socket().getpeername()
  53. _LOGGER.debug("connected to MQTT broker %s:%d", mqtt_broker_host, mqtt_broker_port)
  54. # https://www.home-assistant.io/docs/mqtt/discovery/#discovery_prefix
  55. mqtt_client.subscribe(_MQTT_SET_TOPIC)
  56. def _send_command(switchbot_mac_address: str, action: _SwitchbotAction) -> None:
  57. switchbot_device = switchbot.Switchbot(mac=switchbot_mac_address)
  58. if action == _SwitchbotAction.ON:
  59. if not switchbot_device.turn_on():
  60. _LOGGER.error("failed to turn on switchbot %s", switchbot_mac_address)
  61. else:
  62. _LOGGER.info("switchbot %s turned on", switchbot_mac_address)
  63. else:
  64. assert action == _SwitchbotAction.OFF, action
  65. if not switchbot_device.turn_off():
  66. _LOGGER.error("failed to turn off switchbot %s", switchbot_mac_address)
  67. else:
  68. _LOGGER.info("switchbot %s turned off", switchbot_mac_address)
  69. def _mqtt_on_message(
  70. mqtt_client: paho.mqtt.client.Client,
  71. user_data: typing.Any,
  72. message: paho.mqtt.client.MQTTMessage,
  73. ) -> None:
  74. # pylint: disable=unused-argument; callback
  75. # https://github.com/eclipse/paho.mqtt.python/blob/v1.5.0/src/paho/mqtt/client.py#L469
  76. _LOGGER.debug("received topic=%s payload=%r", message.topic, message.payload)
  77. if message.retain:
  78. _LOGGER.info("ignoring retained message")
  79. return
  80. topic_split = message.topic.split("/")
  81. if len(topic_split) != len(_MQTT_SET_TOPIC_PATTERN):
  82. _LOGGER.warning("unexpected topic %s", message.topic)
  83. return
  84. switchbot_mac_address = None
  85. for given_part, expected_part in zip(topic_split, _MQTT_SET_TOPIC_PATTERN):
  86. if expected_part == _MQTT_TOPIC_MAC_ADDRESS_PLACEHOLDER:
  87. switchbot_mac_address = given_part
  88. elif expected_part != given_part:
  89. _LOGGER.warning("unexpected topic %s", message.topic)
  90. return
  91. assert switchbot_mac_address
  92. if not _mac_address_valid(switchbot_mac_address):
  93. _LOGGER.warning("invalid mac address %s", switchbot_mac_address)
  94. return
  95. # https://www.home-assistant.io/integrations/switch.mqtt/#payload_off
  96. if message.payload.lower() == b"on":
  97. action = _SwitchbotAction.ON
  98. elif message.payload.lower() == b"off":
  99. action = _SwitchbotAction.OFF
  100. else:
  101. _LOGGER.warning("unexpected payload %r", message.payload)
  102. return
  103. _send_command(switchbot_mac_address=switchbot_mac_address, action=action)
  104. def _run(
  105. mqtt_host: str,
  106. mqtt_port: int,
  107. mqtt_username: typing.Optional[str],
  108. mqtt_password: typing.Optional[str],
  109. ) -> None:
  110. # https://pypi.org/project/paho-mqtt/
  111. mqtt_client = paho.mqtt.client.Client()
  112. mqtt_client.on_connect = _mqtt_on_connect
  113. mqtt_client.on_message = _mqtt_on_message
  114. _LOGGER.info(
  115. "connecting to MQTT broker %s:%d", mqtt_host, mqtt_port,
  116. )
  117. if mqtt_username:
  118. mqtt_client.username_pw_set(username=mqtt_username, password=mqtt_password)
  119. elif mqtt_password:
  120. raise ValueError("Missing MQTT username")
  121. mqtt_client.connect(host=mqtt_host, port=mqtt_port)
  122. mqtt_client.loop_forever()
  123. def _main() -> None:
  124. logging.basicConfig(
  125. level=logging.DEBUG,
  126. format="%(asctime)s:%(levelname)s:%(message)s",
  127. datefmt="%Y-%m-%dT%H:%M:%S%z",
  128. )
  129. argparser = argparse.ArgumentParser(
  130. description="MQTT client controlling SwitchBot button automators, "
  131. "compatible with home-assistant.io's MQTT Switch platform"
  132. )
  133. argparser.add_argument("--mqtt-host", type=str, required=True)
  134. argparser.add_argument("--mqtt-port", type=int, default=1883)
  135. argparser.add_argument("--mqtt-username", type=str)
  136. argparser.add_argument("--mqtt-password", type=str)
  137. args = argparser.parse_args()
  138. _run(
  139. mqtt_host=args.mqtt_host,
  140. mqtt_port=args.mqtt_port,
  141. mqtt_username=args.mqtt_username,
  142. mqtt_password=args.mqtt_password,
  143. )