__init__.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. import argparse
  2. import logging
  3. import pathlib
  4. import typing
  5. import paho.mqtt.client
  6. import intertechno_cc1101
  7. _LOGGER = logging.getLogger(__name__)
  8. def _mqtt_on_message(
  9. mqtt_client: paho.mqtt.client.Client,
  10. userdata: None,
  11. message: paho.mqtt.client.MQTTMessage,
  12. ):
  13. # pylint: disable=unused-argument; callback
  14. # https://github.com/eclipse/paho.mqtt.python/blob/v1.5.0/src/paho/mqtt/client.py#L469
  15. _LOGGER.debug("received topic=%s payload=%r", message.topic, message.payload)
  16. if message.retain:
  17. _LOGGER.warning("ignoring retained message")
  18. return
  19. topic_split = message.topic.split("/")
  20. try:
  21. address = int(topic_split[1])
  22. except ValueError:
  23. _LOGGER.warning(
  24. "failed to parse address %r, expected integer; ignoring message",
  25. topic_split[1],
  26. )
  27. return
  28. try:
  29. button_index = int(topic_split[2])
  30. except ValueError:
  31. _LOGGER.warning(
  32. "failed to parse button index %r, expected integer; ignoring message",
  33. topic_split[2],
  34. )
  35. return
  36. try:
  37. remote_control = intertechno_cc1101.RemoteControl(address=address)
  38. except AssertionError:
  39. _LOGGER.warning(
  40. "failed to initialize remote control, invalid address? ignoring message",
  41. exc_info=True,
  42. )
  43. return
  44. # https://www.home-assistant.io/integrations/switch.mqtt/#payload_on
  45. try:
  46. if message.payload.upper() == b"ON":
  47. remote_control.turn_on(button_index=button_index)
  48. elif message.payload.upper() == b"OFF":
  49. remote_control.turn_off(button_index=button_index)
  50. else:
  51. _LOGGER.warning(
  52. "unexpected payload %r; expected 'ON' or 'OFF'", message.payload
  53. )
  54. except Exception: # pylint: disable=broad-except; invalid perms? spi error? invalid button index?
  55. _LOGGER.error("failed to send signal", exc_info=True)
  56. def _mqtt_on_connect(
  57. mqtt_client: paho.mqtt.client.Client,
  58. user_data: typing.Any,
  59. flags: typing.Dict,
  60. return_code: int,
  61. ) -> None:
  62. # pylint: disable=unused-argument; callback
  63. # https://github.com/eclipse/paho.mqtt.python/blob/v1.5.0/src/paho/mqtt/client.py#L441
  64. assert return_code == 0, return_code # connection accepted
  65. mqtt_broker_host, mqtt_broker_port = mqtt_client.socket().getpeername()
  66. _LOGGER.debug("connected to MQTT broker %s:%d", mqtt_broker_host, mqtt_broker_port)
  67. set_topic = "intertechno-cc1101/+/+/set"
  68. _LOGGER.info("subscribing to MQTT topic %r", set_topic)
  69. # alternative: .message_callback_add
  70. mqtt_client.on_message = _mqtt_on_message
  71. mqtt_client.subscribe(set_topic)
  72. def _run(
  73. mqtt_host: str,
  74. mqtt_port: int,
  75. mqtt_username: typing.Optional[str],
  76. mqtt_password: typing.Optional[str],
  77. ) -> None:
  78. # https://pypi.org/project/paho-mqtt/
  79. mqtt_client = paho.mqtt.client.Client()
  80. mqtt_client.on_connect = _mqtt_on_connect
  81. _LOGGER.info("connecting to MQTT broker %s:%d", mqtt_host, mqtt_port)
  82. if mqtt_username:
  83. mqtt_client.username_pw_set(username=mqtt_username, password=mqtt_password)
  84. elif mqtt_password:
  85. raise ValueError("Missing MQTT username")
  86. mqtt_client.connect(host=mqtt_host, port=mqtt_port)
  87. mqtt_client.loop_forever()
  88. def _main() -> None:
  89. logging.basicConfig(
  90. level=logging.DEBUG,
  91. format="%(asctime)s:%(levelname)s:%(name)s:%(message)s",
  92. datefmt="%Y-%m-%dT%H:%M:%S%z",
  93. )
  94. logging.getLogger("cc1101").setLevel(logging.INFO)
  95. argparser = argparse.ArgumentParser(
  96. description="MQTT client controlling Intertechno smart outlets via a CC1101 transceiver, "
  97. "compatible with home-assistant.io's MQTT Switch platform",
  98. allow_abbrev=False,
  99. )
  100. argparser.add_argument("--mqtt-host", type=str, required=True)
  101. argparser.add_argument("--mqtt-port", type=int, default=1883)
  102. argparser.add_argument("--mqtt-username", type=str)
  103. password_argument_group = argparser.add_mutually_exclusive_group()
  104. password_argument_group.add_argument("--mqtt-password", type=str)
  105. password_argument_group.add_argument(
  106. "--mqtt-password-file",
  107. type=pathlib.Path,
  108. metavar="PATH",
  109. dest="mqtt_password_path",
  110. help="stripping trailing newline",
  111. )
  112. args = argparser.parse_args()
  113. if args.mqtt_password_path:
  114. # .read_text() replaces \r\n with \n
  115. mqtt_password = args.mqtt_password_path.read_bytes().decode()
  116. if mqtt_password.endswith("\r\n"):
  117. mqtt_password = mqtt_password[:-2]
  118. elif mqtt_password.endswith("\n"):
  119. mqtt_password = mqtt_password[:-1]
  120. else:
  121. mqtt_password = args.mqtt_password
  122. _run(
  123. mqtt_host=args.mqtt_host,
  124. mqtt_port=args.mqtt_port,
  125. mqtt_username=args.mqtt_username,
  126. mqtt_password=mqtt_password,
  127. )