|
@@ -1,10 +1,14 @@
|
|
"""Library to handle connection with Switchbot"""
|
|
"""Library to handle connection with Switchbot"""
|
|
|
|
+import time
|
|
|
|
|
|
import binascii
|
|
import binascii
|
|
import logging
|
|
import logging
|
|
|
|
|
|
import bluepy
|
|
import bluepy
|
|
|
|
|
|
|
|
+DEFAULT_RETRY_COUNT = 3
|
|
|
|
+DEFAULT_RETRY_TIMEOUT = .2
|
|
|
|
+
|
|
UUID = "cba20d00-224d-11e6-9fb8-0002a5d5c51b"
|
|
UUID = "cba20d00-224d-11e6-9fb8-0002a5d5c51b"
|
|
HANDLE = "cba20002-224d-11e6-9fb8-0002a5d5c51b"
|
|
HANDLE = "cba20002-224d-11e6-9fb8-0002a5d5c51b"
|
|
|
|
|
|
@@ -18,53 +22,74 @@ _LOGGER = logging.getLogger(__name__)
|
|
class Switchbot:
|
|
class Switchbot:
|
|
"""Representation of a Switchbot."""
|
|
"""Representation of a Switchbot."""
|
|
|
|
|
|
- def __init__(self, mac) -> None:
|
|
+ def __init__(self, mac, retry_count=DEFAULT_RETRY_COUNT) -> None:
|
|
self._mac = mac
|
|
self._mac = mac
|
|
self._device = None
|
|
self._device = None
|
|
|
|
+ self._retry_count = retry_count
|
|
|
|
|
|
- def _connect(self) -> bool:
|
|
+ def _connect(self) -> None:
|
|
if self._device is not None:
|
|
if self._device is not None:
|
|
- _LOGGER.debug("Disconnecting")
|
|
+ return
|
|
- try:
|
|
|
|
- self._device.disconnect()
|
|
|
|
- except bluepy.btle.BTLEException:
|
|
|
|
- pass
|
|
|
|
try:
|
|
try:
|
|
- _LOGGER.debug("Connecting")
|
|
+ _LOGGER.debug("Connecting to Switchbot...")
|
|
self._device = bluepy.btle.Peripheral(self._mac,
|
|
self._device = bluepy.btle.Peripheral(self._mac,
|
|
bluepy.btle.ADDR_TYPE_RANDOM)
|
|
bluepy.btle.ADDR_TYPE_RANDOM)
|
|
|
|
+ _LOGGER.debug("Connected to Switchbot.")
|
|
except bluepy.btle.BTLEException:
|
|
except bluepy.btle.BTLEException:
|
|
- _LOGGER.warning("Failed to connect to Switchbot", exc_info=True)
|
|
+ _LOGGER.debug("Failed connecting to Switchbot.", exc_info=True)
|
|
- return False
|
|
+ self._device = None
|
|
- return True
|
|
+ raise
|
|
|
|
|
|
- def _sendpacket(self, key, retry=2) -> bool:
|
|
+ def _disconnect(self) -> None:
|
|
- if self._device is None and not self._connect():
|
|
+ if self._device is None:
|
|
- _LOGGER.error("Can not connect to switchbot.")
|
|
+ return
|
|
- return False
|
|
+ _LOGGER.debug("Disconnecting")
|
|
|
|
+ try:
|
|
|
|
+ self._device.disconnect()
|
|
|
|
+ except bluepy.btle.BTLEException:
|
|
|
|
+ _LOGGER.warning("Error disconnecting from Switchbot.", exc_info=True)
|
|
|
|
+ finally:
|
|
|
|
+ self._device = None
|
|
|
|
+
|
|
|
|
+ def _writekey(self, key) -> bool:
|
|
|
|
+ _LOGGER.debug("Prepare to send")
|
|
|
|
+ hand_service = self._device.getServiceByUUID(UUID)
|
|
|
|
+ hand = hand_service.getCharacteristics(HANDLE)[0]
|
|
|
|
+ _LOGGER.debug("Sending command, %s", key)
|
|
|
|
+ write_result = hand.write(binascii.a2b_hex(key), withResponse=True)
|
|
|
|
+ if not write_result:
|
|
|
|
+ _LOGGER.error("Sent command but didn't get a response from Switchbot confirming command was sent. "
|
|
|
|
+ "Please check the Switchbot.")
|
|
|
|
+ else:
|
|
|
|
+ _LOGGER.info("Successfully sent command to Switchbot (MAC: %s).", self._mac)
|
|
|
|
+ return write_result
|
|
|
|
|
|
|
|
+ def _sendcommand(self, key, retry) -> bool:
|
|
|
|
+ send_success = False
|
|
try:
|
|
try:
|
|
- _LOGGER.debug("Prepare to send")
|
|
+ self._connect()
|
|
- hand_service = self._device.getServiceByUUID(UUID)
|
|
+ send_success = self._writekey(key)
|
|
- hand = hand_service.getCharacteristics(HANDLE)[0]
|
|
|
|
- _LOGGER.debug("Sending command, %s", key)
|
|
|
|
- hand.write(binascii.a2b_hex(key))
|
|
|
|
except bluepy.btle.BTLEException:
|
|
except bluepy.btle.BTLEException:
|
|
- if retry < 1 or not self._connect():
|
|
+ _LOGGER.warning("Error talking to Switchbot.", exc_info=True)
|
|
- _LOGGER.error("Can not connect to switchbot.", exc_info=True)
|
|
+ finally:
|
|
- return False
|
|
+ self._disconnect()
|
|
- _LOGGER.warning("Can not connect to switchbot. Retrying")
|
|
+ if send_success:
|
|
- return self._sendpacket(key, retry-1)
|
|
+ return send_success
|
|
- return True
|
|
+ if retry < 1:
|
|
-
|
|
+ _LOGGER.error("Switchbot communication failed. Stopping trying.", exc_info=True)
|
|
- def turn_on(self) -> None:
|
|
+ return False
|
|
|
|
+ _LOGGER.warning("Cannot connect to Switchbot. Retrying (remaining: %d)...", retry)
|
|
|
|
+ time.sleep(DEFAULT_RETRY_TIMEOUT)
|
|
|
|
+ return self._sendcommand(key, retry - 1)
|
|
|
|
+
|
|
|
|
+ def turn_on(self) -> bool:
|
|
"""Turn device on."""
|
|
"""Turn device on."""
|
|
- return self._sendpacket(ON_KEY)
|
|
+ return self._sendcommand(ON_KEY, self._retry_count)
|
|
|
|
|
|
- def turn_off(self) -> None:
|
|
+ def turn_off(self) -> bool:
|
|
"""Turn device off."""
|
|
"""Turn device off."""
|
|
- return self._sendpacket(OFF_KEY)
|
|
+ return self._sendcommand(OFF_KEY, self._retry_count)
|
|
|
|
|
|
- def press(self) -> None:
|
|
+ def press(self) -> bool:
|
|
"""Press command to device."""
|
|
"""Press command to device."""
|
|
- return self._sendpacket(PRESS_KEY)
|
|
+ return self._sendcommand(PRESS_KEY, self._retry_count)
|