__init__.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. """Library to handle connection with Switchbot"""
  2. import time
  3. import binascii
  4. import logging
  5. import bluepy
  6. DEFAULT_RETRY_COUNT = 3
  7. DEFAULT_RETRY_TIMEOUT = 0.2
  8. UUID = "cba20d00-224d-11e6-9fb8-0002a5d5c51b"
  9. HANDLE = "cba20002-224d-11e6-9fb8-0002a5d5c51b"
  10. KEY_PASSWORD_PREFIX = "5711"
  11. PRESS_KEY = "570100"
  12. ON_KEY = "570101"
  13. OFF_KEY = "570102"
  14. OPEN_KEY = "570f450105ff00" # 570F4501010100
  15. CLOSE_KEY = "570f450105ff64" # 570F4501010164
  16. POSITION_KEY = "570F450105ff" # +actual_position ex: 570F450105ff32 for 50%
  17. STOP_KEY = "570F450100ff"
  18. ON_KEY_SUFFIX = "01"
  19. OFF_KEY_SUFFIX = "02"
  20. PRESS_KEY_SUFFIX = "00"
  21. _LOGGER = logging.getLogger(__name__)
  22. class SwitchbotDevice:
  23. # pylint: disable=too-few-public-methods
  24. """Base Representation of a Switchbot Device."""
  25. def __init__(self, mac, retry_count=DEFAULT_RETRY_COUNT, password=None, interface=None) -> None:
  26. self._interface = interface
  27. self._mac = mac
  28. self._device = None
  29. self._retry_count = retry_count
  30. if password is None or password == "":
  31. self._password_encoded = None
  32. else:
  33. self._password_encoded = '%x' % (binascii.crc32(password.encode('ascii')) & 0xffffffff)
  34. def _connect(self) -> None:
  35. if self._device is not None:
  36. return
  37. try:
  38. _LOGGER.debug("Connecting to Switchbot...")
  39. self._device = bluepy.btle.Peripheral(self._mac,
  40. bluepy.btle.ADDR_TYPE_RANDOM,
  41. self._interface)
  42. _LOGGER.debug("Connected to Switchbot.")
  43. except bluepy.btle.BTLEException:
  44. _LOGGER.debug("Failed connecting to Switchbot.", exc_info=True)
  45. self._device = None
  46. raise
  47. def _disconnect(self) -> None:
  48. if self._device is None:
  49. return
  50. _LOGGER.debug("Disconnecting")
  51. try:
  52. self._device.disconnect()
  53. except bluepy.btle.BTLEException:
  54. _LOGGER.warning("Error disconnecting from Switchbot.", exc_info=True)
  55. finally:
  56. self._device = None
  57. def _commandkey(self, key) -> str:
  58. if self._password_encoded is None:
  59. return key
  60. key_suffix = PRESS_KEY_SUFFIX
  61. if key == ON_KEY:
  62. key_suffix = ON_KEY_SUFFIX
  63. elif key == OFF_KEY:
  64. key_suffix = OFF_KEY_SUFFIX
  65. return KEY_PASSWORD_PREFIX + self._password_encoded + key_suffix
  66. def _writekey(self, key) -> bool:
  67. _LOGGER.debug("Prepare to send")
  68. hand_service = self._device.getServiceByUUID(UUID)
  69. hand = hand_service.getCharacteristics(HANDLE)[0]
  70. _LOGGER.debug("Sending command, %s", key)
  71. write_result = hand.write(binascii.a2b_hex(key), withResponse=True)
  72. if not write_result:
  73. _LOGGER.error("Sent command but didn't get a response from Switchbot confirming command was sent. "
  74. "Please check the Switchbot.")
  75. else:
  76. _LOGGER.info("Successfully sent command to Switchbot (MAC: %s).", self._mac)
  77. return write_result
  78. def _sendcommand(self, key, retry) -> bool:
  79. send_success = False
  80. command = self._commandkey(key)
  81. _LOGGER.debug("Sending command to switchbot %s", command)
  82. try:
  83. self._connect()
  84. send_success = self._writekey(command)
  85. except bluepy.btle.BTLEException:
  86. _LOGGER.warning("Error talking to Switchbot.", exc_info=True)
  87. finally:
  88. self._disconnect()
  89. if send_success:
  90. return True
  91. if retry < 1:
  92. _LOGGER.error("Switchbot communication failed. Stopping trying.", exc_info=True)
  93. return False
  94. _LOGGER.warning("Cannot connect to Switchbot. Retrying (remaining: %d)...", retry)
  95. time.sleep(DEFAULT_RETRY_TIMEOUT)
  96. return self._sendcommand(key, retry - 1)
  97. class Switchbot(SwitchbotDevice):
  98. """Representation of a Switchbot."""
  99. def turn_on(self) -> bool:
  100. """Turn device on."""
  101. return self._sendcommand(ON_KEY, self._retry_count)
  102. def turn_off(self) -> bool:
  103. """Turn device off."""
  104. return self._sendcommand(OFF_KEY, self._retry_count)
  105. def press(self) -> bool:
  106. """Press command to device."""
  107. return self._sendcommand(PRESS_KEY, self._retry_count)
  108. class SwitchbotCurtain(SwitchbotDevice):
  109. """Representation of a Switchbot Curtain."""
  110. def open(self) -> bool:
  111. """Send open command."""
  112. return self._sendcommand(OPEN_KEY, self._retry_count)
  113. def close(self) -> bool:
  114. """Send close command."""
  115. return self._sendcommand(CLOSE_KEY, self._retry_count)
  116. def stop(self) -> bool:
  117. """Send stop command to device."""
  118. return self._sendcommand(STOP_KEY, self._retry_count)
  119. def set_position(self, position: int) -> bool:
  120. """Send position command (0-100) to device."""
  121. hex_position = "%0.2X" % (100 - position) # curtain position in reverse mode
  122. return self._sendcommand(POSITION_KEY + hex_position, self._retry_count)