__init__.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. """Library to handle connection with Switchbot"""
  2. import time
  3. import binascii
  4. import logging
  5. import bluepy
  6. DEFAULT_RETRY_COUNT = 3
  7. DEFAULT_RETRY_TIMEOUT = 0.2
  8. DEFAULT_TIME_BETWEEN_UPDATE_COMMAND = 10
  9. UUID = "cba20d00-224d-11e6-9fb8-0002a5d5c51b"
  10. HANDLE = "cba20002-224d-11e6-9fb8-0002a5d5c51b"
  11. KEY_PASSWORD_PREFIX = "5711"
  12. PRESS_KEY = "570100"
  13. ON_KEY = "570101"
  14. OFF_KEY = "570102"
  15. OPEN_KEY = "570f450105ff00" # 570F4501010100
  16. CLOSE_KEY = "570f450105ff64" # 570F4501010164
  17. POSITION_KEY = "570F450105ff" # +actual_position ex: 570F450105ff32 for 50%
  18. STOP_KEY = "570F450100ff"
  19. ON_KEY_SUFFIX = "01"
  20. OFF_KEY_SUFFIX = "02"
  21. PRESS_KEY_SUFFIX = "00"
  22. _LOGGER = logging.getLogger(__name__)
  23. class SwitchbotDevice:
  24. # pylint: disable=too-few-public-methods
  25. """Base Representation of a Switchbot Device."""
  26. def __init__(self, mac, password=None, interface=None, **kwargs) -> None:
  27. self._interface = interface
  28. self._mac = mac
  29. self._device = None
  30. self._retry_count = kwargs.pop("retry_count", DEFAULT_RETRY_COUNT)
  31. self._time_between_update_command = kwargs.pop("time_between_update_command",
  32. DEFAULT_TIME_BETWEEN_UPDATE_COMMAND)
  33. self._last_time_command_send = time.time()
  34. if password is None or password == "":
  35. self._password_encoded = None
  36. else:
  37. self._password_encoded = '%x' % (binascii.crc32(password.encode('ascii')) & 0xffffffff)
  38. def _connect(self) -> None:
  39. if self._device is not None:
  40. return
  41. try:
  42. _LOGGER.debug("Connecting to Switchbot...")
  43. self._device = bluepy.btle.Peripheral(self._mac,
  44. bluepy.btle.ADDR_TYPE_RANDOM,
  45. self._interface)
  46. _LOGGER.debug("Connected to Switchbot.")
  47. except bluepy.btle.BTLEException:
  48. _LOGGER.debug("Failed connecting to Switchbot.", exc_info=True)
  49. self._device = None
  50. raise
  51. def _disconnect(self) -> None:
  52. if self._device is None:
  53. return
  54. _LOGGER.debug("Disconnecting")
  55. try:
  56. self._device.disconnect()
  57. except bluepy.btle.BTLEException:
  58. _LOGGER.warning("Error disconnecting from Switchbot.", exc_info=True)
  59. finally:
  60. self._device = None
  61. def _commandkey(self, key) -> str:
  62. if self._password_encoded is None:
  63. return key
  64. key_suffix = PRESS_KEY_SUFFIX
  65. if key == ON_KEY:
  66. key_suffix = ON_KEY_SUFFIX
  67. elif key == OFF_KEY:
  68. key_suffix = OFF_KEY_SUFFIX
  69. return KEY_PASSWORD_PREFIX + self._password_encoded + key_suffix
  70. def _writekey(self, key) -> bool:
  71. _LOGGER.debug("Prepare to send")
  72. hand_service = self._device.getServiceByUUID(UUID)
  73. hand = hand_service.getCharacteristics(HANDLE)[0]
  74. _LOGGER.debug("Sending command, %s", key)
  75. write_result = hand.write(binascii.a2b_hex(key), withResponse=True)
  76. self._last_time_command_send = time.time()
  77. if not write_result:
  78. _LOGGER.error("Sent command but didn't get a response from Switchbot confirming command was sent. "
  79. "Please check the Switchbot.")
  80. else:
  81. _LOGGER.info("Successfully sent command to Switchbot (MAC: %s).", self._mac)
  82. return write_result
  83. def _sendcommand(self, key, retry) -> bool:
  84. send_success = False
  85. command = self._commandkey(key)
  86. _LOGGER.debug("Sending command to switchbot %s", command)
  87. try:
  88. self._connect()
  89. send_success = self._writekey(command)
  90. except bluepy.btle.BTLEException:
  91. _LOGGER.warning("Error talking to Switchbot.", exc_info=True)
  92. finally:
  93. self._disconnect()
  94. if send_success:
  95. return True
  96. if retry < 1:
  97. _LOGGER.error("Switchbot communication failed. Stopping trying.", exc_info=True)
  98. return False
  99. _LOGGER.warning("Cannot connect to Switchbot. Retrying (remaining: %d)...", retry)
  100. time.sleep(DEFAULT_RETRY_TIMEOUT)
  101. return self._sendcommand(key, retry - 1)
  102. def get_mac(self) -> str:
  103. """Returns the mac address of the device."""
  104. return self._mac
  105. def get_min_time_update(self):
  106. """Returns the first time an update can be executed."""
  107. return self._last_time_command_send + self._time_between_update_command
  108. class Switchbot(SwitchbotDevice):
  109. """Representation of a Switchbot."""
  110. def turn_on(self) -> bool:
  111. """Turn device on."""
  112. return self._sendcommand(ON_KEY, self._retry_count)
  113. def turn_off(self) -> bool:
  114. """Turn device off."""
  115. return self._sendcommand(OFF_KEY, self._retry_count)
  116. def press(self) -> bool:
  117. """Press command to device."""
  118. return self._sendcommand(PRESS_KEY, self._retry_count)
  119. class SwitchbotCurtain(SwitchbotDevice):
  120. """Representation of a Switchbot Curtain."""
  121. def __init__(self, *args, **kwargs) -> None:
  122. """Constructor for a Switchbot Curtain.
  123. The position of the curtain is saved in self._pos with 0 = open and 100 = closed.
  124. This is independent of the calibration of the curtain bot (Open left to right/
  125. Open right to left/Open from the middle).
  126. The parameter 'reverse_mode' reverse these values, if 'reverse_mode' = True, position = 0 equals close
  127. and position = 100 equals open. The parameter is default set to True so that
  128. the definition of position is the same as in Home Assistant."""
  129. self._reverse = kwargs.pop('reverse_mode', True)
  130. self._pos = 0
  131. self._light_level = 0
  132. self._battery_percent = 0
  133. super().__init__(*args, **kwargs)
  134. def open(self) -> bool:
  135. """Send open command."""
  136. self._pos = (100 if self._reverse else 0)
  137. return self._sendcommand(OPEN_KEY, self._retry_count)
  138. def close(self) -> bool:
  139. """Send close command."""
  140. self._pos = (0 if self._reverse else 100)
  141. return self._sendcommand(CLOSE_KEY, self._retry_count)
  142. def stop(self) -> bool:
  143. """Send stop command to device."""
  144. return self._sendcommand(STOP_KEY, self._retry_count)
  145. def set_position(self, position: int) -> bool:
  146. """Send position command (0-100) to device."""
  147. position = ((100 - position) if self._reverse else position)
  148. self._pos = position
  149. hex_position = "%0.2X" % position
  150. return self._sendcommand(POSITION_KEY + hex_position, self._retry_count)
  151. def update(self, scan_timeout=5) -> None:
  152. """Updates the current position, battery percent and light level of the device.
  153. Returns after the given timeout period in seconds."""
  154. waiting_time = self.get_min_time_update() - time.time()
  155. if waiting_time > 0:
  156. time.sleep(waiting_time)
  157. devices = bluepy.btle.Scanner().scan(scan_timeout)
  158. for device in devices:
  159. if self.get_mac().lower() == device.addr.lower():
  160. for (adtype, _, value) in device.getScanData():
  161. if adtype == 22:
  162. barray = bytearray(value, 'ascii')
  163. self._battery_percent = int(barray[-6:-4], 16)
  164. position = max(min(int(barray[-4:-2], 16), 100), 0)
  165. self._pos = ((100 - position) if self._reverse else position)
  166. self._light_level = int(barray[-2:], 16)
  167. def get_position(self) -> int:
  168. """Returns the current cached position (0-100), the actual position could vary.
  169. To get the actual position call update() first."""
  170. return self._pos
  171. def get_battery_percent(self) -> int:
  172. """Returns the current cached battery percent (0-100), the actual battery percent could vary.
  173. To get the actual battery percent call update() first."""
  174. return self._battery_percent
  175. def get_light_level(self) -> int:
  176. """Returns the current cached light level, the actual light level could vary.
  177. To get the actual light level call update() first."""
  178. return self._light_level
  179. def is_reversed(self) -> bool:
  180. """Returns True if the curtain open from left to right."""
  181. return self._reverse