__init__.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. # switchbot-mqtt - MQTT client controlling SwitchBot button automators,
  2. # compatible with home-assistant.io's MQTT Switch platform
  3. #
  4. # Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  18. import argparse
  19. import logging
  20. import typing
  21. import paho.mqtt.client
  22. _LOGGER = logging.getLogger(__name__)
  23. _MQTT_TOPIC_MAC_ADDRESS_PLACEHOLDER = "{mac_address}"
  24. _MQTT_SET_TOPIC_PATTERN = [
  25. "homeassistant",
  26. "switch",
  27. "switchbot",
  28. _MQTT_TOPIC_MAC_ADDRESS_PLACEHOLDER,
  29. "set",
  30. ] # TODO parametrize
  31. _MQTT_SET_TOPIC = "/".join(_MQTT_SET_TOPIC_PATTERN).replace(
  32. _MQTT_TOPIC_MAC_ADDRESS_PLACEHOLDER, "+"
  33. )
  34. def _mqtt_on_connect(
  35. mqtt_client: paho.mqtt.client.Client,
  36. user_data: typing.Any,
  37. flags: typing.Dict,
  38. return_code: int,
  39. ) -> None:
  40. # pylint: disable=unused-argument; callback
  41. # https://github.com/eclipse/paho.mqtt.python/blob/v1.5.0/src/paho/mqtt/client.py#L441
  42. assert return_code == 0, return_code # connection accepted
  43. mqtt_broker_host, mqtt_broker_port = mqtt_client.socket().getpeername()
  44. _LOGGER.debug("connected to MQTT broker %s:%d", mqtt_broker_host, mqtt_broker_port)
  45. # https://www.home-assistant.io/docs/mqtt/discovery/#discovery_prefix
  46. mqtt_client.subscribe(_MQTT_SET_TOPIC)
  47. def _mqtt_on_message(
  48. mqtt_client: paho.mqtt.client.Client,
  49. user_data: typing.Any,
  50. message: paho.mqtt.client.MQTTMessage,
  51. ) -> None:
  52. # pylint: disable=unused-argument; callback
  53. # https://github.com/eclipse/paho.mqtt.python/blob/v1.5.0/src/paho/mqtt/client.py#L469
  54. _LOGGER.debug("received topic=%s payload=%r", message.topic, message.payload)
  55. if message.retain:
  56. _LOGGER.info("ignoring retained message")
  57. return
  58. topic_split = message.topic.split("/")
  59. if len(topic_split) != len(_MQTT_SET_TOPIC_PATTERN):
  60. _LOGGER.warning("unexpected topic %s", message.topic)
  61. return
  62. switchbot_mac_address = None
  63. for given_part, expected_part in zip(topic_split, _MQTT_SET_TOPIC_PATTERN):
  64. if expected_part == _MQTT_TOPIC_MAC_ADDRESS_PLACEHOLDER:
  65. switchbot_mac_address = given_part
  66. elif expected_part != given_part:
  67. _LOGGER.warning("unexpected topic %s", message.topic)
  68. return
  69. assert switchbot_mac_address
  70. print("TODO", switchbot_mac_address, message.payload)
  71. def _main() -> None:
  72. logging.basicConfig(
  73. level=logging.DEBUG,
  74. format="%(asctime)s:%(levelname)s:%(message)s",
  75. datefmt="%Y-%m-%dT%H:%M:%S%z",
  76. )
  77. argparser = argparse.ArgumentParser(
  78. "MQTT client controlling SwitchBot button automators, "
  79. "compatible with home-assistant.io's MQTT Switch platform"
  80. )
  81. argparser.add_argument("--mqtt-host", type=str, required=True)
  82. argparser.add_argument("--mqtt-port", type=int, default=1883)
  83. args = argparser.parse_args()
  84. # https://pypi.org/project/paho-mqtt/
  85. mqtt_client = paho.mqtt.client.Client()
  86. mqtt_client.on_connect = _mqtt_on_connect
  87. mqtt_client.on_message = _mqtt_on_message
  88. _LOGGER.info(
  89. "connecting to MQTT broker %s:%d", args.mqtt_host, args.mqtt_port,
  90. )
  91. mqtt_client.connect(host=args.mqtt_host, port=args.mqtt_port)
  92. mqtt_client.loop_forever()