__init__.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. """Library to handle connection with Switchbot"""
  2. import binascii
  3. import logging
  4. import time
  5. import bluepy
  6. DEFAULT_RETRY_COUNT = 3
  7. DEFAULT_RETRY_TIMEOUT = 0.2
  8. DEFAULT_TIME_BETWEEN_UPDATE_COMMAND = 10
  9. UUID = "cba20d00-224d-11e6-9fb8-0002a5d5c51b"
  10. HANDLE = "cba20002-224d-11e6-9fb8-0002a5d5c51b"
  11. KEY_PASSWORD_PREFIX = "5711"
  12. PRESS_KEY = "570100"
  13. ON_KEY = "570101"
  14. OFF_KEY = "570102"
  15. OPEN_KEY = "570f450105ff00" # 570F4501010100
  16. CLOSE_KEY = "570f450105ff64" # 570F4501010164
  17. POSITION_KEY = "570F450105ff" # +actual_position ex: 570F450105ff32 for 50%
  18. STOP_KEY = "570F450100ff"
  19. ON_KEY_SUFFIX = "01"
  20. OFF_KEY_SUFFIX = "02"
  21. PRESS_KEY_SUFFIX = "00"
  22. _LOGGER = logging.getLogger(__name__)
  23. class SwitchbotDevice:
  24. # pylint: disable=too-few-public-methods
  25. # pylint: disable=too-many-instance-attributes
  26. """Base Representation of a Switchbot Device."""
  27. def __init__(self, mac, password=None, interface=None, **kwargs) -> None:
  28. self._interface = interface
  29. self._mac = mac
  30. self._device = None
  31. self._battery_percent = 0
  32. self._retry_count = kwargs.pop("retry_count", DEFAULT_RETRY_COUNT)
  33. self._time_between_update_command = kwargs.pop(
  34. "time_between_update_command", DEFAULT_TIME_BETWEEN_UPDATE_COMMAND
  35. )
  36. self._last_time_command_send = time.time()
  37. if password is None or password == "":
  38. self._password_encoded = None
  39. else:
  40. self._password_encoded = "%x" % (
  41. binascii.crc32(password.encode("ascii")) & 0xFFFFFFFF
  42. )
  43. def _connect(self) -> None:
  44. if self._device is not None:
  45. return
  46. try:
  47. _LOGGER.debug("Connecting to Switchbot...")
  48. self._device = bluepy.btle.Peripheral(
  49. self._mac, bluepy.btle.ADDR_TYPE_RANDOM, self._interface
  50. )
  51. _LOGGER.debug("Connected to Switchbot.")
  52. except bluepy.btle.BTLEException:
  53. _LOGGER.debug("Failed connecting to Switchbot.", exc_info=True)
  54. self._device = None
  55. raise
  56. def _disconnect(self) -> None:
  57. if self._device is None:
  58. return
  59. _LOGGER.debug("Disconnecting")
  60. try:
  61. self._device.disconnect()
  62. except bluepy.btle.BTLEException:
  63. _LOGGER.warning("Error disconnecting from Switchbot.", exc_info=True)
  64. finally:
  65. self._device = None
  66. def _commandkey(self, key) -> str:
  67. if self._password_encoded is None:
  68. return key
  69. key_suffix = PRESS_KEY_SUFFIX
  70. if key == ON_KEY:
  71. key_suffix = ON_KEY_SUFFIX
  72. elif key == OFF_KEY:
  73. key_suffix = OFF_KEY_SUFFIX
  74. return KEY_PASSWORD_PREFIX + self._password_encoded + key_suffix
  75. def _writekey(self, key) -> bool:
  76. _LOGGER.debug("Prepare to send")
  77. hand_service = self._device.getServiceByUUID(UUID)
  78. hand = hand_service.getCharacteristics(HANDLE)[0]
  79. _LOGGER.debug("Sending command, %s", key)
  80. write_result = hand.write(binascii.a2b_hex(key), withResponse=True)
  81. self._last_time_command_send = time.time()
  82. if not write_result:
  83. _LOGGER.error(
  84. "Sent command but didn't get a response from Switchbot confirming command was sent."
  85. " Please check the Switchbot."
  86. )
  87. else:
  88. _LOGGER.info("Successfully sent command to Switchbot (MAC: %s).", self._mac)
  89. return write_result
  90. def _sendcommand(self, key, retry) -> bool:
  91. send_success = False
  92. command = self._commandkey(key)
  93. _LOGGER.debug("Sending command to switchbot %s", command)
  94. try:
  95. self._connect()
  96. send_success = self._writekey(command)
  97. except bluepy.btle.BTLEException:
  98. _LOGGER.warning("Error talking to Switchbot.", exc_info=True)
  99. finally:
  100. self._disconnect()
  101. if send_success:
  102. return True
  103. if retry < 1:
  104. _LOGGER.error(
  105. "Switchbot communication failed. Stopping trying.", exc_info=True
  106. )
  107. return False
  108. _LOGGER.warning(
  109. "Cannot connect to Switchbot. Retrying (remaining: %d)...", retry
  110. )
  111. time.sleep(DEFAULT_RETRY_TIMEOUT)
  112. return self._sendcommand(key, retry - 1)
  113. def get_servicedata(self, retry=DEFAULT_RETRY_COUNT, scan_timeout=5) -> bytearray:
  114. """Get BTLE 16b Service Data,
  115. returns after the given timeout period in seconds."""
  116. devices = None
  117. waiting_time = self._time_between_update_command - time.time()
  118. if waiting_time > 0:
  119. time.sleep(waiting_time)
  120. try:
  121. devices = bluepy.btle.Scanner().scan(scan_timeout)
  122. except bluepy.btle.BTLEManagementError:
  123. _LOGGER.warning("Error updating Switchbot.", exc_info=True)
  124. if devices is None:
  125. if retry < 1:
  126. _LOGGER.error(
  127. "Switchbot update failed. Stopping trying.", exc_info=True
  128. )
  129. return None
  130. _LOGGER.warning(
  131. "Cannot update Switchbot. Retrying (remaining: %d)...", retry
  132. )
  133. time.sleep(DEFAULT_RETRY_TIMEOUT)
  134. return self.get_servicedata(retry - 1, scan_timeout)
  135. for device in devices:
  136. if self._mac.lower() == device.addr.lower():
  137. for (adtype, _, value) in device.getScanData():
  138. if adtype == 22:
  139. service_data = value[4:].encode()
  140. service_data = binascii.unhexlify(service_data)
  141. return service_data
  142. return None
  143. def get_mac(self) -> str:
  144. """Returns the mac address of the device."""
  145. return self._mac
  146. def get_min_time_update(self):
  147. """Returns the first time an update can be executed."""
  148. return self._last_time_command_send + self._time_between_update_command
  149. def get_battery_percent(self) -> int:
  150. """Returns device battery level in percent."""
  151. return self._battery_percent
  152. class Switchbot(SwitchbotDevice):
  153. """Representation of a Switchbot."""
  154. def __init__(self, *args, **kwargs) -> None:
  155. self._is_on = None
  156. self._mode = None
  157. super().__init__(*args, **kwargs)
  158. def update(self, scan_timeout=5) -> None:
  159. """Updates the mode, battery percent and state of the device."""
  160. barray = self.get_servicedata(scan_timeout=scan_timeout)
  161. if barray is None:
  162. return
  163. _mode = barray[1] & 0b10000000 # 128 switch or 0 toggle
  164. if _mode != 0:
  165. self._mode = "switch"
  166. else:
  167. self._mode = "toggle"
  168. _is_on = barray[1] & 0b01000000 # 64 on or 0 for off
  169. if _is_on == 0 and self._mode == "switch":
  170. self._is_on = True
  171. else:
  172. self._is_on = False
  173. self._battery_percent = barray[2] & 0b01111111
  174. def turn_on(self) -> bool:
  175. """Turn device on."""
  176. return self._sendcommand(ON_KEY, self._retry_count)
  177. def turn_off(self) -> bool:
  178. """Turn device off."""
  179. return self._sendcommand(OFF_KEY, self._retry_count)
  180. def press(self) -> bool:
  181. """Press command to device."""
  182. return self._sendcommand(PRESS_KEY, self._retry_count)
  183. def switch_mode(self) -> str:
  184. """Return Toggle or Switch from cache.
  185. Run update first."""
  186. return self._mode
  187. def is_on(self) -> bool:
  188. """Return switch state from cache.
  189. Run update first."""
  190. return self._is_on
  191. class SwitchbotCurtain(SwitchbotDevice):
  192. """Representation of a Switchbot Curtain."""
  193. def __init__(self, *args, **kwargs) -> None:
  194. """Constructor for a Switchbot Curtain.
  195. The position of the curtain is saved in self._pos with 0 = open and 100 = closed.
  196. This is independent of the calibration of the curtain bot (Open left to right/
  197. Open right to left/Open from the middle).
  198. The parameter 'reverse_mode' reverse these values,
  199. if 'reverse_mode' = True, position = 0 equals close
  200. and position = 100 equals open. The parameter is default set to True so that
  201. the definition of position is the same as in Home Assistant."""
  202. self._reverse = kwargs.pop("reverse_mode", True)
  203. self._pos = 0
  204. self._light_level = 0
  205. self._is_calibrated = 0
  206. super().__init__(*args, **kwargs)
  207. def open(self) -> bool:
  208. """Send open command."""
  209. self._pos = 100 if self._reverse else 0
  210. return self._sendcommand(OPEN_KEY, self._retry_count)
  211. def close(self) -> bool:
  212. """Send close command."""
  213. self._pos = 0 if self._reverse else 100
  214. return self._sendcommand(CLOSE_KEY, self._retry_count)
  215. def stop(self) -> bool:
  216. """Send stop command to device."""
  217. return self._sendcommand(STOP_KEY, self._retry_count)
  218. def set_position(self, position: int) -> bool:
  219. """Send position command (0-100) to device."""
  220. position = (100 - position) if self._reverse else position
  221. self._pos = position
  222. hex_position = "%0.2X" % position
  223. return self._sendcommand(POSITION_KEY + hex_position, self._retry_count)
  224. def update(self, scan_timeout=5) -> None:
  225. """Updates the current position, battery percent and light level of the device."""
  226. barray = self.get_servicedata(scan_timeout=scan_timeout)
  227. if barray is None:
  228. return
  229. self._is_calibrated = barray[1] & 0b01000000
  230. self._battery_percent = barray[2] & 0b01111111
  231. position = max(min(barray[3] & 0b01111111, 100), 0)
  232. self._pos = (100 - position) if self._reverse else position
  233. self._light_level = (barray[4] >> 4) & 0b00001111 # light sensor level (1-10)
  234. def get_position(self) -> int:
  235. """Returns the current cached position (0-100), the actual position could vary.
  236. To get the actual position call update() first."""
  237. return self._pos
  238. def get_light_level(self) -> int:
  239. """Returns the current cached light level, the actual light level could vary.
  240. To get the actual light level call update() first."""
  241. return self._light_level
  242. def is_reversed(self) -> bool:
  243. """Returns True if the curtain open from left to right."""
  244. return self._reverse