Browse Source

Added bot states and battery, refactored update methods. (#22)

* Added bot states and battery, refactored update methods.

* Fixes

* Add method to return battery.

* Implemented suggestions

* Fixed logic mistake on bot  is_on status.
RenierM26 2 years ago
parent
commit
2fca8be5ee
3 changed files with 121 additions and 39 deletions
  1. 3 0
      .gitignore
  2. 1 1
      setup.py
  3. 117 38
      switchbot/__init__.py

+ 3 - 0
.gitignore

@@ -102,3 +102,6 @@ venv.bak/
 
 # mypy
 .mypy_cache/
+switchbot/.vs/slnx.sqlite
+switchbot/.vs/switchbot/v16/.suo
+switchbot/.vs/VSWorkspaceState.json

+ 1 - 1
setup.py

@@ -4,7 +4,7 @@ setup(
     name = 'PySwitchbot',
     packages = ['switchbot'],
     install_requires=['bluepy'],
-    version = '0.10.0',
+    version = '0.10.1',
     description = 'A library to communicate with Switchbot',
     author='Daniel Hjelseth Hoyer',
     url='https://github.com/Danielhiversen/pySwitchbot/',

+ 117 - 38
switchbot/__init__.py

@@ -1,8 +1,7 @@
 """Library to handle connection with Switchbot"""
-import time
-
 import binascii
 import logging
+import time
 
 import bluepy
 
@@ -33,29 +32,34 @@ _LOGGER = logging.getLogger(__name__)
 
 class SwitchbotDevice:
     # pylint: disable=too-few-public-methods
+    # pylint: disable=too-many-instance-attributes
     """Base Representation of a Switchbot Device."""
 
     def __init__(self, mac, password=None, interface=None, **kwargs) -> None:
         self._interface = interface
         self._mac = mac
         self._device = None
+        self._battery_percent = 0
         self._retry_count = kwargs.pop("retry_count", DEFAULT_RETRY_COUNT)
-        self._time_between_update_command = kwargs.pop("time_between_update_command",
-                                                       DEFAULT_TIME_BETWEEN_UPDATE_COMMAND)
+        self._time_between_update_command = kwargs.pop(
+            "time_between_update_command", DEFAULT_TIME_BETWEEN_UPDATE_COMMAND
+        )
         self._last_time_command_send = time.time()
         if password is None or password == "":
             self._password_encoded = None
         else:
-            self._password_encoded = '%x' % (binascii.crc32(password.encode('ascii')) & 0xffffffff)
+            self._password_encoded = "%x" % (
+                binascii.crc32(password.encode("ascii")) & 0xFFFFFFFF
+            )
 
     def _connect(self) -> None:
         if self._device is not None:
             return
         try:
             _LOGGER.debug("Connecting to Switchbot...")
-            self._device = bluepy.btle.Peripheral(self._mac,
-                                                  bluepy.btle.ADDR_TYPE_RANDOM,
-                                                  self._interface)
+            self._device = bluepy.btle.Peripheral(
+                self._mac, bluepy.btle.ADDR_TYPE_RANDOM, self._interface
+            )
             _LOGGER.debug("Connected to Switchbot.")
         except bluepy.btle.BTLEException:
             _LOGGER.debug("Failed connecting to Switchbot.", exc_info=True)
@@ -91,8 +95,10 @@ class SwitchbotDevice:
         write_result = hand.write(binascii.a2b_hex(key), withResponse=True)
         self._last_time_command_send = time.time()
         if not write_result:
-            _LOGGER.error("Sent command but didn't get a response from Switchbot confirming command was sent. "
-                          "Please check the Switchbot.")
+            _LOGGER.error(
+                "Sent command but didn't get a response from Switchbot confirming command was sent."
+                " Please check the Switchbot."
+            )
         else:
             _LOGGER.info("Successfully sent command to Switchbot (MAC: %s).", self._mac)
         return write_result
@@ -111,12 +117,54 @@ class SwitchbotDevice:
         if send_success:
             return True
         if retry < 1:
-            _LOGGER.error("Switchbot communication failed. Stopping trying.", exc_info=True)
+            _LOGGER.error(
+                "Switchbot communication failed. Stopping trying.", exc_info=True
+            )
             return False
-        _LOGGER.warning("Cannot connect to Switchbot. Retrying (remaining: %d)...", retry)
+        _LOGGER.warning(
+            "Cannot connect to Switchbot. Retrying (remaining: %d)...", retry
+        )
         time.sleep(DEFAULT_RETRY_TIMEOUT)
         return self._sendcommand(key, retry - 1)
 
+    def get_servicedata(self, retry=DEFAULT_RETRY_COUNT, scan_timeout=5) -> bytearray:
+        """Get BTLE 16b Service Data,
+        returns after the given timeout period in seconds."""
+        devices = None
+
+        waiting_time = self._time_between_update_command - time.time()
+        if waiting_time > 0:
+            time.sleep(waiting_time)
+        try:
+            devices = bluepy.btle.Scanner().scan(scan_timeout)
+
+        except bluepy.btle.BTLEManagementError:
+            _LOGGER.warning("Error updating Switchbot.", exc_info=True)
+
+        if devices is None:
+            if retry < 1:
+                _LOGGER.error(
+                    "Switchbot update failed. Stopping trying.", exc_info=True
+                )
+                return None
+
+            _LOGGER.warning(
+                "Cannot update Switchbot. Retrying (remaining: %d)...", retry
+            )
+            time.sleep(DEFAULT_RETRY_TIMEOUT)
+            return self.get_servicedata(retry - 1, scan_timeout)
+
+        for device in devices:
+            if self._mac.lower() == device.addr.lower():
+                for (adtype, _, value) in device.getScanData():
+                    if adtype == 22:
+                        service_data = value[4:].encode()
+                        service_data = binascii.unhexlify(service_data)
+
+                        return service_data
+
+        return None
+
     def get_mac(self) -> str:
         """Returns the mac address of the device."""
         return self._mac
@@ -125,10 +173,40 @@ class SwitchbotDevice:
         """Returns the first time an update can be executed."""
         return self._last_time_command_send + self._time_between_update_command
 
+    def get_battery_percent(self) -> int:
+        """Returns device battery level in percent."""
+        return self._battery_percent
+
 
 class Switchbot(SwitchbotDevice):
     """Representation of a Switchbot."""
 
+    def __init__(self, *args, **kwargs) -> None:
+        self._is_on = None
+        self._mode = None
+        super().__init__(*args, **kwargs)
+
+    def update(self, scan_timeout=5) -> None:
+        """Updates the mode, battery percent and state of the device."""
+        barray = self.get_servicedata(scan_timeout=scan_timeout)
+
+        if barray is None:
+            return
+
+        _mode = barray[1] & 0b10000000  # 128 switch or 0 toggle
+        if _mode != 0:
+            self._mode = "switch"
+        else:
+            self._mode = "toggle"
+
+        _is_on = barray[1] & 0b01000000  # 64 on or 0 for off
+        if _is_on == 0 and self._mode == "switch":
+            self._is_on = True
+        else:
+            self._is_on = False
+
+        self._battery_percent = barray[2] & 0b01111111
+
     def turn_on(self) -> bool:
         """Turn device on."""
         return self._sendcommand(ON_KEY, self._retry_count)
@@ -141,6 +219,16 @@ class Switchbot(SwitchbotDevice):
         """Press command to device."""
         return self._sendcommand(PRESS_KEY, self._retry_count)
 
+    def switch_mode(self) -> str:
+        """Return Toggle or Switch from cache.
+        Run update first."""
+        return self._mode
+
+    def is_on(self) -> bool:
+        """Return switch state from cache.
+        Run update first."""
+        return self._is_on
+
 
 class SwitchbotCurtain(SwitchbotDevice):
     """Representation of a Switchbot Curtain."""
@@ -150,23 +238,24 @@ class SwitchbotCurtain(SwitchbotDevice):
         The position of the curtain is saved in self._pos with 0 = open and 100 = closed.
         This is independent of the calibration of the curtain bot (Open left to right/
         Open right to left/Open from the middle).
-        The parameter 'reverse_mode' reverse these values, if 'reverse_mode' = True, position = 0 equals close
+        The parameter 'reverse_mode' reverse these values,
+        if 'reverse_mode' = True, position = 0 equals close
         and position = 100 equals open. The parameter is default set to True so that
         the definition of position is the same as in Home Assistant."""
-        self._reverse = kwargs.pop('reverse_mode', True)
+        self._reverse = kwargs.pop("reverse_mode", True)
         self._pos = 0
         self._light_level = 0
-        self._battery_percent = 0
+        self._is_calibrated = 0
         super().__init__(*args, **kwargs)
 
     def open(self) -> bool:
         """Send open command."""
-        self._pos = (100 if self._reverse else 0)
+        self._pos = 100 if self._reverse else 0
         return self._sendcommand(OPEN_KEY, self._retry_count)
 
     def close(self) -> bool:
         """Send close command."""
-        self._pos = (0 if self._reverse else 100)
+        self._pos = 0 if self._reverse else 100
         return self._sendcommand(CLOSE_KEY, self._retry_count)
 
     def stop(self) -> bool:
@@ -175,39 +264,29 @@ class SwitchbotCurtain(SwitchbotDevice):
 
     def set_position(self, position: int) -> bool:
         """Send position command (0-100) to device."""
-        position = ((100 - position) if self._reverse else position)
+        position = (100 - position) if self._reverse else position
         self._pos = position
         hex_position = "%0.2X" % position
         return self._sendcommand(POSITION_KEY + hex_position, self._retry_count)
 
     def update(self, scan_timeout=5) -> None:
-        """Updates the current position, battery percent and light level of the device.
-        Returns after the given timeout period in seconds."""
-        waiting_time = self.get_min_time_update() - time.time()
-        if waiting_time > 0:
-            time.sleep(waiting_time)
-        devices = bluepy.btle.Scanner().scan(scan_timeout)
+        """Updates the current position, battery percent and light level of the device."""
+        barray = self.get_servicedata(scan_timeout=scan_timeout)
 
-        for device in devices:
-            if self.get_mac().lower() == device.addr.lower():
-                for (adtype, _, value) in device.getScanData():
-                    if adtype == 22:
-                        barray = bytearray(value, 'ascii')
-                        self._battery_percent = int(barray[-6:-4], 16)
-                        position = max(min(int(barray[-4:-2], 16), 100), 0)
-                        self._pos = ((100 - position) if self._reverse else position)
-                        self._light_level = int(barray[-2:], 16)
+        if barray is None:
+            return
+
+        self._is_calibrated = barray[1] & 0b01000000
+        self._battery_percent = barray[2] & 0b01111111
+        position = max(min(barray[3] & 0b01111111, 100), 0)
+        self._pos = (100 - position) if self._reverse else position
+        self._light_level = (barray[4] >> 4) & 0b00001111  # light sensor level (1-10)
 
     def get_position(self) -> int:
         """Returns the current cached position (0-100), the actual position could vary.
         To get the actual position call update() first."""
         return self._pos
 
-    def get_battery_percent(self) -> int:
-        """Returns the current cached battery percent (0-100), the actual battery percent could vary.
-        To get the actual battery percent call update() first."""
-        return self._battery_percent
-
     def get_light_level(self) -> int:
         """Returns the current cached light level, the actual light level could vary.
         To get the actual light level call update() first."""