__init__.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. """Library to handle connection with Switchbot"""
  2. import time
  3. import binascii
  4. import logging
  5. import bluepy
  6. DEFAULT_RETRY_COUNT = 3
  7. DEFAULT_RETRY_TIMEOUT = 0.2
  8. UUID = "cba20d00-224d-11e6-9fb8-0002a5d5c51b"
  9. HANDLE = "cba20002-224d-11e6-9fb8-0002a5d5c51b"
  10. KEY_PASSWORD_PREFIX = "5711"
  11. PRESS_KEY = "570100"
  12. ON_KEY = "570101"
  13. OFF_KEY = "570102"
  14. OPEN_KEY = "570f450105ff00" # 570F4501010100
  15. CLOSE_KEY = "570f450105ff64" # 570F4501010164
  16. POSITION_KEY = "570F450105ff" # +actual_position ex: 570F450105ff32 for 50%
  17. STOP_KEY = "570F45010001"
  18. ON_KEY_SUFFIX = "01"
  19. OFF_KEY_SUFFIX = "02"
  20. PRESS_KEY_SUFFIX = "00"
  21. _LOGGER = logging.getLogger(__name__)
  22. class SwitchbotDevice:
  23. # pylint: disable=too-few-public-methods
  24. """Base Representation of a Switchbot Device."""
  25. def __init__(self, mac, retry_count=DEFAULT_RETRY_COUNT, password=None) -> None:
  26. self._mac = mac
  27. self._device = None
  28. self._retry_count = retry_count
  29. if password is None or password == "":
  30. self._password_encoded = None
  31. else:
  32. self._password_encoded = '%x' % (binascii.crc32(password.encode('ascii')) & 0xffffffff)
  33. def _connect(self) -> None:
  34. if self._device is not None:
  35. return
  36. try:
  37. _LOGGER.debug("Connecting to Switchbot...")
  38. self._device = bluepy.btle.Peripheral(self._mac,
  39. bluepy.btle.ADDR_TYPE_RANDOM)
  40. _LOGGER.debug("Connected to Switchbot.")
  41. except bluepy.btle.BTLEException:
  42. _LOGGER.debug("Failed connecting to Switchbot.", exc_info=True)
  43. self._device = None
  44. raise
  45. def _disconnect(self) -> None:
  46. if self._device is None:
  47. return
  48. _LOGGER.debug("Disconnecting")
  49. try:
  50. self._device.disconnect()
  51. except bluepy.btle.BTLEException:
  52. _LOGGER.warning("Error disconnecting from Switchbot.", exc_info=True)
  53. finally:
  54. self._device = None
  55. def _commandkey(self, key) -> str:
  56. if self._password_encoded is None:
  57. return key
  58. key_suffix = PRESS_KEY_SUFFIX
  59. if key == ON_KEY:
  60. key_suffix = ON_KEY_SUFFIX
  61. elif key == OFF_KEY:
  62. key_suffix = OFF_KEY_SUFFIX
  63. return KEY_PASSWORD_PREFIX + self._password_encoded + key_suffix
  64. def _writekey(self, key) -> bool:
  65. _LOGGER.debug("Prepare to send")
  66. hand_service = self._device.getServiceByUUID(UUID)
  67. hand = hand_service.getCharacteristics(HANDLE)[0]
  68. _LOGGER.debug("Sending command, %s", key)
  69. write_result = hand.write(binascii.a2b_hex(key), withResponse=True)
  70. if not write_result:
  71. _LOGGER.error("Sent command but didn't get a response from Switchbot confirming command was sent. "
  72. "Please check the Switchbot.")
  73. else:
  74. _LOGGER.info("Successfully sent command to Switchbot (MAC: %s).", self._mac)
  75. return write_result
  76. def _sendcommand(self, key, retry) -> bool:
  77. send_success = False
  78. command = self._commandkey(key)
  79. _LOGGER.debug("Sending command to switchbot %s", command)
  80. try:
  81. self._connect()
  82. send_success = self._writekey(command)
  83. except bluepy.btle.BTLEException:
  84. _LOGGER.warning("Error talking to Switchbot.", exc_info=True)
  85. finally:
  86. self._disconnect()
  87. if send_success:
  88. return True
  89. if retry < 1:
  90. _LOGGER.error("Switchbot communication failed. Stopping trying.", exc_info=True)
  91. return False
  92. _LOGGER.warning("Cannot connect to Switchbot. Retrying (remaining: %d)...", retry)
  93. time.sleep(DEFAULT_RETRY_TIMEOUT)
  94. return self._sendcommand(key, retry - 1)
  95. class Switchbot(SwitchbotDevice):
  96. """Representation of a Switchbot."""
  97. def turn_on(self) -> bool:
  98. """Turn device on."""
  99. return self._sendcommand(ON_KEY, self._retry_count)
  100. def turn_off(self) -> bool:
  101. """Turn device off."""
  102. return self._sendcommand(OFF_KEY, self._retry_count)
  103. def press(self) -> bool:
  104. """Press command to device."""
  105. return self._sendcommand(PRESS_KEY, self._retry_count)
  106. class SwitchbotCurtain(SwitchbotDevice):
  107. """Representation of a Switchbot Curtain."""
  108. def open(self) -> bool:
  109. """Send open command."""
  110. return self._sendcommand(OPEN_KEY, self._retry_count)
  111. def close(self) -> bool:
  112. """Send close command."""
  113. return self._sendcommand(CLOSE_KEY, self._retry_count)
  114. def stop(self) -> bool:
  115. """Send stop command to device."""
  116. return self._sendcommand(STOP_KEY, self._retry_count)
  117. def set_position(self, position: int) -> bool:
  118. """Send position command (0-100) to device."""
  119. hex_position = "%0.2X" % (100 - position) # curtain position in reverse mode
  120. return self._sendcommand(POSITION_KEY + hex_position, self._retry_count)