__init__.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509
  1. """Library to handle connection with Switchbot."""
  2. from __future__ import annotations
  3. import binascii
  4. import logging
  5. from multiprocessing import Manager, Process
  6. import time
  7. import bluepy
  8. DEFAULT_RETRY_COUNT = 3
  9. DEFAULT_RETRY_TIMEOUT = 1
  10. DEFAULT_SCAN_TIMEOUT = 5
  11. UUID = "cba20d00-224d-11e6-9fb8-0002a5d5c51b"
  12. HANDLE = "cba20002-224d-11e6-9fb8-0002a5d5c51b"
  13. KEY_PASSWORD_PREFIX = "5711"
  14. PRESS_KEY = "570100"
  15. ON_KEY = "570101"
  16. OFF_KEY = "570102"
  17. OPEN_KEY = "570f450105ff00" # 570F4501010100
  18. CLOSE_KEY = "570f450105ff64" # 570F4501010164
  19. POSITION_KEY = "570F450105ff" # +actual_position ex: 570F450105ff32 for 50%
  20. STOP_KEY = "570F450100ff"
  21. ON_KEY_SUFFIX = "01"
  22. OFF_KEY_SUFFIX = "02"
  23. PRESS_KEY_SUFFIX = "00"
  24. _LOGGER = logging.getLogger(__name__)
  25. def _process_wohand(data) -> dict:
  26. """Process woHand/Bot services data."""
  27. _bot_data = {}
  28. _sensor_data = binascii.unhexlify(data.encode())
  29. # 128 switch or 0 press.
  30. _bot_data["switchMode"] = bool(_sensor_data[1] & 0b10000000)
  31. # 64 off or 0 for on, if not inversed in app.
  32. if _bot_data["switchMode"]:
  33. _bot_data["isOn"] = not bool(_sensor_data[1] & 0b01000000)
  34. else:
  35. _bot_data["isOn"] = False
  36. _bot_data["battery"] = _sensor_data[2] & 0b01111111
  37. return _bot_data
  38. def _process_wocurtain(data, reverse=True) -> dict:
  39. """Process woCurtain/Curtain services data."""
  40. _curtain_data = {}
  41. _sensor_data = binascii.unhexlify(data.encode())
  42. _curtain_data["calibration"] = bool(_sensor_data[1] & 0b01000000)
  43. _curtain_data["battery"] = _sensor_data[2] & 0b01111111
  44. _position = max(min(_sensor_data[3] & 0b01111111, 100), 0)
  45. _curtain_data["position"] = (100 - _position) if reverse else _position
  46. # light sensor level (1-10)
  47. _curtain_data["lightLevel"] = (_sensor_data[4] >> 4) & 0b00001111
  48. return _curtain_data
  49. def _process_wosensorth(data) -> dict:
  50. """Process woSensorTH/Temp sensor services data."""
  51. _wosensorth_data = {}
  52. _sensor_data = binascii.unhexlify(data.encode())
  53. _temp_sign = _sensor_data[4] & 0b10000000
  54. _temp_c = _temp_sign * ((_sensor_data[4] & 0b01111111) + (_sensor_data[3] / 10))
  55. _temp_f = (_temp_c * 9 / 5) + 32
  56. _temp_f = (_temp_f * 10) / 10
  57. _wosensorth_data["temp"]["c"] = _temp_c
  58. _wosensorth_data["temp"]["f"] = _temp_f
  59. _wosensorth_data["fahrenheit"] = bool(_sensor_data[5] & 0b10000000)
  60. _wosensorth_data["humidity"] = _sensor_data[5] & 0b01111111
  61. _wosensorth_data["battery"] = _sensor_data[2] & 0b01111111
  62. return _wosensorth_data
  63. class BLEScanner:
  64. """Helper for bluepy device scanning."""
  65. def __init__(self, scan_timeout=DEFAULT_SCAN_TIMEOUT, interface=None) -> None:
  66. """Init class constructor."""
  67. self._scan_timeout = scan_timeout
  68. self._interface = interface
  69. self._devices = None
  70. def start(self) -> dict | None:
  71. """Start scan in seperate process."""
  72. with Manager() as manager:
  73. _devices = manager.dict()
  74. process = Process(target=self._scan, args=(_devices,))
  75. process.start()
  76. process.join()
  77. return _devices.get(0, None)
  78. def _scan(self, devices) -> dict | None:
  79. """Scan for advertisement data."""
  80. try:
  81. devices[0] = bluepy.btle.Scanner(self._interface).scan(self._scan_timeout)
  82. except bluepy.btle.BTLEDisconnectError:
  83. pass
  84. class GetSwitchbotDevices:
  85. """Scan for all Switchbot devices and return by type."""
  86. def __init__(self, interface=None) -> None:
  87. """Get switchbot devices class constructor."""
  88. self._interface = interface
  89. self._all_services_data = {}
  90. def discover(
  91. self, retry=DEFAULT_RETRY_COUNT, scan_timeout=DEFAULT_SCAN_TIMEOUT
  92. ) -> dict | None:
  93. """Find switchbot devices and their advertisement data."""
  94. devices = None
  95. try:
  96. devices = BLEScanner(
  97. scan_timeout=scan_timeout, interface=self._interface
  98. ).start()
  99. except bluepy.btle.BTLEManagementError:
  100. _LOGGER.error("Error scanning for switchbot devices", exc_info=True)
  101. if devices is None:
  102. if retry < 1:
  103. _LOGGER.error(
  104. "Scanning for Switchbot devices failed. Stop trying", exc_info=True
  105. )
  106. return None
  107. _LOGGER.warning(
  108. "Error scanning for Switchbot devices. Retrying (remaining: %d)",
  109. retry,
  110. )
  111. time.sleep(DEFAULT_RETRY_TIMEOUT)
  112. return self.discover(retry - 1, scan_timeout)
  113. for dev in devices:
  114. if dev.getValueText(7) == UUID:
  115. dev_id = dev.addr.replace(":", "")
  116. self._all_services_data[dev_id] = {}
  117. self._all_services_data[dev_id]["mac_address"] = dev.addr
  118. for (adtype, desc, value) in dev.getScanData():
  119. if adtype == 22:
  120. _model = binascii.unhexlify(value[4:6]).decode()
  121. if _model == "H":
  122. self._all_services_data[dev_id]["data"] = _process_wohand(
  123. value[4:]
  124. )
  125. self._all_services_data[dev_id]["data"]["rssi"] = dev.rssi
  126. self._all_services_data[dev_id]["model"] = _model
  127. self._all_services_data[dev_id]["modelName"] = "WoHand"
  128. elif _model == "c":
  129. self._all_services_data[dev_id][
  130. "data"
  131. ] = _process_wocurtain(value[4:])
  132. self._all_services_data[dev_id]["data"]["rssi"] = dev.rssi
  133. self._all_services_data[dev_id]["model"] = _model
  134. self._all_services_data[dev_id]["modelName"] = "WoCurtain"
  135. elif _model == "T":
  136. self._all_services_data[dev_id][
  137. "data"
  138. ] = _process_wosensorth(value[4:])
  139. self._all_services_data[dev_id]["data"]["rssi"] = dev.rssi
  140. self._all_services_data[dev_id]["model"] = _model
  141. self._all_services_data[dev_id]["modelName"] = "WoSensorTH"
  142. else:
  143. continue
  144. else:
  145. self._all_services_data[dev_id][desc] = value
  146. return self._all_services_data
  147. def get_curtains(self) -> dict | None:
  148. """Return all WoCurtain/Curtains devices with services data."""
  149. if not self._all_services_data:
  150. self.discover()
  151. _curtain_devices = {}
  152. for item in self._all_services_data:
  153. if self._all_services_data[item]["model"] == "c":
  154. _curtain_devices[item] = self._all_services_data[item]
  155. return _curtain_devices
  156. def get_bots(self) -> dict | None:
  157. """Return all WoHand/Bot devices with services data."""
  158. if not self._all_services_data:
  159. self.discover()
  160. _bot_devices = {}
  161. for item in self._all_services_data:
  162. if self._all_services_data[item]["model"] == "H":
  163. _bot_devices[item] = self._all_services_data[item]
  164. return _bot_devices
  165. def get_device_data(self, mac) -> dict | None:
  166. """Return data for specific device."""
  167. if not self._all_services_data:
  168. self.discover()
  169. _switchbot_data = {}
  170. for item in self._all_services_data:
  171. if self._all_services_data[item]["mac_address"] == mac.replace("-", ":").lower():
  172. _switchbot_data = self._all_services_data[item]
  173. return _switchbot_data
  174. class SwitchbotDevice:
  175. """Base Representation of a Switchbot Device."""
  176. def __init__(self, mac, password=None, interface=None, **kwargs) -> None:
  177. """Switchbot base class constructor."""
  178. self._interface = interface
  179. self._mac = mac.replace("-", ":").lower()
  180. self._device = None
  181. self._switchbot_device_data = {}
  182. self._scan_timeout = kwargs.pop("scan_timeout", DEFAULT_SCAN_TIMEOUT)
  183. self._retry_count = kwargs.pop("retry_count", DEFAULT_RETRY_COUNT)
  184. if password is None or password == "":
  185. self._password_encoded = None
  186. else:
  187. self._password_encoded = "%x" % (
  188. binascii.crc32(password.encode("ascii")) & 0xFFFFFFFF
  189. )
  190. def _connect(self) -> None:
  191. if self._device is not None:
  192. return
  193. try:
  194. _LOGGER.debug("Connecting to Switchbot")
  195. self._device = bluepy.btle.Peripheral(
  196. self._mac, bluepy.btle.ADDR_TYPE_RANDOM, self._interface
  197. )
  198. _LOGGER.debug("Connected to Switchbot")
  199. except bluepy.btle.BTLEException:
  200. _LOGGER.debug("Failed connecting to Switchbot", exc_info=True)
  201. self._device = None
  202. raise
  203. def _disconnect(self) -> None:
  204. if self._device is None:
  205. return
  206. _LOGGER.debug("Disconnecting")
  207. try:
  208. self._device.disconnect()
  209. except bluepy.btle.BTLEException:
  210. _LOGGER.warning("Error disconnecting from Switchbot", exc_info=True)
  211. finally:
  212. self._device = None
  213. def _commandkey(self, key) -> str:
  214. if self._password_encoded is None:
  215. return key
  216. key_suffix = PRESS_KEY_SUFFIX
  217. if key == ON_KEY:
  218. key_suffix = ON_KEY_SUFFIX
  219. elif key == OFF_KEY:
  220. key_suffix = OFF_KEY_SUFFIX
  221. return KEY_PASSWORD_PREFIX + self._password_encoded + key_suffix
  222. def _writekey(self, key) -> bool:
  223. _LOGGER.debug("Prepare to send")
  224. hand_service = self._device.getServiceByUUID(UUID)
  225. hand = hand_service.getCharacteristics(HANDLE)[0]
  226. _LOGGER.debug("Sending command, %s", key)
  227. write_result = hand.write(binascii.a2b_hex(key), withResponse=True)
  228. if not write_result:
  229. _LOGGER.error(
  230. "Sent command but didn't get a response from Switchbot confirming command was sent."
  231. " Please check the Switchbot"
  232. )
  233. else:
  234. _LOGGER.info("Successfully sent command to Switchbot (MAC: %s)", self._mac)
  235. return write_result
  236. def _sendcommand(self, key, retry) -> bool:
  237. send_success = False
  238. command = self._commandkey(key)
  239. _LOGGER.debug("Sending command to switchbot %s", command)
  240. try:
  241. self._connect()
  242. send_success = self._writekey(command)
  243. except bluepy.btle.BTLEException:
  244. _LOGGER.warning("Error talking to Switchbot", exc_info=True)
  245. finally:
  246. self._disconnect()
  247. if send_success:
  248. return True
  249. if retry < 1:
  250. _LOGGER.error(
  251. "Switchbot communication failed. Stopping trying", exc_info=True
  252. )
  253. return False
  254. _LOGGER.warning("Cannot connect to Switchbot. Retrying (remaining: %d)", retry)
  255. time.sleep(DEFAULT_RETRY_TIMEOUT)
  256. return self._sendcommand(key, retry - 1)
  257. def get_mac(self) -> str:
  258. """Return mac address of device."""
  259. return self._mac
  260. def get_battery_percent(self) -> int:
  261. """Return device battery level in percent."""
  262. if not self._switchbot_device_data:
  263. return None
  264. return self._switchbot_device_data["data"]["battery"]
  265. def get_device_data(self, retry=DEFAULT_RETRY_COUNT, interface=None) -> dict | None:
  266. """Find switchbot devices and their advertisement data."""
  267. if interface:
  268. _interface = interface
  269. else:
  270. _interface = self._interface
  271. devices = None
  272. try:
  273. devices = BLEScanner(
  274. scan_timeout=self._scan_timeout, interface=_interface
  275. ).start()
  276. except bluepy.btle.BTLEManagementError:
  277. _LOGGER.error("Error scanning for switchbot devices", exc_info=True)
  278. if devices is None:
  279. if retry < 1:
  280. _LOGGER.error(
  281. "Scanning for Switchbot devices failed. Stop trying", exc_info=True
  282. )
  283. return None
  284. _LOGGER.warning(
  285. "Error scanning for Switchbot devices. Retrying (remaining: %d)",
  286. retry,
  287. )
  288. time.sleep(DEFAULT_RETRY_TIMEOUT)
  289. return self.get_device_data(retry=retry - 1, interface=_interface)
  290. for dev in devices:
  291. if self._mac.lower() == dev.addr.lower():
  292. self._switchbot_device_data["mac_address"] = dev.addr
  293. for (adtype, desc, value) in dev.getScanData():
  294. if adtype == 22:
  295. _model = binascii.unhexlify(value[4:6]).decode()
  296. if _model == "H":
  297. self._switchbot_device_data["data"] = _process_wohand(
  298. value[4:]
  299. )
  300. self._switchbot_device_data["data"]["rssi"] = dev.rssi
  301. self._switchbot_device_data["model"] = _model
  302. self._switchbot_device_data["modelName"] = "WoHand"
  303. elif _model == "c":
  304. self._switchbot_device_data["data"] = _process_wocurtain(
  305. value[4:]
  306. )
  307. self._switchbot_device_data["data"]["rssi"] = dev.rssi
  308. self._switchbot_device_data["model"] = _model
  309. self._switchbot_device_data["modelName"] = "WoCurtain"
  310. elif _model == "T":
  311. self._switchbot_device_data["data"] = _process_wosensorth(
  312. value[4:]
  313. )
  314. self._switchbot_device_data["data"]["rssi"] = dev.rssi
  315. self._switchbot_device_data["model"] = _model
  316. self._switchbot_device_data["modelName"] = "WoSensorTH"
  317. else:
  318. continue
  319. else:
  320. self._switchbot_device_data[desc] = value
  321. return self._switchbot_device_data
  322. class Switchbot(SwitchbotDevice):
  323. """Representation of a Switchbot."""
  324. def __init__(self, *args, **kwargs) -> None:
  325. """Switchbot Bot/WoHand constructor."""
  326. super().__init__(*args, **kwargs)
  327. self._inverse = kwargs.pop("inverse_mode", False)
  328. def update(self, interface=None) -> None:
  329. """Update mode, battery percent and state of device."""
  330. self.get_device_data(retry=self._retry_count, interface=interface)
  331. def turn_on(self) -> bool:
  332. """Turn device on."""
  333. return self._sendcommand(ON_KEY, self._retry_count)
  334. def turn_off(self) -> bool:
  335. """Turn device off."""
  336. return self._sendcommand(OFF_KEY, self._retry_count)
  337. def press(self) -> bool:
  338. """Press command to device."""
  339. return self._sendcommand(PRESS_KEY, self._retry_count)
  340. def switch_mode(self) -> str:
  341. """Return true or false from cache."""
  342. # To get actual position call update() first.
  343. if not self._switchbot_device_data:
  344. return None
  345. return self._switchbot_device_data["data"]["switchMode"]
  346. def is_on(self) -> bool:
  347. """Return switch state from cache."""
  348. # To get actual position call update() first.
  349. if not self._switchbot_device_data:
  350. return None
  351. if self._inverse:
  352. return not self._switchbot_device_data["data"]["isOn"]
  353. return self._switchbot_device_data["data"]["isOn"]
  354. class SwitchbotCurtain(SwitchbotDevice):
  355. """Representation of a Switchbot Curtain."""
  356. def __init__(self, *args, **kwargs) -> None:
  357. """Switchbot Curtain/WoCurtain constructor."""
  358. # The position of the curtain is saved returned with 0 = open and 100 = closed.
  359. # This is independent of the calibration of the curtain bot (Open left to right/
  360. # Open right to left/Open from the middle).
  361. # The parameter 'reverse_mode' reverse these values,
  362. # if 'reverse_mode' = True, position = 0 equals close
  363. # and position = 100 equals open. The parameter is default set to True so that
  364. # the definition of position is the same as in Home Assistant.
  365. super().__init__(*args, **kwargs)
  366. self._reverse = kwargs.pop("reverse_mode", True)
  367. def open(self) -> bool:
  368. """Send open command."""
  369. return self._sendcommand(OPEN_KEY, self._retry_count)
  370. def close(self) -> bool:
  371. """Send close command."""
  372. return self._sendcommand(CLOSE_KEY, self._retry_count)
  373. def stop(self) -> bool:
  374. """Send stop command to device."""
  375. return self._sendcommand(STOP_KEY, self._retry_count)
  376. def set_position(self, position: int) -> bool:
  377. """Send position command (0-100) to device."""
  378. position = (100 - position) if self._reverse else position
  379. hex_position = "%0.2X" % position
  380. return self._sendcommand(POSITION_KEY + hex_position, self._retry_count)
  381. def update(self, interface=None) -> None:
  382. """Update position, battery percent and light level of device."""
  383. self.get_device_data(retry=self._retry_count, interface=interface)
  384. def get_position(self) -> int:
  385. """Return cached position (0-100) of Curtain."""
  386. # To get actual position call update() first.
  387. if not self._switchbot_device_data:
  388. return None
  389. return self._switchbot_device_data["data"]["position"]
  390. def get_light_level(self) -> int:
  391. """Return cached light level."""
  392. # To get actual light level call update() first.
  393. if not self._switchbot_device_data:
  394. return None
  395. return self._switchbot_device_data["data"]["lightLevel"]
  396. def is_reversed(self) -> bool:
  397. """Return True if the curtain open from left to right."""
  398. return self._reverse
  399. def is_calibrated(self) -> bool:
  400. """Return True curtain is calibrated."""
  401. # To get actual light level call update() first.
  402. if not self._switchbot_device_data:
  403. return None
  404. return self._switchbot_device_data["data"]["calibration"]