1
0

# __init__.py (4.1 KB)
"""Library to handle connection with Switchbot."""
import binascii
import logging
import time

import bluepy
from func_timeout import func_timeout, FunctionTimedOut
  7. DEFAULT_RETRY_COUNT = 3
  8. DEFAULT_RETRY_TIMEOUT = .2
  9. UUID = "cba20d00-224d-11e6-9fb8-0002a5d5c51b"
  10. HANDLE = "cba20002-224d-11e6-9fb8-0002a5d5c51b"
  11. KEY_PASSWORD_PREFIX = "5711"
  12. PRESS_KEY = "570100"
  13. ON_KEY = "570101"
  14. OFF_KEY = "570102"
  15. ON_KEY_SUFFIX = "01"
  16. OFF_KEY_SUFFIX = "02"
  17. PRESS_KEY_SUFFIX = "00"
  18. _LOGGER = logging.getLogger(__name__)
  19. class Switchbot:
  20. """Representation of a Switchbot."""
  21. def __init__(self, mac, retry_count=DEFAULT_RETRY_COUNT, password=None) -> None:
  22. self._mac = mac
  23. self._device = None
  24. self._retry_count = retry_count
  25. if password is None or password == "":
  26. self._password_encoded = None
  27. else:
  28. self._password_encoded = '%x' % (binascii.crc32(password.encode('ascii')) & 0xffffffff)
  29. def _connect(self) -> None:
  30. if self._device is not None:
  31. return
  32. try:
  33. _LOGGER.debug("Connecting to Switchbot...")
  34. self._device = func_timeout(15, bluepy.btle.Peripheral,
  35. args=(self._mac, bluepy.btle.ADDR_TYPE_RANDOM))
  36. _LOGGER.debug("Connected to Switchbot.")
  37. except FunctionTimedOut:
  38. _LOGGER.error("Failed connecting to Switchbot within 5 seconds and was terminated.")
  39. except bluepy.btle.BTLEException:
  40. _LOGGER.debug("Failed connecting to Switchbot.", exc_info=True)
  41. self._device = None
  42. raise
  43. def _disconnect(self) -> None:
  44. if self._device is None:
  45. return
  46. _LOGGER.debug("Disconnecting")
  47. try:
  48. self._device.disconnect()
  49. except bluepy.btle.BTLEException:
  50. _LOGGER.warning("Error disconnecting from Switchbot.", exc_info=True)
  51. finally:
  52. self._device = None
  53. def _commandkey(self, key) -> str:
  54. if self._password_encoded is None:
  55. return key
  56. key_suffix = PRESS_KEY_SUFFIX
  57. if key == ON_KEY:
  58. key_suffix = ON_KEY_SUFFIX
  59. elif key == OFF_KEY:
  60. key_suffix = OFF_KEY_SUFFIX
  61. return KEY_PASSWORD_PREFIX + self._password_encoded + key_suffix
  62. def _writekey(self, key) -> bool:
  63. _LOGGER.debug("Prepare to send")
  64. hand_service = self._device.getServiceByUUID(UUID)
  65. hand = hand_service.getCharacteristics(HANDLE)[0]
  66. _LOGGER.debug("Sending command, %s", key)
  67. write_result = hand.write(binascii.a2b_hex(key), withResponse=True)
  68. if not write_result:
  69. _LOGGER.error("Sent command but didn't get a response from Switchbot confirming command was sent. "
  70. "Please check the Switchbot.")
  71. else:
  72. _LOGGER.info("Successfully sent command to Switchbot (MAC: %s).", self._mac)
  73. return write_result
  74. def _sendcommand(self, key, retry) -> bool:
  75. send_success = False
  76. command = self._commandkey(key)
  77. _LOGGER.debug("Sending command to switchbot %s", command)
  78. try:
  79. self._connect()
  80. send_success = self._writekey(command)
  81. except (bluepy.btle.BTLEException, FunctionTimedOut):
  82. _LOGGER.warning("Error talking to Switchbot.", exc_info=True)
  83. finally:
  84. self._disconnect()
  85. if send_success:
  86. return True
  87. if retry < 1:
  88. _LOGGER.error("Switchbot communication failed. Stopping trying.", exc_info=True)
  89. return False
  90. _LOGGER.warning("Cannot connect to Switchbot. Retrying (remaining: %d)...", retry)
  91. time.sleep(DEFAULT_RETRY_TIMEOUT)
  92. return self._sendcommand(key, retry - 1)
  93. def turn_on(self) -> bool:
  94. """Turn device on."""
  95. return self._sendcommand(ON_KEY, self._retry_count)
  96. def turn_off(self) -> bool:
  97. """Turn device off."""
  98. return self._sendcommand(OFF_KEY, self._retry_count)
  99. def press(self) -> bool:
  100. """Press command to device."""
  101. return self._sendcommand(PRESS_KEY, self._retry_count)