__init__.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604
  1. """Library to handle connection with Switchbot."""
  2. from __future__ import annotations
  3. import binascii
  4. import logging
  5. import threading
  6. import time
  7. from typing import Any
  8. import bluepy
# Default number of additional attempts after a failed scan or command.
DEFAULT_RETRY_COUNT = 3
# Seconds slept (time.sleep) between two attempts.
DEFAULT_RETRY_TIMEOUT = 1
# Default timeout handed to bluepy's Scanner.scan().
DEFAULT_SCAN_TIMEOUT = 5

# Value compared against scan-data field 7 to recognise a Switchbot device.
UUID = "cba20d00-224d-11e6-9fb8-0002a5d5c51b"
# UUID of the characteristic that commands are written to.
HANDLE = "cba20002-224d-11e6-9fb8-0002a5d5c51b"
# UUID of the characteristic used to subscribe to and read back responses.
NOTIFICATION_HANDLE = "cba20003-224d-11e6-9fb8-0002a5d5c51b"

# Prefixes used instead of the plain key when a password is configured
# (see SwitchbotDevice._commandkey).
KEY_PASSWORD_PREFIX = "5711"
KEY_PASSWORD_NOTIFY_PREFIX = "5712"

# Hex-encoded commands for the Bot (WoHand).
PRESS_KEY = "570100"
ON_KEY = "570101"
OFF_KEY = "570102"

# Hex-encoded commands for the Curtain (WoCurtain).
OPEN_KEY = "570f450105ff00"  # 570F4501010100
CLOSE_KEY = "570f450105ff64"  # 570F4501010164
POSITION_KEY = "570F450105ff"  # +actual_position ex: 570F450105ff32 for 50%
STOP_KEY = "570F450100ff"

# Command requesting the device's basic settings.
DEVICE_BASIC_SETTINGS_KEY = "5702"

# Suffixes appended after the encoded password for password-protected devices.
ON_KEY_SUFFIX = "01"
OFF_KEY_SUFFIX = "02"
PRESS_KEY_SUFFIX = "00"

_LOGGER = logging.getLogger(__name__)
# Held by SwitchbotDevice._sendcommand around each connect/write/disconnect cycle.
CONNECT_LOCK = threading.Lock()
  30. def _process_wohand(data: bytes) -> dict[str, bool | int]:
  31. """Process woHand/Bot services data."""
  32. _bot_data: dict[str, bool | int] = {}
  33. # 128 switch or 0 press.
  34. _bot_data["switchMode"] = bool(data[1] & 0b10000000)
  35. # 64 off or 0 for on, if not inversed in app.
  36. if _bot_data["switchMode"]:
  37. _bot_data["isOn"] = not bool(data[1] & 0b01000000)
  38. else:
  39. _bot_data["isOn"] = False
  40. _bot_data["battery"] = data[2] & 0b01111111
  41. return _bot_data
  42. def _process_wocurtain(data: bytes, reverse: bool = True) -> dict[str, bool | int]:
  43. """Process woCurtain/Curtain services data."""
  44. _curtain_data: dict[str, bool | int] = {}
  45. _curtain_data["calibration"] = bool(data[1] & 0b01000000)
  46. _curtain_data["battery"] = data[2] & 0b01111111
  47. _curtain_data["inMotion"] = bool(data[3] & 0b10000000)
  48. _position = max(min(data[3] & 0b01111111, 100), 0)
  49. _curtain_data["position"] = (100 - _position) if reverse else _position
  50. # light sensor level (1-10)
  51. _curtain_data["lightLevel"] = (data[4] >> 4) & 0b00001111
  52. _curtain_data["deviceChain"] = data[4] & 0b00000111
  53. return _curtain_data
  54. def _process_wosensorth(data: bytes) -> dict[str, Any]:
  55. """Process woSensorTH/Temp sensor services data."""
  56. _wosensorth_data: dict[str, Any] = {}
  57. _temp_sign = 1 if data[4] & 0b10000000 else -1
  58. _temp_c = _temp_sign * ((data[4] & 0b01111111) + (data[3] / 10))
  59. _temp_f = (_temp_c * 9 / 5) + 32
  60. _temp_f = (_temp_f * 10) / 10
  61. _wosensorth_data["temp"] = {}
  62. _wosensorth_data["temp"]["c"] = _temp_c
  63. _wosensorth_data["temp"]["f"] = _temp_f
  64. _wosensorth_data["fahrenheit"] = bool(data[5] & 0b10000000)
  65. _wosensorth_data["humidity"] = data[5] & 0b01111111
  66. _wosensorth_data["battery"] = data[2] & 0b01111111
  67. return _wosensorth_data
  68. class GetSwitchbotDevices:
  69. """Scan for all Switchbot devices and return by type."""
  70. def __init__(self, interface: int | None = None) -> None:
  71. """Get switchbot devices class constructor."""
  72. self._interface = interface
  73. self._all_services_data: dict[str, Any] = {}
  74. def discover(
  75. self, retry: int = DEFAULT_RETRY_COUNT, scan_timeout: int = DEFAULT_SCAN_TIMEOUT
  76. ) -> dict | None:
  77. """Find switchbot devices and their advertisement data."""
  78. devices = None
  79. try:
  80. devices = bluepy.btle.Scanner(self._interface).scan(scan_timeout)
  81. except bluepy.btle.BTLEManagementError:
  82. _LOGGER.error("Error scanning for switchbot devices", exc_info=True)
  83. if devices is None:
  84. if retry < 1:
  85. _LOGGER.error(
  86. "Scanning for Switchbot devices failed. Stop trying", exc_info=True
  87. )
  88. return None
  89. _LOGGER.warning(
  90. "Error scanning for Switchbot devices. Retrying (remaining: %d)",
  91. retry,
  92. )
  93. time.sleep(DEFAULT_RETRY_TIMEOUT)
  94. return self.discover(retry - 1, scan_timeout)
  95. for dev in devices:
  96. if dev.getValueText(7) == UUID:
  97. dev_id = dev.addr.replace(":", "")
  98. self._all_services_data[dev_id] = {}
  99. self._all_services_data[dev_id]["mac_address"] = dev.addr
  100. for (adtype, desc, value) in dev.getScanData():
  101. if adtype == 22:
  102. _data = bytes.fromhex(value[4:])
  103. _model = chr(_data[0] & 0b01111111)
  104. if _model == "H":
  105. self._all_services_data[dev_id]["data"] = _process_wohand(
  106. _data
  107. )
  108. self._all_services_data[dev_id]["data"]["rssi"] = dev.rssi
  109. self._all_services_data[dev_id]["isEncrypted"] = bool(
  110. _data[0] & 0b10000000
  111. )
  112. self._all_services_data[dev_id]["model"] = _model
  113. self._all_services_data[dev_id]["modelName"] = "WoHand"
  114. elif _model == "c":
  115. self._all_services_data[dev_id][
  116. "data"
  117. ] = _process_wocurtain(_data)
  118. self._all_services_data[dev_id]["data"]["rssi"] = dev.rssi
  119. self._all_services_data[dev_id]["isEncrypted"] = bool(
  120. _data[0] & 0b10000000
  121. )
  122. self._all_services_data[dev_id]["model"] = _model
  123. self._all_services_data[dev_id]["modelName"] = "WoCurtain"
  124. elif _model == "T":
  125. self._all_services_data[dev_id][
  126. "data"
  127. ] = _process_wosensorth(_data)
  128. self._all_services_data[dev_id]["data"]["rssi"] = dev.rssi
  129. self._all_services_data[dev_id]["isEncrypted"] = bool(
  130. _data[0] & 0b10000000
  131. )
  132. self._all_services_data[dev_id]["model"] = _model
  133. self._all_services_data[dev_id]["modelName"] = "WoSensorTH"
  134. else:
  135. continue
  136. else:
  137. self._all_services_data[dev_id][desc] = value
  138. return self._all_services_data
  139. def get_curtains(self) -> dict:
  140. """Return all WoCurtain/Curtains devices with services data."""
  141. if not self._all_services_data:
  142. self.discover()
  143. _curtain_devices = {}
  144. for device, data in self._all_services_data.items():
  145. if data.get("model") == "c":
  146. _curtain_devices[device] = data
  147. return _curtain_devices
  148. def get_bots(self) -> dict:
  149. """Return all WoHand/Bot devices with services data."""
  150. if not self._all_services_data:
  151. self.discover()
  152. _bot_devices = {}
  153. for device, data in self._all_services_data.items():
  154. if data.get("model") == "H":
  155. _bot_devices[device] = data
  156. return _bot_devices
  157. def get_tempsensors(self) -> dict:
  158. """Return all WoSensorTH/Temp sensor devices with services data."""
  159. if not self._all_services_data:
  160. self.discover()
  161. _bot_temp = {}
  162. for device, data in self._all_services_data.items():
  163. if data.get("model") == "T":
  164. _bot_temp[device] = data
  165. return _bot_temp
  166. def get_device_data(self, mac: str) -> dict:
  167. """Return data for specific device."""
  168. if not self._all_services_data:
  169. self.discover()
  170. _switchbot_data = {}
  171. for device in self._all_services_data.values():
  172. if device["mac_address"] == mac:
  173. _switchbot_data = device
  174. return _switchbot_data
  175. class SwitchbotDevice:
  176. """Base Representation of a Switchbot Device."""
  177. def __init__(
  178. self,
  179. mac: str,
  180. password: str | None = None,
  181. interface: int | None = None,
  182. **kwargs: Any,
  183. ) -> None:
  184. """Switchbot base class constructor."""
  185. self._interface = interface
  186. self._mac = mac
  187. self._device = bluepy.btle.Peripheral(
  188. deviceAddr=None, addrType=bluepy.btle.ADDR_TYPE_RANDOM, iface=interface
  189. )
  190. self._switchbot_device_data: dict[str, Any] = {}
  191. self._scan_timeout: int = kwargs.pop("scan_timeout", DEFAULT_SCAN_TIMEOUT)
  192. self._retry_count: int = kwargs.pop("retry_count", DEFAULT_RETRY_COUNT)
  193. if password is None or password == "":
  194. self._password_encoded = None
  195. else:
  196. self._password_encoded = "%x" % (
  197. binascii.crc32(password.encode("ascii")) & 0xFFFFFFFF
  198. )
  199. def _connect(self) -> None:
  200. try:
  201. _LOGGER.debug("Connecting to Switchbot")
  202. self._device.connect(
  203. self._mac, bluepy.btle.ADDR_TYPE_RANDOM, self._interface
  204. )
  205. _LOGGER.debug("Connected to Switchbot")
  206. except bluepy.btle.BTLEException:
  207. _LOGGER.debug("Failed connecting to Switchbot", exc_info=True)
  208. raise
  209. def _disconnect(self) -> None:
  210. _LOGGER.debug("Disconnecting")
  211. try:
  212. self._device.disconnect()
  213. except bluepy.btle.BTLEException:
  214. _LOGGER.warning("Error disconnecting from Switchbot", exc_info=True)
  215. def _commandkey(self, key: str) -> str:
  216. if self._password_encoded is None:
  217. return key
  218. key_suffix = PRESS_KEY_SUFFIX
  219. if key == ON_KEY:
  220. key_suffix = ON_KEY_SUFFIX
  221. elif key == OFF_KEY:
  222. key_suffix = OFF_KEY_SUFFIX
  223. elif key == DEVICE_BASIC_SETTINGS_KEY:
  224. return KEY_PASSWORD_NOTIFY_PREFIX + self._password_encoded
  225. return KEY_PASSWORD_PREFIX + self._password_encoded + key_suffix
  226. def _writekey(self, key: str) -> Any:
  227. _LOGGER.debug("Prepare to send")
  228. hand = self._device.getCharacteristics(uuid=HANDLE)[0]
  229. _LOGGER.debug("Sending command, %s", key)
  230. write_result = hand.write(binascii.a2b_hex(key), withResponse=False)
  231. if not write_result:
  232. _LOGGER.error(
  233. "Sent command but didn't get a response from Switchbot confirming command was sent."
  234. " Please check the Switchbot"
  235. )
  236. else:
  237. _LOGGER.info("Successfully sent command to Switchbot (MAC: %s)", self._mac)
  238. return write_result
  239. def _subscribe(self, key: str) -> Any:
  240. _LOGGER.debug("Subscribe to notifications")
  241. handle = self._device.getCharacteristics(uuid=NOTIFICATION_HANDLE)[0]
  242. notify_handle = handle.getHandle() + 1
  243. response = self._device.writeCharacteristic(
  244. notify_handle, binascii.a2b_hex(key), withResponse=False
  245. )
  246. return response
  247. def _readkey(self) -> bytes | None:
  248. _LOGGER.debug("Prepare to read")
  249. receive_handle = self._device.getCharacteristics(uuid=NOTIFICATION_HANDLE)
  250. if receive_handle:
  251. for char in receive_handle:
  252. read_result: bytes = char.read()
  253. return read_result
  254. return None
  255. def _sendcommand(self, key: str, retry: int) -> bool:
  256. send_success = False
  257. command = self._commandkey(key)
  258. _LOGGER.debug("Sending command to switchbot %s", command)
  259. with CONNECT_LOCK:
  260. try:
  261. self._connect()
  262. send_success = self._writekey(command)
  263. except bluepy.btle.BTLEException:
  264. _LOGGER.warning("Error talking to Switchbot", exc_info=True)
  265. finally:
  266. self._disconnect()
  267. if send_success:
  268. return True
  269. if retry < 1:
  270. _LOGGER.error(
  271. "Switchbot communication failed. Stopping trying", exc_info=True
  272. )
  273. return False
  274. _LOGGER.warning("Cannot connect to Switchbot. Retrying (remaining: %d)", retry)
  275. time.sleep(DEFAULT_RETRY_TIMEOUT)
  276. return self._sendcommand(key, retry - 1)
  277. def get_mac(self) -> str:
  278. """Return mac address of device."""
  279. return self._mac
  280. def get_battery_percent(self) -> Any:
  281. """Return device battery level in percent."""
  282. if not self._switchbot_device_data:
  283. return None
  284. return self._switchbot_device_data["data"]["battery"]
  285. def get_device_data(
  286. self, retry: int = DEFAULT_RETRY_COUNT, interface: int | None = None
  287. ) -> dict | None:
  288. """Find switchbot devices and their advertisement data."""
  289. if interface:
  290. _interface: int | None = interface
  291. else:
  292. _interface = self._interface
  293. devices = None
  294. try:
  295. devices = bluepy.btle.Scanner(_interface).scan(self._scan_timeout)
  296. except bluepy.btle.BTLEManagementError:
  297. _LOGGER.error("Error scanning for switchbot devices", exc_info=True)
  298. if devices is None:
  299. if retry < 1:
  300. _LOGGER.error(
  301. "Scanning for Switchbot devices failed. Stop trying", exc_info=True
  302. )
  303. return None
  304. _LOGGER.warning(
  305. "Error scanning for Switchbot devices. Retrying (remaining: %d)",
  306. retry,
  307. )
  308. time.sleep(DEFAULT_RETRY_TIMEOUT)
  309. return self.get_device_data(retry=retry - 1, interface=_interface)
  310. for dev in devices:
  311. if self._mac.lower() == dev.addr.lower():
  312. self._switchbot_device_data["mac_address"] = dev.addr
  313. for (adtype, desc, value) in dev.getScanData():
  314. if adtype == 22:
  315. _data = bytes.fromhex(value[4:])
  316. _model = chr(_data[0] & 0b01111111)
  317. if _model == "H":
  318. self._switchbot_device_data["data"] = _process_wohand(_data)
  319. self._switchbot_device_data["data"]["rssi"] = dev.rssi
  320. self._switchbot_device_data["isEncrypted"] = bool(
  321. _data[0] & 0b10000000
  322. )
  323. self._switchbot_device_data["model"] = _model
  324. self._switchbot_device_data["modelName"] = "WoHand"
  325. elif _model == "c":
  326. self._switchbot_device_data["data"] = _process_wocurtain(
  327. _data
  328. )
  329. self._switchbot_device_data["data"]["rssi"] = dev.rssi
  330. self._switchbot_device_data["isEncrypted"] = bool(
  331. _data[0] & 0b10000000
  332. )
  333. self._switchbot_device_data["model"] = _model
  334. self._switchbot_device_data["modelName"] = "WoCurtain"
  335. elif _model == "T":
  336. self._switchbot_device_data["data"] = _process_wosensorth(
  337. _data
  338. )
  339. self._switchbot_device_data["data"]["rssi"] = dev.rssi
  340. self._switchbot_device_data["isEncrypted"] = bool(
  341. _data[0] & 0b10000000
  342. )
  343. self._switchbot_device_data["model"] = _model
  344. self._switchbot_device_data["modelName"] = "WoSensorTH"
  345. else:
  346. continue
  347. else:
  348. self._switchbot_device_data[desc] = value
  349. return self._switchbot_device_data
  350. def _get_basic_info(self, retry: int = DEFAULT_RETRY_COUNT) -> bytes:
  351. """Get device basic settings."""
  352. send_success = False
  353. command = self._commandkey(DEVICE_BASIC_SETTINGS_KEY)
  354. try:
  355. self._connect()
  356. self._subscribe(command)
  357. send_success = self._writekey(command)
  358. value = self._readkey()
  359. except bluepy.btle.BTLEException:
  360. _LOGGER.warning("Error talking to Switchbot", exc_info=True)
  361. finally:
  362. self._disconnect()
  363. if send_success and value:
  364. print("Successfully retrieved data from device " + str(self._mac))
  365. return value
  366. if retry < 1:
  367. _LOGGER.error(
  368. "Switchbot communication failed. Stopping trying", exc_info=True
  369. )
  370. return bytes(0)
  371. _LOGGER.warning("Cannot connect to Switchbot. Retrying (remaining: %d)", retry)
  372. time.sleep(DEFAULT_RETRY_TIMEOUT)
  373. return self._get_basic_info(retry - 1)
  374. class Switchbot(SwitchbotDevice):
  375. """Representation of a Switchbot."""
  376. def __init__(self, *args: Any, **kwargs: Any) -> None:
  377. """Switchbot Bot/WoHand constructor."""
  378. super().__init__(*args, **kwargs)
  379. self._inverse: bool = kwargs.pop("inverse_mode", False)
  380. self._settings: dict[str, Any] = {}
  381. def update(self, interface: int | None = None) -> None:
  382. """Update mode, battery percent and state of device."""
  383. self.get_device_data(retry=self._retry_count, interface=interface)
  384. def turn_on(self) -> bool:
  385. """Turn device on."""
  386. return self._sendcommand(ON_KEY, self._retry_count)
  387. def turn_off(self) -> bool:
  388. """Turn device off."""
  389. return self._sendcommand(OFF_KEY, self._retry_count)
  390. def press(self) -> bool:
  391. """Press command to device."""
  392. return self._sendcommand(PRESS_KEY, self._retry_count)
  393. def get_basic_info(self) -> dict[str, Any]:
  394. """Get device basic settings."""
  395. settings_data = self._get_basic_info()
  396. self._settings["battery"] = settings_data[1]
  397. self._settings["firmware"] = settings_data[2] / 10.0
  398. self._settings["timers"] = settings_data[8]
  399. self._settings["dualStateMode"] = bool(settings_data[9] & 16)
  400. self._settings["inverseDirection"] = bool(settings_data[9] & 1)
  401. self._settings["holdSeconds"] = settings_data[10]
  402. return self._settings
  403. def switch_mode(self) -> Any:
  404. """Return true or false from cache."""
  405. # To get actual position call update() first.
  406. if not self._switchbot_device_data:
  407. return None
  408. return self._switchbot_device_data["data"]["switchMode"]
  409. def is_on(self) -> Any:
  410. """Return switch state from cache."""
  411. # To get actual position call update() first.
  412. if not self._switchbot_device_data:
  413. return None
  414. if self._inverse:
  415. return not self._switchbot_device_data["data"]["isOn"]
  416. return self._switchbot_device_data["data"]["isOn"]
  417. class SwitchbotCurtain(SwitchbotDevice):
  418. """Representation of a Switchbot Curtain."""
  419. def __init__(self, *args: Any, **kwargs: Any) -> None:
  420. """Switchbot Curtain/WoCurtain constructor."""
  421. # The position of the curtain is saved returned with 0 = open and 100 = closed.
  422. # This is independent of the calibration of the curtain bot (Open left to right/
  423. # Open right to left/Open from the middle).
  424. # The parameter 'reverse_mode' reverse these values,
  425. # if 'reverse_mode' = True, position = 0 equals close
  426. # and position = 100 equals open. The parameter is default set to True so that
  427. # the definition of position is the same as in Home Assistant.
  428. super().__init__(*args, **kwargs)
  429. self._reverse: bool = kwargs.pop("reverse_mode", True)
  430. self._settings: dict[str, Any] = {}
  431. def open(self) -> bool:
  432. """Send open command."""
  433. return self._sendcommand(OPEN_KEY, self._retry_count)
  434. def close(self) -> bool:
  435. """Send close command."""
  436. return self._sendcommand(CLOSE_KEY, self._retry_count)
  437. def stop(self) -> bool:
  438. """Send stop command to device."""
  439. return self._sendcommand(STOP_KEY, self._retry_count)
  440. def set_position(self, position: int) -> bool:
  441. """Send position command (0-100) to device."""
  442. position = (100 - position) if self._reverse else position
  443. hex_position = "%0.2X" % position
  444. return self._sendcommand(POSITION_KEY + hex_position, self._retry_count)
  445. def update(self, interface: int | None = None) -> None:
  446. """Update position, battery percent and light level of device."""
  447. self.get_device_data(retry=self._retry_count, interface=interface)
  448. def get_position(self) -> Any:
  449. """Return cached position (0-100) of Curtain."""
  450. # To get actual position call update() first.
  451. if not self._switchbot_device_data:
  452. return None
  453. return self._switchbot_device_data["data"]["position"]
  454. def get_basic_info(self) -> dict[str, Any]:
  455. """Get device basic settings."""
  456. settings_data = self._get_basic_info()
  457. self._settings["battery"] = settings_data[1]
  458. self._settings["firmware"] = settings_data[2] / 10.0
  459. self._settings["chainLength"] = settings_data[3]
  460. self._settings["openDirection"] = (
  461. "right_to_left" if settings_data[4] & 0b10000000 == 128 else "left_to_right"
  462. )
  463. self._settings["touchToOpen"] = bool(settings_data[4] & 0b01000000)
  464. self._settings["light"] = bool(settings_data[4] & 0b00100000)
  465. self._settings["fault"] = bool(settings_data[4] & 0b00001000)
  466. self._settings["solarPanel"] = bool(settings_data[5] & 0b00001000)
  467. self._settings["calibrated"] = bool(settings_data[5] & 0b00000100)
  468. self._settings["inMotion"] = bool(settings_data[5] & 0b01000011)
  469. _position = max(min(settings_data[6], 100), 0)
  470. self._settings["position"] = (100 - _position) if self._reverse else _position
  471. self._settings["timers"] = settings_data[7]
  472. return self._settings
  473. def get_light_level(self) -> Any:
  474. """Return cached light level."""
  475. # To get actual light level call update() first.
  476. if not self._switchbot_device_data:
  477. return None
  478. return self._switchbot_device_data["data"]["lightLevel"]
  479. def is_reversed(self) -> bool:
  480. """Return True if the curtain open from left to right."""
  481. return self._reverse
  482. def is_calibrated(self) -> Any:
  483. """Return True curtain is calibrated."""
  484. # To get actual light level call update() first.
  485. if not self._switchbot_device_data:
  486. return None
  487. return self._switchbot_device_data["data"]["calibration"]