__init__.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
  1. # switchbot-mqtt - MQTT client controlling SwitchBot button & curtain automators,
  2. # compatible with home-assistant.io's MQTT Switch & Cover platform
  3. #
  4. # Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  18. import abc
  19. import argparse
  20. import collections
  21. import enum
  22. import json
  23. import logging
  24. import pathlib
  25. import re
  26. import typing
  27. import paho.mqtt.client
  28. import switchbot
# Module-wide logger, named after the importing module path.
_LOGGER = logging.getLogger(__name__)
# Colon-separated MAC address made of six two-digit hex octets,
# e.g. "aa:bb:cc:dd:ee:ff". Only lower-case hex digits are accepted, so
# candidates have to be lower-cased before they are matched.
_MAC_ADDRESS_REGEX = re.compile(r"^[0-9a-f]{2}(:[0-9a-f]{2}){5}$")
  31. class _MQTTTopicPlaceholder(enum.Enum):
  32. MAC_ADDRESS = "MAC_ADDRESS"
# One level of an MQTT topic template: either a literal string or a
# placeholder that gets matched / substituted at runtime.
_MQTTTopicLevel = typing.Union[str, _MQTTTopicPlaceholder]
# Leading topic levels shared by the command and state topics of all actors.
# "homeassistant" for historic reason, may be parametrized in future
_MQTT_TOPIC_LEVELS_PREFIX = ["homeassistant"]  # type: typing.List[_MQTTTopicLevel]
  36. def _mac_address_valid(mac_address: str) -> bool:
  37. return _MAC_ADDRESS_REGEX.match(mac_address.lower()) is not None
  38. class _MQTTCallbackUserdata:
  39. # pylint: disable=too-few-public-methods; @dataclasses.dataclass when python_requires>=3.7
  40. def __init__(
  41. self, retry_count: int, device_passwords: typing.Dict[str, str]
  42. ) -> None:
  43. self.retry_count = retry_count
  44. self.device_passwords = device_passwords
  45. def __eq__(self, other: object) -> bool:
  46. return isinstance(other, type(self)) and vars(self) == vars(other)
  47. class _MQTTControlledActor(abc.ABC):
  48. MQTT_COMMAND_TOPIC_LEVELS = NotImplemented # type: typing.List[_MQTTTopicLevel]
  49. MQTT_STATE_TOPIC_LEVELS = NotImplemented # type: typing.List[_MQTTTopicLevel]
  50. @abc.abstractmethod
  51. def __init__(
  52. self, mac_address: str, retry_count: int, password: typing.Optional[str]
  53. ) -> None:
  54. self._mac_address = mac_address
  55. @abc.abstractmethod
  56. def execute_command(
  57. self, mqtt_message_payload: bytes, mqtt_client: paho.mqtt.client.Client
  58. ) -> None:
  59. raise NotImplementedError()
  60. @classmethod
  61. def _mqtt_command_callback(
  62. cls,
  63. mqtt_client: paho.mqtt.client.Client,
  64. userdata: _MQTTCallbackUserdata,
  65. message: paho.mqtt.client.MQTTMessage,
  66. ) -> None:
  67. # pylint: disable=unused-argument; callback
  68. # https://github.com/eclipse/paho.mqtt.python/blob/v1.5.0/src/paho/mqtt/client.py#L469
  69. _LOGGER.debug("received topic=%s payload=%r", message.topic, message.payload)
  70. if message.retain:
  71. _LOGGER.info("ignoring retained message")
  72. return
  73. topic_split = message.topic.split("/")
  74. if len(topic_split) != len(cls.MQTT_COMMAND_TOPIC_LEVELS):
  75. _LOGGER.warning("unexpected topic %s", message.topic)
  76. return
  77. mac_address = None
  78. for given_part, expected_part in zip(
  79. topic_split, cls.MQTT_COMMAND_TOPIC_LEVELS
  80. ):
  81. if expected_part == _MQTTTopicPlaceholder.MAC_ADDRESS:
  82. mac_address = given_part
  83. elif expected_part != given_part:
  84. _LOGGER.warning("unexpected topic %s", message.topic)
  85. return
  86. assert mac_address
  87. if not _mac_address_valid(mac_address):
  88. _LOGGER.warning("invalid mac address %s", mac_address)
  89. return
  90. actor = cls(
  91. mac_address=mac_address,
  92. retry_count=userdata.retry_count,
  93. password=userdata.device_passwords.get(mac_address, None),
  94. )
  95. actor.execute_command(
  96. mqtt_message_payload=message.payload, mqtt_client=mqtt_client
  97. )
  98. @classmethod
  99. def mqtt_subscribe(cls, mqtt_client: paho.mqtt.client.Client) -> None:
  100. command_topic = "/".join(
  101. "+" if isinstance(l, _MQTTTopicPlaceholder) else l
  102. for l in cls.MQTT_COMMAND_TOPIC_LEVELS
  103. )
  104. _LOGGER.info("subscribing to MQTT topic %r", command_topic)
  105. mqtt_client.subscribe(command_topic)
  106. mqtt_client.message_callback_add(
  107. sub=command_topic,
  108. callback=cls._mqtt_command_callback,
  109. )
  110. def _mqtt_publish(
  111. self,
  112. topic_levels: typing.List[_MQTTTopicLevel],
  113. payload: bytes,
  114. mqtt_client: paho.mqtt.client.Client,
  115. ) -> None:
  116. topic = "/".join(
  117. self._mac_address
  118. if l == _MQTTTopicPlaceholder.MAC_ADDRESS
  119. else typing.cast(str, l)
  120. for l in topic_levels
  121. )
  122. # https://pypi.org/project/paho-mqtt/#publishing
  123. _LOGGER.debug("publishing topic=%s payload=%r", topic, payload)
  124. message_info = mqtt_client.publish(
  125. topic=topic, payload=payload, retain=True
  126. ) # type: paho.mqtt.client.MQTTMessageInfo
  127. # wait before checking status?
  128. if message_info.rc != paho.mqtt.client.MQTT_ERR_SUCCESS:
  129. _LOGGER.error(
  130. "Failed to publish MQTT message on topic %s (rc=%d)",
  131. topic,
  132. message_info.rc,
  133. )
  134. def report_state(self, state: bytes, mqtt_client: paho.mqtt.client.Client) -> None:
  135. self._mqtt_publish(
  136. topic_levels=self.MQTT_STATE_TOPIC_LEVELS,
  137. payload=state,
  138. mqtt_client=mqtt_client,
  139. )
  140. class _ButtonAutomator(_MQTTControlledActor):
  141. # https://www.home-assistant.io/integrations/switch.mqtt/
  142. MQTT_COMMAND_TOPIC_LEVELS = _MQTT_TOPIC_LEVELS_PREFIX + [
  143. "switch",
  144. "switchbot",
  145. _MQTTTopicPlaceholder.MAC_ADDRESS,
  146. "set",
  147. ]
  148. MQTT_STATE_TOPIC_LEVELS = _MQTT_TOPIC_LEVELS_PREFIX + [
  149. "switch",
  150. "switchbot",
  151. _MQTTTopicPlaceholder.MAC_ADDRESS,
  152. "state",
  153. ]
  154. def __init__(
  155. self, mac_address: str, retry_count: int, password: typing.Optional[None]
  156. ) -> None:
  157. self._device = switchbot.Switchbot(
  158. mac=mac_address, password=password, retry_count=retry_count
  159. )
  160. super().__init__(
  161. mac_address=mac_address, retry_count=retry_count, password=password
  162. )
  163. def execute_command(
  164. self, mqtt_message_payload: bytes, mqtt_client: paho.mqtt.client.Client
  165. ) -> None:
  166. # https://www.home-assistant.io/integrations/switch.mqtt/#payload_on
  167. if mqtt_message_payload.lower() == b"on":
  168. if not self._device.turn_on():
  169. _LOGGER.error("failed to turn on switchbot %s", self._mac_address)
  170. else:
  171. _LOGGER.info("switchbot %s turned on", self._mac_address)
  172. # https://www.home-assistant.io/integrations/switch.mqtt/#state_on
  173. self.report_state(mqtt_client=mqtt_client, state=b"ON")
  174. # https://www.home-assistant.io/integrations/switch.mqtt/#payload_off
  175. elif mqtt_message_payload.lower() == b"off":
  176. if not self._device.turn_off():
  177. _LOGGER.error("failed to turn off switchbot %s", self._mac_address)
  178. else:
  179. _LOGGER.info("switchbot %s turned off", self._mac_address)
  180. self.report_state(mqtt_client=mqtt_client, state=b"OFF")
  181. else:
  182. _LOGGER.warning(
  183. "unexpected payload %r (expected 'ON' or 'OFF')", mqtt_message_payload
  184. )
  185. class _CurtainMotor(_MQTTControlledActor):
  186. # https://www.home-assistant.io/integrations/cover.mqtt/
  187. MQTT_COMMAND_TOPIC_LEVELS = _MQTT_TOPIC_LEVELS_PREFIX + [
  188. "cover",
  189. "switchbot-curtain",
  190. _MQTTTopicPlaceholder.MAC_ADDRESS,
  191. "set",
  192. ]
  193. MQTT_STATE_TOPIC_LEVELS = _MQTT_TOPIC_LEVELS_PREFIX + [
  194. "cover",
  195. "switchbot-curtain",
  196. _MQTTTopicPlaceholder.MAC_ADDRESS,
  197. "state",
  198. ]
  199. def __init__(
  200. self, mac_address: str, retry_count: int, password: typing.Optional[None]
  201. ) -> None:
  202. self._device = switchbot.SwitchbotCurtain(
  203. mac=mac_address, password=password, retry_count=retry_count
  204. )
  205. super().__init__(
  206. mac_address=mac_address, retry_count=retry_count, password=password
  207. )
  208. def execute_command(
  209. self, mqtt_message_payload: bytes, mqtt_client: paho.mqtt.client.Client
  210. ) -> None:
  211. # https://www.home-assistant.io/integrations/cover.mqtt/#payload_open
  212. if mqtt_message_payload.lower() == b"open":
  213. if not self._device.open():
  214. _LOGGER.error("failed to open switchbot curtain %s", self._mac_address)
  215. else:
  216. _LOGGER.info("switchbot curtain %s opening", self._mac_address)
  217. # > state_opening string (Optional, default: opening)
  218. # https://www.home-assistant.io/integrations/cover.mqtt/#state_opening
  219. self.report_state(mqtt_client=mqtt_client, state=b"opening")
  220. elif mqtt_message_payload.lower() == b"close":
  221. if not self._device.close():
  222. _LOGGER.error("failed to close switchbot curtain %s", self._mac_address)
  223. else:
  224. _LOGGER.info("switchbot curtain %s closing", self._mac_address)
  225. # https://www.home-assistant.io/integrations/cover.mqtt/#state_closing
  226. self.report_state(mqtt_client=mqtt_client, state=b"closing")
  227. elif mqtt_message_payload.lower() == b"stop":
  228. if not self._device.stop():
  229. _LOGGER.error("failed to stop switchbot curtain %s", self._mac_address)
  230. else:
  231. _LOGGER.info("switchbot curtain %s stopped", self._mac_address)
  232. # no "stopped" state mentioned at
  233. # https://www.home-assistant.io/integrations/cover.mqtt/#configuration-variables
  234. # https://community.home-assistant.io/t/mqtt-how-to-remove-retained-messages/79029/2
  235. self.report_state(mqtt_client=mqtt_client, state=b"")
  236. else:
  237. _LOGGER.warning(
  238. "unexpected payload %r (expected 'OPEN', 'CLOSE', or 'STOP')",
  239. mqtt_message_payload,
  240. )
  241. def _mqtt_on_connect(
  242. mqtt_client: paho.mqtt.client.Client,
  243. userdata: _MQTTCallbackUserdata,
  244. flags: typing.Dict,
  245. return_code: int,
  246. ) -> None:
  247. # pylint: disable=unused-argument; callback
  248. # https://github.com/eclipse/paho.mqtt.python/blob/v1.5.0/src/paho/mqtt/client.py#L441
  249. assert return_code == 0, return_code # connection accepted
  250. mqtt_broker_host, mqtt_broker_port = mqtt_client.socket().getpeername()
  251. _LOGGER.debug("connected to MQTT broker %s:%d", mqtt_broker_host, mqtt_broker_port)
  252. _ButtonAutomator.mqtt_subscribe(mqtt_client=mqtt_client)
  253. _CurtainMotor.mqtt_subscribe(mqtt_client=mqtt_client)
  254. def _run(
  255. mqtt_host: str,
  256. mqtt_port: int,
  257. mqtt_username: typing.Optional[str],
  258. mqtt_password: typing.Optional[str],
  259. retry_count: int,
  260. device_passwords: typing.Dict[str, str],
  261. ) -> None:
  262. # https://pypi.org/project/paho-mqtt/
  263. mqtt_client = paho.mqtt.client.Client(
  264. userdata=_MQTTCallbackUserdata(
  265. retry_count=retry_count, device_passwords=device_passwords
  266. )
  267. )
  268. mqtt_client.on_connect = _mqtt_on_connect
  269. _LOGGER.info("connecting to MQTT broker %s:%d", mqtt_host, mqtt_port)
  270. if mqtt_username:
  271. mqtt_client.username_pw_set(username=mqtt_username, password=mqtt_password)
  272. elif mqtt_password:
  273. raise ValueError("Missing MQTT username")
  274. mqtt_client.connect(host=mqtt_host, port=mqtt_port)
  275. # https://github.com/eclipse/paho.mqtt.python/blob/master/src/paho/mqtt/client.py#L1740
  276. mqtt_client.loop_forever()
  277. def _main() -> None:
  278. logging.basicConfig(
  279. level=logging.DEBUG,
  280. format="%(asctime)s:%(levelname)s:%(name)s:%(message)s",
  281. datefmt="%Y-%m-%dT%H:%M:%S%z",
  282. )
  283. argparser = argparse.ArgumentParser(
  284. description="MQTT client controlling SwitchBot button automators, "
  285. "compatible with home-assistant.io's MQTT Switch platform"
  286. )
  287. argparser.add_argument("--mqtt-host", type=str, required=True)
  288. argparser.add_argument("--mqtt-port", type=int, default=1883)
  289. argparser.add_argument("--mqtt-username", type=str)
  290. password_argument_group = argparser.add_mutually_exclusive_group()
  291. password_argument_group.add_argument("--mqtt-password", type=str)
  292. password_argument_group.add_argument(
  293. "--mqtt-password-file",
  294. type=pathlib.Path,
  295. metavar="PATH",
  296. dest="mqtt_password_path",
  297. help="stripping trailing newline",
  298. )
  299. argparser.add_argument(
  300. "--device-password-file",
  301. type=pathlib.Path,
  302. metavar="PATH",
  303. dest="device_password_path",
  304. help="path to json file mapping mac addresses of switchbot devices to passwords, e.g. "
  305. + json.dumps({"11:22:33:44:55:66": "password", "aa:bb:cc:dd:ee:ff": "secret"}),
  306. )
  307. argparser.add_argument(
  308. "--retries",
  309. dest="retry_count",
  310. type=int,
  311. default=switchbot.DEFAULT_RETRY_COUNT,
  312. help="Maximum number of attempts to send a command to a SwitchBot device"
  313. " (default: %(default)d)",
  314. )
  315. args = argparser.parse_args()
  316. if args.mqtt_password_path:
  317. # .read_text() replaces \r\n with \n
  318. mqtt_password = args.mqtt_password_path.read_bytes().decode()
  319. if mqtt_password.endswith("\r\n"):
  320. mqtt_password = mqtt_password[:-2]
  321. elif mqtt_password.endswith("\n"):
  322. mqtt_password = mqtt_password[:-1]
  323. else:
  324. mqtt_password = args.mqtt_password
  325. if args.device_password_path:
  326. device_passwords = json.loads(args.device_password_path.read_text())
  327. else:
  328. device_passwords = {}
  329. _run(
  330. mqtt_host=args.mqtt_host,
  331. mqtt_port=args.mqtt_port,
  332. mqtt_username=args.mqtt_username,
  333. mqtt_password=mqtt_password,
  334. retry_count=args.retry_count,
  335. device_passwords=device_passwords,
  336. )