Browse Source

Add Class to discover all switchbots, all curtain bots or all bot (woHand) (#23)

* New class to fetch all switchbot device data for all devices in range.

* Method to return all switchbot info

* Update __init__.py

* Update __init__.py

* Refactor code into methods

* Update __init__.py

* Update __init__.py

* Update __init__.py

* Add invert for switch, cleanup variable names

* Add method for spesific mac to Switchbots class, inverse isOn check, add get is calibrated to main switchbot class.

* Add temperature sensor to discovery. (I have no way to test this)

* Update __init__.py

* Update setup.py

* Remove waiting time. This can be handled by hassio

* Restructured switchbot base class for device discovery and single poll for all device updates.

* Update __init__.py

* Update __init__.py

* Update __init__.py

* Update __init__.py

* Prevent additional runs if update fails.

* Update __init__.py

* Update __init__.py

* Update __init__.py

* Update __init__.py

* Update __init__.py

* Update setup.py

Co-authored-by: Daniel Hjelseth Høyer <mail@dahoiv.net>

* Update switchbot/__init__.py

Co-authored-by: Daniel Hjelseth Høyer <mail@dahoiv.net>

* New class for device scanning.

* Changed scan timeout and retry count to use same variable format as rest of class.

* Handle _connect, _writekey and _disconnect via CONNECT_LOCK.

* SwitchbotDevice only returns data if update is called, btle interface can be specified on scanning methods.

-Add basic error handling to prevent key errors on discovery exceptions.

* Small key reference fix in GetSwitchbotDevices class.

* Cleanup pointless error handling code.

Co-authored-by: Daniel Hjelseth Høyer <mail@dahoiv.net>
RenierM26 2 years ago
parent
commit
590b58a43d
2 changed files with 305 additions and 123 deletions
  1. 1 1
      setup.py
  2. 304 122
      switchbot/__init__.py

+ 1 - 1
setup.py

@@ -4,7 +4,7 @@ setup(
     name = 'PySwitchbot',
     packages = ['switchbot'],
     install_requires=['bluepy'],
-    version = '0.10.1',
+    version = '0.11.0',
     description = 'A library to communicate with Switchbot',
     author='Daniel Hjelseth Hoyer',
     url='https://github.com/Danielhiversen/pySwitchbot/',

+ 304 - 122
switchbot/__init__.py

@@ -1,13 +1,16 @@
-"""Library to handle connection with Switchbot"""
+"""Library to handle connection with Switchbot."""
+from __future__ import annotations
+
 import binascii
 import logging
+import threading
 import time
 
 import bluepy
 
 DEFAULT_RETRY_COUNT = 3
-DEFAULT_RETRY_TIMEOUT = 0.2
-DEFAULT_TIME_BETWEEN_UPDATE_COMMAND = 10
+DEFAULT_RETRY_TIMEOUT = 1
+DEFAULT_SCAN_TIMEOUT = 5
 
 UUID = "cba20d00-224d-11e6-9fb8-0002a5d5c51b"
 HANDLE = "cba20002-224d-11e6-9fb8-0002a5d5c51b"
@@ -28,23 +31,191 @@ OFF_KEY_SUFFIX = "02"
 PRESS_KEY_SUFFIX = "00"
 
 _LOGGER = logging.getLogger(__name__)
+CONNECT_LOCK = threading.Lock()
+
+
+def _process_wohand(data) -> dict:
+    """Process woHand/Bot services data."""
+    _bot_data = {}
+
+    _sensor_data = binascii.unhexlify(data.encode())
+
+    # 128 switch or 0 press.
+    _bot_data["switchMode"] = bool(_sensor_data[1] & 0b10000000)
+
+    # 64 off or 0 for on, if not inversed in app.
+    if _bot_data["switchMode"]:
+        _bot_data["isOn"] = not bool(_sensor_data[1] & 0b01000000)
+
+    else:
+        _bot_data["isOn"] = False
+
+    _bot_data["battery"] = _sensor_data[2] & 0b01111111
+
+    return _bot_data
+
+
+def _process_wocurtain(data, reverse=True) -> dict:
+    """Process woCurtain/Curtain services data."""
+    _curtain_data = {}
+
+    _sensor_data = binascii.unhexlify(data.encode())
+
+    _curtain_data["calibration"] = bool(_sensor_data[1] & 0b01000000)
+    _curtain_data["battery"] = _sensor_data[2] & 0b01111111
+    _position = max(min(_sensor_data[3] & 0b01111111, 100), 0)
+    _curtain_data["position"] = (100 - _position) if reverse else _position
+
+    # light sensor level (1-10)
+    _curtain_data["lightLevel"] = (_sensor_data[4] >> 4) & 0b00001111
+
+    return _curtain_data
+
+
+def _process_wosensorth(data) -> dict:
+    """Process woSensorTH/Temp sensor services data."""
+    _wosensorth_data = {}
+
+    _sensor_data = binascii.unhexlify(data.encode())
+
+    _temp_sign = _sensor_data[4] & 0b10000000
+    _temp_c = _temp_sign * ((_sensor_data[4] & 0b01111111) + (_sensor_data[3] / 10))
+    _temp_f = (_temp_c * 9 / 5) + 32
+    _temp_f = (_temp_f * 10) / 10
+
+    _wosensorth_data["temp"]["c"] = _temp_c
+    _wosensorth_data["temp"]["f"] = _temp_f
+
+    _wosensorth_data["fahrenheit"] = bool(_sensor_data[5] & 0b10000000)
+    _wosensorth_data["humidity"] = _sensor_data[5] & 0b01111111
+    _wosensorth_data["battery"] = _sensor_data[2] & 0b01111111
+
+    return _wosensorth_data
+
+
+class GetSwitchbotDevices:
+    """Scan for all Switchbot devices and return by type."""
+
+    def __init__(self, interface=None) -> None:
+        """Get switchbot devices class constructor."""
+        self._interface = interface
+        self._all_services_data = {}
+
+    def discover(
+        self, retry=DEFAULT_RETRY_COUNT, scan_timeout=DEFAULT_SCAN_TIMEOUT
+    ) -> dict | None:
+        """Find switchbot devices and their advertisement data."""
+
+        devices = None
+
+        try:
+            devices = bluepy.btle.Scanner(self._interface).scan(scan_timeout)
+
+        except bluepy.btle.BTLEManagementError:
+            _LOGGER.error("Error scanning for switchbot devices", exc_info=True)
+
+        if devices is None:
+            if retry < 1:
+                _LOGGER.error(
+                    "Scanning for Switchbot devices failed. Stop trying", exc_info=True
+                )
+                return None
+
+            _LOGGER.warning(
+                "Error scanning for Switchbot devices. Retrying (remaining: %d)",
+                retry,
+            )
+            time.sleep(DEFAULT_RETRY_TIMEOUT)
+            return self.discover(retry - 1, scan_timeout)
+
+        for dev in devices:
+            if dev.getValueText(7) == UUID:
+                dev_id = dev.addr.replace(":", "")
+                self._all_services_data[dev_id] = {}
+                self._all_services_data[dev_id]["mac_address"] = dev.addr
+                for (adtype, desc, value) in dev.getScanData():
+                    if adtype == 22:
+                        _model = binascii.unhexlify(value[4:6]).decode()
+                        if _model == "H":
+                            self._all_services_data[dev_id]["data"] = _process_wohand(
+                                value[4:]
+                            )
+                            self._all_services_data[dev_id]["data"]["rssi"] = dev.rssi
+                            self._all_services_data[dev_id]["model"] = _model
+                            self._all_services_data[dev_id]["modelName"] = "WoHand"
+                        elif _model == "c":
+                            self._all_services_data[dev_id][
+                                "data"
+                            ] = _process_wocurtain(value[4:])
+                            self._all_services_data[dev_id]["data"]["rssi"] = dev.rssi
+                            self._all_services_data[dev_id]["model"] = _model
+                            self._all_services_data[dev_id]["modelName"] = "WoCurtain"
+                        elif _model == "T":
+                            self._all_services_data[dev_id][
+                                "data"
+                            ] = _process_wosensorth(value[4:])
+                            self._all_services_data[dev_id]["data"]["rssi"] = dev.rssi
+                            self._all_services_data[dev_id]["model"] = _model
+                            self._all_services_data[dev_id]["modelName"] = "WoSensorTH"
+
+                        else:
+                            continue
+                    else:
+                        self._all_services_data[dev_id][desc] = value
+
+        return self._all_services_data
+
+    def get_curtains(self) -> dict | None:
+        """Return all WoCurtain/Curtains devices with services data."""
+        if not self._all_services_data:
+            self.discover()
+
+        _curtain_devices = {}
+
+        for item in self._all_services_data:
+            if self._all_services_data[item]["model"] == "c":
+                _curtain_devices[item] = self._all_services_data[item]
+
+        return _curtain_devices
+
+    def get_bots(self) -> dict | None:
+        """Return all WoHand/Bot devices with services data."""
+        if not self._all_services_data:
+            self.discover()
+
+        _bot_devices = {}
+
+        for item in self._all_services_data:
+            if self._all_services_data[item]["model"] == "H":
+                _bot_devices[item] = self._all_services_data[item]
+
+        return _bot_devices
+
+    def get_device_data(self, mac) -> dict | None:
+        """Return data for specific device."""
+        if not self._all_services_data:
+            self.discover()
+
+        _switchbot_data = {}
+
+        for item in self._all_services_data:
+            if self._all_services_data[item]["mac_address"] == mac:
+                _switchbot_data = self._all_services_data[item]
+
+        return _switchbot_data
 
 
 class SwitchbotDevice:
-    # pylint: disable=too-few-public-methods
-    # pylint: disable=too-many-instance-attributes
     """Base Representation of a Switchbot Device."""
 
     def __init__(self, mac, password=None, interface=None, **kwargs) -> None:
+        """Switchbot base class constructor."""
         self._interface = interface
         self._mac = mac
         self._device = None
-        self._battery_percent = 0
+        self._switchbot_device_data = {}
+        self._scan_timeout = kwargs.pop("scan_timeout", DEFAULT_SCAN_TIMEOUT)
         self._retry_count = kwargs.pop("retry_count", DEFAULT_RETRY_COUNT)
-        self._time_between_update_command = kwargs.pop(
-            "time_between_update_command", DEFAULT_TIME_BETWEEN_UPDATE_COMMAND
-        )
-        self._last_time_command_send = time.time()
         if password is None or password == "":
             self._password_encoded = None
         else:
@@ -56,13 +227,13 @@ class SwitchbotDevice:
         if self._device is not None:
             return
         try:
-            _LOGGER.debug("Connecting to Switchbot...")
+            _LOGGER.debug("Connecting to Switchbot")
             self._device = bluepy.btle.Peripheral(
                 self._mac, bluepy.btle.ADDR_TYPE_RANDOM, self._interface
             )
-            _LOGGER.debug("Connected to Switchbot.")
+            _LOGGER.debug("Connected to Switchbot")
         except bluepy.btle.BTLEException:
-            _LOGGER.debug("Failed connecting to Switchbot.", exc_info=True)
+            _LOGGER.debug("Failed connecting to Switchbot", exc_info=True)
             self._device = None
             raise
 
@@ -73,7 +244,7 @@ class SwitchbotDevice:
         try:
             self._device.disconnect()
         except bluepy.btle.BTLEException:
-            _LOGGER.warning("Error disconnecting from Switchbot.", exc_info=True)
+            _LOGGER.warning("Error disconnecting from Switchbot", exc_info=True)
         finally:
             self._device = None
 
@@ -93,119 +264,124 @@ class SwitchbotDevice:
         hand = hand_service.getCharacteristics(HANDLE)[0]
         _LOGGER.debug("Sending command, %s", key)
         write_result = hand.write(binascii.a2b_hex(key), withResponse=True)
-        self._last_time_command_send = time.time()
         if not write_result:
             _LOGGER.error(
                 "Sent command but didn't get a response from Switchbot confirming command was sent."
-                " Please check the Switchbot."
+                " Please check the Switchbot"
             )
         else:
-            _LOGGER.info("Successfully sent command to Switchbot (MAC: %s).", self._mac)
+            _LOGGER.info("Successfully sent command to Switchbot (MAC: %s)", self._mac)
         return write_result
 
     def _sendcommand(self, key, retry) -> bool:
         send_success = False
         command = self._commandkey(key)
         _LOGGER.debug("Sending command to switchbot %s", command)
-        try:
-            self._connect()
-            send_success = self._writekey(command)
-        except bluepy.btle.BTLEException:
-            _LOGGER.warning("Error talking to Switchbot.", exc_info=True)
-        finally:
-            self._disconnect()
+        with CONNECT_LOCK:
+            try:
+                self._connect()
+                send_success = self._writekey(command)
+            except bluepy.btle.BTLEException:
+                _LOGGER.warning("Error talking to Switchbot", exc_info=True)
+            finally:
+                self._disconnect()
         if send_success:
             return True
         if retry < 1:
             _LOGGER.error(
-                "Switchbot communication failed. Stopping trying.", exc_info=True
+                "Switchbot communication failed. Stopping trying", exc_info=True
             )
             return False
-        _LOGGER.warning(
-            "Cannot connect to Switchbot. Retrying (remaining: %d)...", retry
-        )
+        _LOGGER.warning("Cannot connect to Switchbot. Retrying (remaining: %d)", retry)
         time.sleep(DEFAULT_RETRY_TIMEOUT)
         return self._sendcommand(key, retry - 1)
 
-    def get_servicedata(self, retry=DEFAULT_RETRY_COUNT, scan_timeout=5) -> bytearray:
-        """Get BTLE 16b Service Data,
-        returns after the given timeout period in seconds."""
+    def get_mac(self) -> str:
+        """Return mac address of device."""
+        return self._mac
+
+    def get_battery_percent(self) -> int:
+        """Return device battery level in percent."""
+        if not self._switchbot_device_data:
+            return None
+        return self._switchbot_device_data["data"]["battery"]
+
+    def get_device_data(self, retry=DEFAULT_RETRY_COUNT, interface=None) -> dict | None:
+        """Find switchbot devices and their advertisement data."""
+        if interface:
+            _interface = interface
+        else:
+            _interface = self._interface
+
         devices = None
 
-        waiting_time = self._time_between_update_command - time.time()
-        if waiting_time > 0:
-            time.sleep(waiting_time)
         try:
-            devices = bluepy.btle.Scanner().scan(scan_timeout)
+            devices = bluepy.btle.Scanner(_interface).scan(self._scan_timeout)
 
         except bluepy.btle.BTLEManagementError:
-            _LOGGER.warning("Error updating Switchbot.", exc_info=True)
+            _LOGGER.error("Error scanning for switchbot devices", exc_info=True)
 
         if devices is None:
             if retry < 1:
                 _LOGGER.error(
-                    "Switchbot update failed. Stopping trying.", exc_info=True
+                    "Scanning for Switchbot devices failed. Stop trying", exc_info=True
                 )
                 return None
 
             _LOGGER.warning(
-                "Cannot update Switchbot. Retrying (remaining: %d)...", retry
+                "Error scanning for Switchbot devices. Retrying (remaining: %d)",
+                retry,
             )
             time.sleep(DEFAULT_RETRY_TIMEOUT)
-            return self.get_servicedata(retry - 1, scan_timeout)
+            return self.get_device_data(retry=retry - 1, interface=_interface)
 
-        for device in devices:
-            if self._mac.lower() == device.addr.lower():
-                for (adtype, _, value) in device.getScanData():
+        for dev in devices:
+            if self._mac.lower() == dev.addr.lower():
+                self._switchbot_device_data["mac_address"] = dev.addr
+                for (adtype, desc, value) in dev.getScanData():
                     if adtype == 22:
-                        service_data = value[4:].encode()
-                        service_data = binascii.unhexlify(service_data)
-
-                        return service_data
-
-        return None
-
-    def get_mac(self) -> str:
-        """Returns the mac address of the device."""
-        return self._mac
-
-    def get_min_time_update(self):
-        """Returns the first time an update can be executed."""
-        return self._last_time_command_send + self._time_between_update_command
-
-    def get_battery_percent(self) -> int:
-        """Returns device battery level in percent."""
-        return self._battery_percent
+                        _model = binascii.unhexlify(value[4:6]).decode()
+                        if _model == "H":
+                            self._switchbot_device_data["data"] = _process_wohand(
+                                value[4:]
+                            )
+                            self._switchbot_device_data["data"]["rssi"] = dev.rssi
+                            self._switchbot_device_data["model"] = _model
+                            self._switchbot_device_data["modelName"] = "WoHand"
+                        elif _model == "c":
+                            self._switchbot_device_data["data"] = _process_wocurtain(
+                                value[4:]
+                            )
+                            self._switchbot_device_data["data"]["rssi"] = dev.rssi
+                            self._switchbot_device_data["model"] = _model
+                            self._switchbot_device_data["modelName"] = "WoCurtain"
+                        elif _model == "T":
+                            self._switchbot_device_data["data"] = _process_wosensorth(
+                                value[4:]
+                            )
+                            self._switchbot_device_data["data"]["rssi"] = dev.rssi
+                            self._switchbot_device_data["model"] = _model
+                            self._switchbot_device_data["modelName"] = "WoSensorTH"
+
+                        else:
+                            continue
+                    else:
+                        self._switchbot_device_data[desc] = value
+
+        return self._switchbot_device_data
 
 
 class Switchbot(SwitchbotDevice):
     """Representation of a Switchbot."""
 
     def __init__(self, *args, **kwargs) -> None:
-        self._is_on = None
-        self._mode = None
+        """Switchbot Bot/WoHand constructor."""
         super().__init__(*args, **kwargs)
+        self._inverse = kwargs.pop("inverse_mode", False)
 
-    def update(self, scan_timeout=5) -> None:
-        """Updates the mode, battery percent and state of the device."""
-        barray = self.get_servicedata(scan_timeout=scan_timeout)
-
-        if barray is None:
-            return
-
-        _mode = barray[1] & 0b10000000  # 128 switch or 0 toggle
-        if _mode != 0:
-            self._mode = "switch"
-        else:
-            self._mode = "toggle"
-
-        _is_on = barray[1] & 0b01000000  # 64 on or 0 for off
-        if _is_on == 0 and self._mode == "switch":
-            self._is_on = True
-        else:
-            self._is_on = False
-
-        self._battery_percent = barray[2] & 0b01111111
+    def update(self, interface=None) -> None:
+        """Update mode, battery percent and state of device."""
+        self.get_device_data(retry=self._retry_count, interface=interface)
 
     def turn_on(self) -> bool:
         """Turn device on."""
@@ -220,42 +396,47 @@ class Switchbot(SwitchbotDevice):
         return self._sendcommand(PRESS_KEY, self._retry_count)
 
     def switch_mode(self) -> str:
-        """Return Toggle or Switch from cache.
-        Run update first."""
-        return self._mode
+        """Return true or false from cache."""
+        # To get actual position call update() first.
+        if not self._switchbot_device_data:
+            return None
+        return self._switchbot_device_data["data"]["switchMode"]
 
     def is_on(self) -> bool:
-        """Return switch state from cache.
-        Run update first."""
-        return self._is_on
+        """Return switch state from cache."""
+        # To get actual position call update() first.
+        if not self._switchbot_device_data:
+            return None
+
+        if self._inverse:
+            return not self._switchbot_device_data["data"]["isOn"]
+
+        return self._switchbot_device_data["data"]["isOn"]
 
 
 class SwitchbotCurtain(SwitchbotDevice):
     """Representation of a Switchbot Curtain."""
 
     def __init__(self, *args, **kwargs) -> None:
-        """Constructor for a Switchbot Curtain.
-        The position of the curtain is saved in self._pos with 0 = open and 100 = closed.
-        This is independent of the calibration of the curtain bot (Open left to right/
-        Open right to left/Open from the middle).
-        The parameter 'reverse_mode' reverse these values,
-        if 'reverse_mode' = True, position = 0 equals close
-        and position = 100 equals open. The parameter is default set to True so that
-        the definition of position is the same as in Home Assistant."""
-        self._reverse = kwargs.pop("reverse_mode", True)
-        self._pos = 0
-        self._light_level = 0
-        self._is_calibrated = 0
+        """Switchbot Curtain/WoCurtain constructor."""
+
+        # The position of the curtain is saved returned with 0 = open and 100 = closed.
+        # This is independent of the calibration of the curtain bot (Open left to right/
+        # Open right to left/Open from the middle).
+        # The parameter 'reverse_mode' reverse these values,
+        # if 'reverse_mode' = True, position = 0 equals close
+        # and position = 100 equals open. The parameter is default set to True so that
+        # the definition of position is the same as in Home Assistant.
+
         super().__init__(*args, **kwargs)
+        self._reverse = kwargs.pop("reverse_mode", True)
 
     def open(self) -> bool:
         """Send open command."""
-        self._pos = 100 if self._reverse else 0
         return self._sendcommand(OPEN_KEY, self._retry_count)
 
     def close(self) -> bool:
         """Send close command."""
-        self._pos = 0 if self._reverse else 100
         return self._sendcommand(CLOSE_KEY, self._retry_count)
 
     def stop(self) -> bool:
@@ -265,33 +446,34 @@ class SwitchbotCurtain(SwitchbotDevice):
     def set_position(self, position: int) -> bool:
         """Send position command (0-100) to device."""
         position = (100 - position) if self._reverse else position
-        self._pos = position
         hex_position = "%0.2X" % position
         return self._sendcommand(POSITION_KEY + hex_position, self._retry_count)
 
-    def update(self, scan_timeout=5) -> None:
-        """Updates the current position, battery percent and light level of the device."""
-        barray = self.get_servicedata(scan_timeout=scan_timeout)
-
-        if barray is None:
-            return
-
-        self._is_calibrated = barray[1] & 0b01000000
-        self._battery_percent = barray[2] & 0b01111111
-        position = max(min(barray[3] & 0b01111111, 100), 0)
-        self._pos = (100 - position) if self._reverse else position
-        self._light_level = (barray[4] >> 4) & 0b00001111  # light sensor level (1-10)
+    def update(self, interface=None) -> None:
+        """Update position, battery percent and light level of device."""
+        self.get_device_data(retry=self._retry_count, interface=interface)
 
     def get_position(self) -> int:
-        """Returns the current cached position (0-100), the actual position could vary.
-        To get the actual position call update() first."""
-        return self._pos
+        """Return cached position (0-100) of Curtain."""
+        # To get actual position call update() first.
+        if not self._switchbot_device_data:
+            return None
+        return self._switchbot_device_data["data"]["position"]
 
     def get_light_level(self) -> int:
-        """Returns the current cached light level, the actual light level could vary.
-        To get the actual light level call update() first."""
-        return self._light_level
+        """Return cached light level."""
+        # To get actual light level call update() first.
+        if not self._switchbot_device_data:
+            return None
+        return self._switchbot_device_data["data"]["lightLevel"]
 
     def is_reversed(self) -> bool:
-        """Returns True if the curtain open from left to right."""
+        """Return True if the curtain open from left to right."""
         return self._reverse
+
+    def is_calibrated(self) -> bool:
+        """Return True curtain is calibrated."""
+        # To get actual light level call update() first.
+        if not self._switchbot_device_data:
+            return None
+        return self._switchbot_device_data["data"]["calibration"]