|
@@ -3,7 +3,7 @@ from __future__ import annotations
|
|
|
|
|
|
import binascii
|
|
import binascii
|
|
import logging
|
|
import logging
|
|
-import threading
|
|
+from threading import Lock
|
|
import time
|
|
import time
|
|
from typing import Any
|
|
from typing import Any
|
|
|
|
|
|
@@ -13,239 +13,246 @@ DEFAULT_RETRY_COUNT = 3
|
|
DEFAULT_RETRY_TIMEOUT = 1
|
|
DEFAULT_RETRY_TIMEOUT = 1
|
|
DEFAULT_SCAN_TIMEOUT = 5
|
|
DEFAULT_SCAN_TIMEOUT = 5
|
|
|
|
|
|
-UUID = "cba20d00-224d-11e6-9fb8-0002a5d5c51b"
|
|
+
|
|
-HANDLE = "cba20002-224d-11e6-9fb8-0002a5d5c51b"
|
|
+DEVICE_GET_BASIC_SETTINGS_KEY = "5702"
|
|
-NOTIFICATION_HANDLE = "cba20003-224d-11e6-9fb8-0002a5d5c51b"
|
|
+DEVICE_SET_MODE_KEY = "5703"
|
|
-
|
|
+DEVICE_SET_EXTENDED_KEY = "570f"
|
|
-KEY_PASSWORD_PREFIX = "5711"
|
|
|
|
-KEY_PASSWORD_NOTIFY_PREFIX = "5712"
|
|
|
|
|
|
|
|
|
|
+
|
|
PRESS_KEY = "570100"
|
|
PRESS_KEY = "570100"
|
|
ON_KEY = "570101"
|
|
ON_KEY = "570101"
|
|
OFF_KEY = "570102"
|
|
OFF_KEY = "570102"
|
|
|
|
+DOWN_KEY = "570103"
|
|
|
|
+UP_KEY = "570104"
|
|
|
|
|
|
|
|
+
|
|
OPEN_KEY = "570f450105ff00"
|
|
OPEN_KEY = "570f450105ff00"
|
|
CLOSE_KEY = "570f450105ff64"
|
|
CLOSE_KEY = "570f450105ff64"
|
|
POSITION_KEY = "570F450105ff"
|
|
POSITION_KEY = "570F450105ff"
|
|
STOP_KEY = "570F450100ff"
|
|
STOP_KEY = "570F450100ff"
|
|
-DEVICE_BASIC_SETTINGS_KEY = "5702"
|
|
+CURTAIN_EXT_SUM_KEY = "570f460401"
|
|
|
|
+CURTAIN_EXT_ADV_KEY = "570f460402"
|
|
|
|
+CURTAIN_EXT_CHAIN_INFO_KEY = "570f468101"
|
|
|
|
|
|
-ON_KEY_SUFFIX = "01"
|
|
+
|
|
-OFF_KEY_SUFFIX = "02"
|
|
+KEY_PASSWORD_PREFIX = "571"
|
|
-PRESS_KEY_SUFFIX = "00"
|
|
|
|
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
_LOGGER = logging.getLogger(__name__)
|
|
-CONNECT_LOCK = threading.Lock()
|
|
+CONNECT_LOCK = Lock()
|
|
|
|
|
|
|
|
|
|
-def _process_wohand(data: bytes) -> dict[str, bool | int]:
|
|
+def _sb_uuid(comms_type: str = "service") -> bluepy.btle.UUID:
|
|
- """Process woHand/Bot services data."""
|
|
+ """Return Switchbot UUID."""
|
|
- _bot_data: dict[str, bool | int] = {}
|
|
|
|
|
|
|
|
-
|
|
+ _uuid = {"tx": "002", "rx": "003", "service": "d00"}
|
|
- _bot_data["switchMode"] = bool(data[1] & 0b10000000)
|
|
|
|
|
|
|
|
-
|
|
+ if comms_type in _uuid:
|
|
- if _bot_data["switchMode"]:
|
|
+ return bluepy.btle.UUID(f"cba20{_uuid[comms_type]}-224d-11e6-9fb8-0002a5d5c51b")
|
|
- _bot_data["isOn"] = not bool(data[1] & 0b01000000)
|
|
+
|
|
|
|
+ return "Incorrect type, choose between: tx, rx or service"
|
|
|
|
|
|
- else:
|
|
|
|
- _bot_data["isOn"] = False
|
|
|
|
|
|
|
|
- _bot_data["battery"] = data[2] & 0b01111111
|
|
+def _process_wohand(data: bytes) -> dict[str, bool | int]:
|
|
|
|
+ """Process woHand/Bot services data."""
|
|
|
|
+ _switch_mode = bool(data[1] & 0b10000000)
|
|
|
|
+
|
|
|
|
+ _bot_data = {
|
|
|
|
+ "switchMode": _switch_mode,
|
|
|
|
+ "isOn": not bool(data[1] & 0b01000000) if _switch_mode else False,
|
|
|
|
+ "battery": data[2] & 0b01111111,
|
|
|
|
+ }
|
|
|
|
|
|
return _bot_data
|
|
return _bot_data
|
|
|
|
|
|
|
|
|
|
def _process_wocurtain(data: bytes, reverse: bool = True) -> dict[str, bool | int]:
|
|
def _process_wocurtain(data: bytes, reverse: bool = True) -> dict[str, bool | int]:
|
|
"""Process woCurtain/Curtain services data."""
|
|
"""Process woCurtain/Curtain services data."""
|
|
- _curtain_data: dict[str, bool | int] = {}
|
|
|
|
|
|
|
|
- _curtain_data["calibration"] = bool(data[1] & 0b01000000)
|
|
|
|
- _curtain_data["battery"] = data[2] & 0b01111111
|
|
|
|
- _curtain_data["inMotion"] = bool(data[3] & 0b10000000)
|
|
|
|
_position = max(min(data[3] & 0b01111111, 100), 0)
|
|
_position = max(min(data[3] & 0b01111111, 100), 0)
|
|
- _curtain_data["position"] = (100 - _position) if reverse else _position
|
|
|
|
|
|
|
|
-
|
|
+ _curtain_data = {
|
|
- _curtain_data["lightLevel"] = (data[4] >> 4) & 0b00001111
|
|
+ "calibration": bool(data[1] & 0b01000000),
|
|
- _curtain_data["deviceChain"] = data[4] & 0b00000111
|
|
+ "battery": data[2] & 0b01111111,
|
|
|
|
+ "inMotion": bool(data[3] & 0b10000000),
|
|
|
|
+ "position": (100 - _position) if reverse else _position,
|
|
|
|
+ "lightLevel": (data[4] >> 4) & 0b00001111,
|
|
|
|
+ "deviceChain": data[4] & 0b00000111,
|
|
|
|
+ }
|
|
|
|
|
|
return _curtain_data
|
|
return _curtain_data
|
|
|
|
|
|
|
|
|
|
-def _process_wosensorth(data: bytes) -> dict[str, Any]:
|
|
+def _process_wosensorth(data: bytes) -> dict[str, object]:
|
|
"""Process woSensorTH/Temp sensor services data."""
|
|
"""Process woSensorTH/Temp sensor services data."""
|
|
- _wosensorth_data: dict[str, Any] = {}
|
|
|
|
|
|
|
|
_temp_sign = 1 if data[4] & 0b10000000 else -1
|
|
_temp_sign = 1 if data[4] & 0b10000000 else -1
|
|
_temp_c = _temp_sign * ((data[4] & 0b01111111) + (data[3] / 10))
|
|
_temp_c = _temp_sign * ((data[4] & 0b01111111) + (data[3] / 10))
|
|
_temp_f = (_temp_c * 9 / 5) + 32
|
|
_temp_f = (_temp_c * 9 / 5) + 32
|
|
_temp_f = (_temp_f * 10) / 10
|
|
_temp_f = (_temp_f * 10) / 10
|
|
|
|
|
|
- _wosensorth_data["temp"] = {}
|
|
+ _wosensorth_data = {
|
|
- _wosensorth_data["temp"]["c"] = _temp_c
|
|
+ "temp": {"c": _temp_c, "f": _temp_f},
|
|
- _wosensorth_data["temp"]["f"] = _temp_f
|
|
+ "fahrenheit": bool(data[5] & 0b10000000),
|
|
-
|
|
+ "humidity": data[5] & 0b01111111,
|
|
- _wosensorth_data["fahrenheit"] = bool(data[5] & 0b10000000)
|
|
+ "battery": data[2] & 0b01111111,
|
|
- _wosensorth_data["humidity"] = data[5] & 0b01111111
|
|
+ }
|
|
- _wosensorth_data["battery"] = data[2] & 0b01111111
|
|
|
|
|
|
|
|
return _wosensorth_data
|
|
return _wosensorth_data
|
|
|
|
|
|
|
|
|
|
|
|
+def _process_btle_adv_data(dev: bluepy.btle.ScanEntry) -> dict[str, Any]:
|
|
|
|
+ """Process bt le adv data."""
|
|
|
|
+ _adv_data = {"mac_address": dev.addr}
|
|
|
|
+ _data = dev.getValue(22)[2:]
|
|
|
|
+
|
|
|
|
+ supported_types: dict[str, dict[str, Any]] = {
|
|
|
|
+ "H": {"modelName": "WoHand", "func": _process_wohand},
|
|
|
|
+ "c": {"modelName": "WoCurtain", "func": _process_wocurtain},
|
|
|
|
+ "T": {"modelName": "WoSensorTH", "func": _process_wosensorth},
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ _model = chr(_data[0] & 0b01111111)
|
|
|
|
+ _adv_data["isEncrypted"] = bool(_data[0] & 0b10000000)
|
|
|
|
+ _adv_data["model"] = _model
|
|
|
|
+ if _model in supported_types:
|
|
|
|
+ _adv_data["data"] = supported_types[_model]["func"](_data)
|
|
|
|
+ _adv_data["data"]["rssi"] = dev.rssi
|
|
|
|
+ _adv_data["modelName"] = supported_types[_model]["modelName"]
|
|
|
|
+ else:
|
|
|
|
+ _adv_data["rawAdvData"] = dev.getValueText(22)
|
|
|
|
+
|
|
|
|
+ return _adv_data
|
|
|
|
+
|
|
|
|
+
|
|
class GetSwitchbotDevices:
|
|
class GetSwitchbotDevices:
|
|
"""Scan for all Switchbot devices and return by type."""
|
|
"""Scan for all Switchbot devices and return by type."""
|
|
|
|
|
|
- def __init__(self, interface: int | None = None) -> None:
|
|
+ def __init__(self, interface: int = 0) -> None:
|
|
"""Get switchbot devices class constructor."""
|
|
"""Get switchbot devices class constructor."""
|
|
self._interface = interface
|
|
self._interface = interface
|
|
- self._all_services_data: dict[str, Any] = {}
|
|
+ self._adv_data: dict[str, Any] = {}
|
|
|
|
|
|
def discover(
|
|
def discover(
|
|
- self, retry: int = DEFAULT_RETRY_COUNT, scan_timeout: int = DEFAULT_SCAN_TIMEOUT
|
|
+ self,
|
|
- ) -> dict | None:
|
|
+ retry: int = DEFAULT_RETRY_COUNT,
|
|
|
|
+ scan_timeout: int = DEFAULT_SCAN_TIMEOUT,
|
|
|
|
+ passive: bool = False,
|
|
|
|
+ mac: str | None = None,
|
|
|
|
+ ) -> dict[str, Any]:
|
|
"""Find switchbot devices and their advertisement data."""
|
|
"""Find switchbot devices and their advertisement data."""
|
|
-
|
|
|
|
devices = None
|
|
devices = None
|
|
|
|
|
|
- try:
|
|
+ with CONNECT_LOCK:
|
|
- devices = bluepy.btle.Scanner(self._interface).scan(scan_timeout)
|
|
+ try:
|
|
-
|
|
+ devices = bluepy.btle.Scanner(self._interface).scan(
|
|
- except bluepy.btle.BTLEManagementError:
|
|
+ scan_timeout, passive
|
|
- _LOGGER.error("Error scanning for switchbot devices", exc_info=True)
|
|
+ )
|
|
|
|
+ except bluepy.btle.BTLEManagementError:
|
|
|
|
+ _LOGGER.error("Error scanning for switchbot devices", exc_info=True)
|
|
|
|
|
|
if devices is None:
|
|
if devices is None:
|
|
if retry < 1:
|
|
if retry < 1:
|
|
_LOGGER.error(
|
|
_LOGGER.error(
|
|
"Scanning for Switchbot devices failed. Stop trying", exc_info=True
|
|
"Scanning for Switchbot devices failed. Stop trying", exc_info=True
|
|
)
|
|
)
|
|
- return None
|
|
+ return self._adv_data
|
|
|
|
|
|
_LOGGER.warning(
|
|
_LOGGER.warning(
|
|
"Error scanning for Switchbot devices. Retrying (remaining: %d)",
|
|
"Error scanning for Switchbot devices. Retrying (remaining: %d)",
|
|
retry,
|
|
retry,
|
|
)
|
|
)
|
|
time.sleep(DEFAULT_RETRY_TIMEOUT)
|
|
time.sleep(DEFAULT_RETRY_TIMEOUT)
|
|
- return self.discover(retry - 1, scan_timeout)
|
|
+ return self.discover(
|
|
|
|
+ retry=retry - 1,
|
|
|
|
+ scan_timeout=scan_timeout,
|
|
|
|
+ passive=passive,
|
|
|
|
+ mac=mac,
|
|
|
|
+ )
|
|
|
|
|
|
for dev in devices:
|
|
for dev in devices:
|
|
- if dev.getValueText(7) == UUID:
|
|
+ if dev.getValueText(7) == str(_sb_uuid()):
|
|
dev_id = dev.addr.replace(":", "")
|
|
dev_id = dev.addr.replace(":", "")
|
|
- self._all_services_data[dev_id] = {}
|
|
+ if mac:
|
|
- self._all_services_data[dev_id]["mac_address"] = dev.addr
|
|
+ if dev.addr.lower() == mac.lower():
|
|
- for (adtype, desc, value) in dev.getScanData():
|
|
+ self._adv_data[dev_id] = _process_btle_adv_data(dev)
|
|
- if adtype == 22:
|
|
+ else:
|
|
- _data = bytes.fromhex(value[4:])
|
|
+ self._adv_data[dev_id] = _process_btle_adv_data(dev)
|
|
- _model = chr(_data[0] & 0b01111111)
|
|
+
|
|
- if _model == "H":
|
|
+ return self._adv_data
|
|
- self._all_services_data[dev_id]["data"] = _process_wohand(
|
|
|
|
- _data
|
|
|
|
- )
|
|
|
|
- self._all_services_data[dev_id]["data"]["rssi"] = dev.rssi
|
|
|
|
- self._all_services_data[dev_id]["isEncrypted"] = bool(
|
|
|
|
- _data[0] & 0b10000000
|
|
|
|
- )
|
|
|
|
- self._all_services_data[dev_id]["model"] = _model
|
|
|
|
- self._all_services_data[dev_id]["modelName"] = "WoHand"
|
|
|
|
- elif _model == "c":
|
|
|
|
- self._all_services_data[dev_id][
|
|
|
|
- "data"
|
|
|
|
- ] = _process_wocurtain(_data)
|
|
|
|
- self._all_services_data[dev_id]["data"]["rssi"] = dev.rssi
|
|
|
|
- self._all_services_data[dev_id]["isEncrypted"] = bool(
|
|
|
|
- _data[0] & 0b10000000
|
|
|
|
- )
|
|
|
|
- self._all_services_data[dev_id]["model"] = _model
|
|
|
|
- self._all_services_data[dev_id]["modelName"] = "WoCurtain"
|
|
|
|
- elif _model == "T":
|
|
|
|
- self._all_services_data[dev_id][
|
|
|
|
- "data"
|
|
|
|
- ] = _process_wosensorth(_data)
|
|
|
|
- self._all_services_data[dev_id]["data"]["rssi"] = dev.rssi
|
|
|
|
- self._all_services_data[dev_id]["isEncrypted"] = bool(
|
|
|
|
- _data[0] & 0b10000000
|
|
|
|
- )
|
|
|
|
- self._all_services_data[dev_id]["model"] = _model
|
|
|
|
- self._all_services_data[dev_id]["modelName"] = "WoSensorTH"
|
|
|
|
-
|
|
|
|
- else:
|
|
|
|
- continue
|
|
|
|
- else:
|
|
|
|
- self._all_services_data[dev_id][desc] = value
|
|
|
|
-
|
|
|
|
- return self._all_services_data
|
|
|
|
|
|
|
|
def get_curtains(self) -> dict:
|
|
def get_curtains(self) -> dict:
|
|
"""Return all WoCurtain/Curtains devices with services data."""
|
|
"""Return all WoCurtain/Curtains devices with services data."""
|
|
- if not self._all_services_data:
|
|
+ if not self._adv_data:
|
|
self.discover()
|
|
self.discover()
|
|
|
|
|
|
- _curtain_devices = {}
|
|
+ _curtain_devices = {
|
|
-
|
|
+ device: data
|
|
- for device, data in self._all_services_data.items():
|
|
+ for device, data in self._adv_data.items()
|
|
- if data.get("model") == "c":
|
|
+ if data.get("model") == "c"
|
|
- _curtain_devices[device] = data
|
|
+ }
|
|
|
|
|
|
return _curtain_devices
|
|
return _curtain_devices
|
|
|
|
|
|
def get_bots(self) -> dict:
|
|
def get_bots(self) -> dict:
|
|
"""Return all WoHand/Bot devices with services data."""
|
|
"""Return all WoHand/Bot devices with services data."""
|
|
- if not self._all_services_data:
|
|
+ if not self._adv_data:
|
|
self.discover()
|
|
self.discover()
|
|
|
|
|
|
- _bot_devices = {}
|
|
+ _bot_devices = {
|
|
-
|
|
+ device: data
|
|
- for device, data in self._all_services_data.items():
|
|
+ for device, data in self._adv_data.items()
|
|
- if data.get("model") == "H":
|
|
+ if data.get("model") == "H"
|
|
- _bot_devices[device] = data
|
|
+ }
|
|
|
|
|
|
return _bot_devices
|
|
return _bot_devices
|
|
|
|
|
|
def get_tempsensors(self) -> dict:
|
|
def get_tempsensors(self) -> dict:
|
|
"""Return all WoSensorTH/Temp sensor devices with services data."""
|
|
"""Return all WoSensorTH/Temp sensor devices with services data."""
|
|
- if not self._all_services_data:
|
|
+ if not self._adv_data:
|
|
self.discover()
|
|
self.discover()
|
|
|
|
|
|
- _bot_temp = {}
|
|
+ _bot_temp = {
|
|
-
|
|
+ device: data
|
|
- for device, data in self._all_services_data.items():
|
|
+ for device, data in self._adv_data.items()
|
|
- if data.get("model") == "T":
|
|
+ if data.get("model") == "T"
|
|
- _bot_temp[device] = data
|
|
+ }
|
|
|
|
|
|
return _bot_temp
|
|
return _bot_temp
|
|
|
|
|
|
def get_device_data(self, mac: str) -> dict:
|
|
def get_device_data(self, mac: str) -> dict:
|
|
"""Return data for specific device."""
|
|
"""Return data for specific device."""
|
|
- if not self._all_services_data:
|
|
+ if not self._adv_data:
|
|
self.discover()
|
|
self.discover()
|
|
|
|
|
|
- _switchbot_data = {}
|
|
+ _switchbot_data = {
|
|
-
|
|
+ device: data
|
|
- for device in self._all_services_data.values():
|
|
+ for device, data in self._adv_data.items()
|
|
- if device["mac_address"] == mac:
|
|
+ if data.get("mac_address") == mac
|
|
- _switchbot_data = device
|
|
+ }
|
|
|
|
|
|
return _switchbot_data
|
|
return _switchbot_data
|
|
|
|
|
|
|
|
|
|
-class SwitchbotDevice:
|
|
+class SwitchbotDevice(bluepy.btle.Peripheral):
|
|
"""Base Representation of a Switchbot Device."""
|
|
"""Base Representation of a Switchbot Device."""
|
|
|
|
|
|
def __init__(
|
|
def __init__(
|
|
self,
|
|
self,
|
|
mac: str,
|
|
mac: str,
|
|
password: str | None = None,
|
|
password: str | None = None,
|
|
- interface: int | None = None,
|
|
+ interface: int = 0,
|
|
**kwargs: Any,
|
|
**kwargs: Any,
|
|
) -> None:
|
|
) -> None:
|
|
"""Switchbot base class constructor."""
|
|
"""Switchbot base class constructor."""
|
|
- self._interface = interface
|
|
+ bluepy.btle.Peripheral.__init__(
|
|
- self._mac = mac
|
|
+ self,
|
|
- self._device = bluepy.btle.Peripheral(
|
|
+ deviceAddr=None,
|
|
- deviceAddr=None, addrType=bluepy.btle.ADDR_TYPE_RANDOM, iface=interface
|
|
+ addrType=bluepy.btle.ADDR_TYPE_RANDOM,
|
|
|
|
+ iface=interface,
|
|
)
|
|
)
|
|
- self._switchbot_device_data: dict[str, Any] = {}
|
|
+ self._interface = interface
|
|
|
|
+ self._mac = mac.replace("-", ":").lower()
|
|
|
|
+ self._sb_adv_data: dict[str, Any] = {}
|
|
self._scan_timeout: int = kwargs.pop("scan_timeout", DEFAULT_SCAN_TIMEOUT)
|
|
self._scan_timeout: int = kwargs.pop("scan_timeout", DEFAULT_SCAN_TIMEOUT)
|
|
self._retry_count: int = kwargs.pop("retry_count", DEFAULT_RETRY_COUNT)
|
|
self._retry_count: int = kwargs.pop("retry_count", DEFAULT_RETRY_COUNT)
|
|
if password is None or password == "":
|
|
if password is None or password == "":
|
|
@@ -255,41 +262,84 @@ class SwitchbotDevice:
|
|
binascii.crc32(password.encode("ascii")) & 0xFFFFFFFF
|
|
binascii.crc32(password.encode("ascii")) & 0xFFFFFFFF
|
|
)
|
|
)
|
|
|
|
|
|
- def _connect(self) -> None:
|
|
+
|
|
- try:
|
|
+ def _connect(self, retry: int, timeout: int | None = None) -> None:
|
|
- _LOGGER.debug("Connecting to Switchbot")
|
|
+ _LOGGER.debug("Connecting to Switchbot")
|
|
- self._device.connect(
|
|
+
|
|
- self._mac, bluepy.btle.ADDR_TYPE_RANDOM, self._interface
|
|
+ if retry < 1:
|
|
|
|
+ self._stopHelper()
|
|
|
|
+ return
|
|
|
|
+
|
|
|
|
+ if self._helper is None:
|
|
|
|
+ self._startHelper(self._interface)
|
|
|
|
+
|
|
|
|
+ self._writeCmd("conn %s %s\n" % (self._mac, bluepy.btle.ADDR_TYPE_RANDOM))
|
|
|
|
+
|
|
|
|
+ rsp = self._getResp(["stat", "err"], timeout)
|
|
|
|
+
|
|
|
|
+ while rsp.get("state") and rsp["state"][0] in [
|
|
|
|
+ "tryconn",
|
|
|
|
+ "scan",
|
|
|
|
+ "disc",
|
|
|
|
+ ]:
|
|
|
|
+ rsp = self._getResp(["stat", "err"], timeout)
|
|
|
|
+
|
|
|
|
+ if rsp and rsp["rsp"][0] == "err":
|
|
|
|
+ errcode = rsp["code"][0]
|
|
|
|
+ _LOGGER.debug(
|
|
|
|
+ "Error trying to connect to peripheral %s, error: %s",
|
|
|
|
+ self._mac,
|
|
|
|
+ errcode,
|
|
)
|
|
)
|
|
- _LOGGER.debug("Connected to Switchbot")
|
|
|
|
- except bluepy.btle.BTLEException:
|
|
|
|
- _LOGGER.debug("Failed connecting to Switchbot", exc_info=True)
|
|
|
|
- raise
|
|
|
|
|
|
|
|
- def _disconnect(self) -> None:
|
|
+ if errcode == "connfail":
|
|
- _LOGGER.debug("Disconnecting")
|
|
+ return self._connect(retry - 1, timeout)
|
|
- try:
|
|
+
|
|
- self._device.disconnect()
|
|
+ if errcode == "nomgmt":
|
|
- except bluepy.btle.BTLEException:
|
|
+ raise bluepy.btle.BTLEManagementError(
|
|
- _LOGGER.warning("Error disconnecting from Switchbot", exc_info=True)
|
|
+ "Management not available (permissions problem?)", rsp
|
|
|
|
+ )
|
|
|
|
+ if errcode == "atterr":
|
|
|
|
+ raise bluepy.btle.BTLEGattError("Bluetooth command failed", rsp)
|
|
|
|
+
|
|
|
|
+ raise bluepy.btle.BTLEException(
|
|
|
|
+ "Error from bluepy-helper (%s)" % errcode, rsp
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ if not rsp or rsp["state"][0] != "conn":
|
|
|
|
+ _LOGGER.warning("Bluehelper returned unable to connect state: %s", rsp)
|
|
|
|
+ self._stopHelper()
|
|
|
|
+
|
|
|
|
+ if rsp is None:
|
|
|
|
+ raise bluepy.btle.BTLEDisconnectError(
|
|
|
|
+ "Timed out while trying to connect to peripheral %s" % self._mac,
|
|
|
|
+ rsp,
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ raise bluepy.btle.BTLEDisconnectError(
|
|
|
|
+ "Failed to connect to peripheral %s, rsp: %s" % (self._mac, rsp)
|
|
|
|
+ )
|
|
|
|
|
|
def _commandkey(self, key: str) -> str:
|
|
def _commandkey(self, key: str) -> str:
|
|
if self._password_encoded is None:
|
|
if self._password_encoded is None:
|
|
return key
|
|
return key
|
|
- key_suffix = PRESS_KEY_SUFFIX
|
|
+ key_action = key[3]
|
|
- if key == ON_KEY:
|
|
+ key_suffix = key[4:]
|
|
- key_suffix = ON_KEY_SUFFIX
|
|
+ return KEY_PASSWORD_PREFIX + key_action + self._password_encoded + key_suffix
|
|
- elif key == OFF_KEY:
|
|
|
|
- key_suffix = OFF_KEY_SUFFIX
|
|
|
|
- elif key == DEVICE_BASIC_SETTINGS_KEY:
|
|
|
|
- return KEY_PASSWORD_NOTIFY_PREFIX + self._password_encoded
|
|
|
|
- return KEY_PASSWORD_PREFIX + self._password_encoded + key_suffix
|
|
|
|
|
|
|
|
def _writekey(self, key: str) -> Any:
|
|
def _writekey(self, key: str) -> Any:
|
|
|
|
+ if self._helper is None:
|
|
|
|
+ return
|
|
_LOGGER.debug("Prepare to send")
|
|
_LOGGER.debug("Prepare to send")
|
|
- hand = self._device.getCharacteristics(uuid=HANDLE)[0]
|
|
+ try:
|
|
- _LOGGER.debug("Sending command, %s", key)
|
|
+ hand = self.getCharacteristics(uuid=_sb_uuid("tx"))[0]
|
|
- write_result = hand.write(binascii.a2b_hex(key), withResponse=False)
|
|
+ _LOGGER.debug("Sending command, %s", key)
|
|
|
|
+ write_result = hand.write(bytes.fromhex(key), withResponse=False)
|
|
|
|
+ except bluepy.btle.BTLEException:
|
|
|
|
+ _LOGGER.warning(
|
|
|
|
+ "Error while enabling notifications on Switchbot", exc_info=True
|
|
|
|
+ )
|
|
|
|
+ raise
|
|
if not write_result:
|
|
if not write_result:
|
|
_LOGGER.error(
|
|
_LOGGER.error(
|
|
"Sent command but didn't get a response from Switchbot confirming command was sent."
|
|
"Sent command but didn't get a response from Switchbot confirming command was sent."
|
|
@@ -299,43 +349,71 @@ class SwitchbotDevice:
|
|
_LOGGER.info("Successfully sent command to Switchbot (MAC: %s)", self._mac)
|
|
_LOGGER.info("Successfully sent command to Switchbot (MAC: %s)", self._mac)
|
|
return write_result
|
|
return write_result
|
|
|
|
|
|
- def _subscribe(self, key: str) -> Any:
|
|
+ def _subscribe(self) -> None:
|
|
|
|
+ if self._helper is None:
|
|
|
|
+ return
|
|
_LOGGER.debug("Subscribe to notifications")
|
|
_LOGGER.debug("Subscribe to notifications")
|
|
- handle = self._device.getCharacteristics(uuid=NOTIFICATION_HANDLE)[0]
|
|
+ enable_notify_flag = b"\x01\x00"
|
|
- notify_handle = handle.getHandle() + 1
|
|
+ try:
|
|
- response = self._device.writeCharacteristic(
|
|
+ handle = self.getCharacteristics(uuid=_sb_uuid("rx"))[0]
|
|
- notify_handle, binascii.a2b_hex(key), withResponse=False
|
|
+ notify_handle = handle.getHandle() + 1
|
|
- )
|
|
+ self.writeCharacteristic(
|
|
- return response
|
|
+ notify_handle, enable_notify_flag, withResponse=False
|
|
|
|
+ )
|
|
|
|
+ except bluepy.btle.BTLEException:
|
|
|
|
+ _LOGGER.warning(
|
|
|
|
+ "Error while enabling notifications on Switchbot", exc_info=True
|
|
|
|
+ )
|
|
|
|
+ raise
|
|
|
|
|
|
- def _readkey(self) -> bytes | None:
|
|
+ def _readkey(self) -> bytes:
|
|
|
|
+ if self._helper is None:
|
|
|
|
+ return b''
|
|
_LOGGER.debug("Prepare to read")
|
|
_LOGGER.debug("Prepare to read")
|
|
- receive_handle = self._device.getCharacteristics(uuid=NOTIFICATION_HANDLE)
|
|
+ try:
|
|
- if receive_handle:
|
|
+ receive_handle = self.getCharacteristics(uuid=_sb_uuid("rx"))
|
|
- for char in receive_handle:
|
|
+ if receive_handle:
|
|
- read_result: bytes = char.read()
|
|
+ for char in receive_handle:
|
|
- return read_result
|
|
+ read_result: bytes = char.read()
|
|
- return None
|
|
+ return read_result
|
|
-
|
|
+ return b"\x00"
|
|
- def _sendcommand(self, key: str, retry: int) -> bool:
|
|
+ except bluepy.btle.BTLEException:
|
|
|
|
+ _LOGGER.warning(
|
|
|
|
+ "Error while reading notifications from Switchbot", exc_info=True
|
|
|
|
+ )
|
|
|
|
+ raise
|
|
|
|
+
|
|
|
|
+ def _sendcommand(self, key: str, retry: int, timeout: int | None = None) -> bytes:
|
|
send_success = False
|
|
send_success = False
|
|
command = self._commandkey(key)
|
|
command = self._commandkey(key)
|
|
|
|
+ notify_msg = b"\x00"
|
|
_LOGGER.debug("Sending command to switchbot %s", command)
|
|
_LOGGER.debug("Sending command to switchbot %s", command)
|
|
|
|
+
|
|
|
|
+ if len(self._mac.split(":")) != 6:
|
|
|
|
+ raise ValueError("Expected MAC address, got %s" % repr(self._mac))
|
|
|
|
+
|
|
with CONNECT_LOCK:
|
|
with CONNECT_LOCK:
|
|
try:
|
|
try:
|
|
- self._connect()
|
|
+ self._connect(retry, timeout)
|
|
|
|
+ self._subscribe()
|
|
send_success = self._writekey(command)
|
|
send_success = self._writekey(command)
|
|
|
|
+ notify_msg = self._readkey()
|
|
except bluepy.btle.BTLEException:
|
|
except bluepy.btle.BTLEException:
|
|
_LOGGER.warning("Error talking to Switchbot", exc_info=True)
|
|
_LOGGER.warning("Error talking to Switchbot", exc_info=True)
|
|
finally:
|
|
finally:
|
|
- self._disconnect()
|
|
+ self.disconnect()
|
|
if send_success:
|
|
if send_success:
|
|
- return True
|
|
+ if notify_msg == b"\x07":
|
|
|
|
+ _LOGGER.error("Password required")
|
|
|
|
+ elif notify_msg == b"\t":
|
|
|
|
+ _LOGGER.error("Password incorrect")
|
|
|
|
+
|
|
|
|
+ return notify_msg
|
|
if retry < 1:
|
|
if retry < 1:
|
|
_LOGGER.error(
|
|
_LOGGER.error(
|
|
"Switchbot communication failed. Stopping trying", exc_info=True
|
|
"Switchbot communication failed. Stopping trying", exc_info=True
|
|
)
|
|
)
|
|
- return False
|
|
+ return notify_msg
|
|
_LOGGER.warning("Cannot connect to Switchbot. Retrying (remaining: %d)", retry)
|
|
_LOGGER.warning("Cannot connect to Switchbot. Retrying (remaining: %d)", retry)
|
|
time.sleep(DEFAULT_RETRY_TIMEOUT)
|
|
time.sleep(DEFAULT_RETRY_TIMEOUT)
|
|
return self._sendcommand(key, retry - 1)
|
|
return self._sendcommand(key, retry - 1)
|
|
@@ -346,112 +424,32 @@ class SwitchbotDevice:
|
|
|
|
|
|
def get_battery_percent(self) -> Any:
|
|
def get_battery_percent(self) -> Any:
|
|
"""Return device battery level in percent."""
|
|
"""Return device battery level in percent."""
|
|
- if not self._switchbot_device_data:
|
|
+ if not self._sb_adv_data.get("data"):
|
|
return None
|
|
return None
|
|
- return self._switchbot_device_data["data"]["battery"]
|
|
+ return self._sb_adv_data["data"].get("battery")
|
|
|
|
|
|
def get_device_data(
|
|
def get_device_data(
|
|
- self, retry: int = DEFAULT_RETRY_COUNT, interface: int | None = None
|
|
+ self,
|
|
- ) -> dict | None:
|
|
+ retry: int = DEFAULT_RETRY_COUNT,
|
|
|
|
+ interface: int | None = None,
|
|
|
|
+ passive: bool = False,
|
|
|
|
+ ) -> dict[str, Any]:
|
|
"""Find switchbot devices and their advertisement data."""
|
|
"""Find switchbot devices and their advertisement data."""
|
|
- if interface:
|
|
+ _interface: int = interface if interface else self._interface
|
|
- _interface: int | None = interface
|
|
|
|
- else:
|
|
|
|
- _interface = self._interface
|
|
|
|
-
|
|
|
|
- devices = None
|
|
|
|
-
|
|
|
|
- try:
|
|
|
|
- devices = bluepy.btle.Scanner(_interface).scan(self._scan_timeout)
|
|
|
|
|
|
|
|
- except bluepy.btle.BTLEManagementError:
|
|
+ dev_id = self._mac.replace(":", "")
|
|
- _LOGGER.error("Error scanning for switchbot devices", exc_info=True)
|
|
|
|
|
|
|
|
- if devices is None:
|
|
+ _data = GetSwitchbotDevices(interface=_interface).discover(
|
|
- if retry < 1:
|
|
+ retry=retry,
|
|
- _LOGGER.error(
|
|
+ scan_timeout=self._scan_timeout,
|
|
- "Scanning for Switchbot devices failed. Stop trying", exc_info=True
|
|
+ passive=passive,
|
|
- )
|
|
+ mac=self._mac,
|
|
- return None
|
|
+ )
|
|
-
|
|
|
|
- _LOGGER.warning(
|
|
|
|
- "Error scanning for Switchbot devices. Retrying (remaining: %d)",
|
|
|
|
- retry,
|
|
|
|
- )
|
|
|
|
- time.sleep(DEFAULT_RETRY_TIMEOUT)
|
|
|
|
- return self.get_device_data(retry=retry - 1, interface=_interface)
|
|
|
|
-
|
|
|
|
- for dev in devices:
|
|
|
|
- if self._mac.lower() == dev.addr.lower():
|
|
|
|
- self._switchbot_device_data["mac_address"] = dev.addr
|
|
|
|
- for (adtype, desc, value) in dev.getScanData():
|
|
|
|
- if adtype == 22:
|
|
|
|
- _data = bytes.fromhex(value[4:])
|
|
|
|
- _model = chr(_data[0] & 0b01111111)
|
|
|
|
- if _model == "H":
|
|
|
|
- self._switchbot_device_data["data"] = _process_wohand(_data)
|
|
|
|
- self._switchbot_device_data["data"]["rssi"] = dev.rssi
|
|
|
|
- self._switchbot_device_data["isEncrypted"] = bool(
|
|
|
|
- _data[0] & 0b10000000
|
|
|
|
- )
|
|
|
|
- self._switchbot_device_data["model"] = _model
|
|
|
|
- self._switchbot_device_data["modelName"] = "WoHand"
|
|
|
|
- elif _model == "c":
|
|
|
|
- self._switchbot_device_data["data"] = _process_wocurtain(
|
|
|
|
- _data
|
|
|
|
- )
|
|
|
|
- self._switchbot_device_data["data"]["rssi"] = dev.rssi
|
|
|
|
- self._switchbot_device_data["isEncrypted"] = bool(
|
|
|
|
- _data[0] & 0b10000000
|
|
|
|
- )
|
|
|
|
- self._switchbot_device_data["model"] = _model
|
|
|
|
- self._switchbot_device_data["modelName"] = "WoCurtain"
|
|
|
|
- elif _model == "T":
|
|
|
|
- self._switchbot_device_data["data"] = _process_wosensorth(
|
|
|
|
- _data
|
|
|
|
- )
|
|
|
|
- self._switchbot_device_data["data"]["rssi"] = dev.rssi
|
|
|
|
- self._switchbot_device_data["isEncrypted"] = bool(
|
|
|
|
- _data[0] & 0b10000000
|
|
|
|
- )
|
|
|
|
- self._switchbot_device_data["model"] = _model
|
|
|
|
- self._switchbot_device_data["modelName"] = "WoSensorTH"
|
|
|
|
-
|
|
|
|
- else:
|
|
|
|
- continue
|
|
|
|
- else:
|
|
|
|
- self._switchbot_device_data[desc] = value
|
|
|
|
-
|
|
|
|
- return self._switchbot_device_data
|
|
|
|
-
|
|
|
|
- def _get_basic_info(self, retry: int = DEFAULT_RETRY_COUNT) -> bytes:
|
|
|
|
- """Get device basic settings."""
|
|
|
|
- send_success = False
|
|
|
|
- command = self._commandkey(DEVICE_BASIC_SETTINGS_KEY)
|
|
|
|
- try:
|
|
|
|
- self._connect()
|
|
|
|
- self._subscribe(command)
|
|
|
|
- send_success = self._writekey(command)
|
|
|
|
- value = self._readkey()
|
|
|
|
- except bluepy.btle.BTLEException:
|
|
|
|
- _LOGGER.warning("Error talking to Switchbot", exc_info=True)
|
|
|
|
- finally:
|
|
|
|
- self._disconnect()
|
|
|
|
-
|
|
|
|
- if send_success and value:
|
|
|
|
-
|
|
|
|
- print("Successfully retrieved data from device " + str(self._mac))
|
|
|
|
|
|
|
|
- return value
|
|
+ if _data.get(dev_id):
|
|
|
|
+ self._sb_adv_data = _data[dev_id]
|
|
|
|
|
|
- if retry < 1:
|
|
+ return self._sb_adv_data
|
|
- _LOGGER.error(
|
|
|
|
- "Switchbot communication failed. Stopping trying", exc_info=True
|
|
|
|
- )
|
|
|
|
- return bytes(0)
|
|
|
|
- _LOGGER.warning("Cannot connect to Switchbot. Retrying (remaining: %d)", retry)
|
|
|
|
- time.sleep(DEFAULT_RETRY_TIMEOUT)
|
|
|
|
- return self._get_basic_info(retry - 1)
|
|
|
|
|
|
|
|
|
|
|
|
class Switchbot(SwitchbotDevice):
|
|
class Switchbot(SwitchbotDevice):
|
|
@@ -463,52 +461,141 @@ class Switchbot(SwitchbotDevice):
|
|
self._inverse: bool = kwargs.pop("inverse_mode", False)
|
|
self._inverse: bool = kwargs.pop("inverse_mode", False)
|
|
self._settings: dict[str, Any] = {}
|
|
self._settings: dict[str, Any] = {}
|
|
|
|
|
|
- def update(self, interface: int | None = None) -> None:
|
|
+ def update(self, interface: int | None = None, passive: bool = False) -> None:
|
|
"""Update mode, battery percent and state of device."""
|
|
"""Update mode, battery percent and state of device."""
|
|
- self.get_device_data(retry=self._retry_count, interface=interface)
|
|
+ self.get_device_data(
|
|
|
|
+ retry=self._retry_count, interface=interface, passive=passive
|
|
|
|
+ )
|
|
|
|
|
|
def turn_on(self) -> bool:
|
|
def turn_on(self) -> bool:
|
|
"""Turn device on."""
|
|
"""Turn device on."""
|
|
- return self._sendcommand(ON_KEY, self._retry_count)
|
|
+ result = self._sendcommand(ON_KEY, self._retry_count)
|
|
|
|
+
|
|
|
|
+ if result[0] == 1:
|
|
|
|
+ return True
|
|
|
|
+
|
|
|
|
+ if result[0] == 5:
|
|
|
|
+ _LOGGER.debug("Bot is in press mode and doesn't have on state")
|
|
|
|
+ return True
|
|
|
|
+
|
|
|
|
+ return False
|
|
|
|
|
|
def turn_off(self) -> bool:
|
|
def turn_off(self) -> bool:
|
|
"""Turn device off."""
|
|
"""Turn device off."""
|
|
- return self._sendcommand(OFF_KEY, self._retry_count)
|
|
+ result = self._sendcommand(OFF_KEY, self._retry_count)
|
|
|
|
+ if result[0] == 1:
|
|
|
|
+ return True
|
|
|
|
+
|
|
|
|
+ if result[0] == 5:
|
|
|
|
+ _LOGGER.debug("Bot is in press mode and doesn't have off state")
|
|
|
|
+ return True
|
|
|
|
+
|
|
|
|
+ return False
|
|
|
|
+
|
|
|
|
+ def hand_up(self) -> bool:
|
|
|
|
+ """Raise device arm."""
|
|
|
|
+ result = self._sendcommand(UP_KEY, self._retry_count)
|
|
|
|
+ if result[0] == 1:
|
|
|
|
+ return True
|
|
|
|
+
|
|
|
|
+ if result[0] == 5:
|
|
|
|
+ _LOGGER.debug("Bot is in press mode")
|
|
|
|
+ return True
|
|
|
|
+
|
|
|
|
+ return False
|
|
|
|
+
|
|
|
|
+ def hand_down(self) -> bool:
|
|
|
|
+ """Lower device arm."""
|
|
|
|
+ result = self._sendcommand(DOWN_KEY, self._retry_count)
|
|
|
|
+ if result[0] == 1:
|
|
|
|
+ return True
|
|
|
|
+
|
|
|
|
+ if result[0] == 5:
|
|
|
|
+ _LOGGER.debug("Bot is in press mode")
|
|
|
|
+ return True
|
|
|
|
+
|
|
|
|
+ return False
|
|
|
|
|
|
def press(self) -> bool:
|
|
def press(self) -> bool:
|
|
"""Press command to device."""
|
|
"""Press command to device."""
|
|
- return self._sendcommand(PRESS_KEY, self._retry_count)
|
|
+ result = self._sendcommand(PRESS_KEY, self._retry_count)
|
|
|
|
+ if result[0] == 1:
|
|
|
|
+ return True
|
|
|
|
+
|
|
|
|
+ if result[0] == 5:
|
|
|
|
+ _LOGGER.debug("Bot is in switch mode")
|
|
|
|
+ return True
|
|
|
|
+
|
|
|
|
+ return False
|
|
|
|
+
|
|
|
|
+ def set_switch_mode(
|
|
|
|
+ self, switch_mode: bool = False, strength: int = 100, inverse: bool = False
|
|
|
|
+ ) -> bool:
|
|
|
|
+ """Change bot mode."""
|
|
|
|
+ mode_key = format(switch_mode, "b") + format(inverse, "b")
|
|
|
|
+ strength_key = f"{strength:0{2}x}"
|
|
|
|
+
|
|
|
|
+ result = self._sendcommand(
|
|
|
|
+ DEVICE_SET_MODE_KEY + strength_key + mode_key, self._retry_count
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ if result[0] == 1:
|
|
|
|
+ return True
|
|
|
|
+
|
|
|
|
+ return False
|
|
|
|
+
|
|
|
|
+ def set_long_press(self, duration: int = 0) -> bool:
|
|
|
|
+ """Set bot long press duration."""
|
|
|
|
+ duration_key = f"{duration:0{2}x}"
|
|
|
|
+
|
|
|
|
+ result = self._sendcommand(
|
|
|
|
+ DEVICE_SET_EXTENDED_KEY + "08" + duration_key, self._retry_count
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ if result[0] == 1:
|
|
|
|
+ return True
|
|
|
|
+
|
|
|
|
+ return False
|
|
|
|
|
|
- def get_basic_info(self) -> dict[str, Any]:
|
|
+ def get_basic_info(self) -> dict[str, Any] | None:
|
|
"""Get device basic settings."""
|
|
"""Get device basic settings."""
|
|
- settings_data = self._get_basic_info()
|
|
+ _data = self._sendcommand(
|
|
- self._settings["battery"] = settings_data[1]
|
|
+ key=DEVICE_GET_BASIC_SETTINGS_KEY, retry=self._retry_count
|
|
- self._settings["firmware"] = settings_data[2] / 10.0
|
|
+ )
|
|
|
|
+
|
|
|
|
+ if _data in (b"\x07", b"\x00"):
|
|
|
|
+ _LOGGER.error("Unsuccessfull, please try again")
|
|
|
|
+ return None
|
|
|
|
|
|
- self._settings["timers"] = settings_data[8]
|
|
+ self._settings = {
|
|
- self._settings["dualStateMode"] = bool(settings_data[9] & 16)
|
|
+ "battery": _data[1],
|
|
- self._settings["inverseDirection"] = bool(settings_data[9] & 1)
|
|
+ "firmware": _data[2] / 10.0,
|
|
- self._settings["holdSeconds"] = settings_data[10]
|
|
+ "strength": _data[3],
|
|
|
|
+ "timers": _data[8],
|
|
|
|
+ "switchMode": bool(_data[9] & 16),
|
|
|
|
+ "inverseDirection": bool(_data[9] & 1),
|
|
|
|
+ "holdSeconds": _data[10],
|
|
|
|
+ }
|
|
|
|
|
|
return self._settings
|
|
return self._settings
|
|
|
|
|
|
def switch_mode(self) -> Any:
|
|
def switch_mode(self) -> Any:
|
|
"""Return true or false from cache."""
|
|
"""Return true or false from cache."""
|
|
|
|
|
|
- if not self._switchbot_device_data:
|
|
+ if not self._sb_adv_data.get("data"):
|
|
return None
|
|
return None
|
|
- return self._switchbot_device_data["data"]["switchMode"]
|
|
+ return self._sb_adv_data.get("switchMode")
|
|
|
|
|
|
def is_on(self) -> Any:
|
|
def is_on(self) -> Any:
|
|
"""Return switch state from cache."""
|
|
"""Return switch state from cache."""
|
|
|
|
|
|
- if not self._switchbot_device_data:
|
|
+ if not self._sb_adv_data.get("data"):
|
|
return None
|
|
return None
|
|
|
|
|
|
if self._inverse:
|
|
if self._inverse:
|
|
- return not self._switchbot_device_data["data"]["isOn"]
|
|
+ return not self._sb_adv_data["data"].get("isOn")
|
|
|
|
|
|
- return self._switchbot_device_data["data"]["isOn"]
|
|
+ return self._sb_adv_data["data"].get("isOn")
|
|
|
|
|
|
|
|
|
|
class SwitchbotCurtain(SwitchbotDevice):
|
|
class SwitchbotCurtain(SwitchbotDevice):
|
|
@@ -528,77 +615,164 @@ class SwitchbotCurtain(SwitchbotDevice):
|
|
super().__init__(*args, **kwargs)
|
|
super().__init__(*args, **kwargs)
|
|
self._reverse: bool = kwargs.pop("reverse_mode", True)
|
|
self._reverse: bool = kwargs.pop("reverse_mode", True)
|
|
self._settings: dict[str, Any] = {}
|
|
self._settings: dict[str, Any] = {}
|
|
|
|
+ self.ext_info_sum: dict[str, Any] = {}
|
|
|
|
+ self.ext_info_adv: dict[str, Any] = {}
|
|
|
|
|
|
def open(self) -> bool:
|
|
def open(self) -> bool:
|
|
"""Send open command."""
|
|
"""Send open command."""
|
|
- return self._sendcommand(OPEN_KEY, self._retry_count)
|
|
+ result = self._sendcommand(OPEN_KEY, self._retry_count)
|
|
|
|
+ if result[0] == 1:
|
|
|
|
+ return True
|
|
|
|
+
|
|
|
|
+ return False
|
|
|
|
|
|
def close(self) -> bool:
|
|
def close(self) -> bool:
|
|
"""Send close command."""
|
|
"""Send close command."""
|
|
- return self._sendcommand(CLOSE_KEY, self._retry_count)
|
|
+ result = self._sendcommand(CLOSE_KEY, self._retry_count)
|
|
|
|
+ if result[0] == 1:
|
|
|
|
+ return True
|
|
|
|
+
|
|
|
|
+ return False
|
|
|
|
|
|
def stop(self) -> bool:
|
|
def stop(self) -> bool:
|
|
"""Send stop command to device."""
|
|
"""Send stop command to device."""
|
|
- return self._sendcommand(STOP_KEY, self._retry_count)
|
|
+ result = self._sendcommand(STOP_KEY, self._retry_count)
|
|
|
|
+ if result[0] == 1:
|
|
|
|
+ return True
|
|
|
|
+
|
|
|
|
+ return False
|
|
|
|
|
|
def set_position(self, position: int) -> bool:
|
|
def set_position(self, position: int) -> bool:
|
|
"""Send position command (0-100) to device."""
|
|
"""Send position command (0-100) to device."""
|
|
position = (100 - position) if self._reverse else position
|
|
position = (100 - position) if self._reverse else position
|
|
hex_position = "%0.2X" % position
|
|
hex_position = "%0.2X" % position
|
|
- return self._sendcommand(POSITION_KEY + hex_position, self._retry_count)
|
|
+ result = self._sendcommand(POSITION_KEY + hex_position, self._retry_count)
|
|
|
|
+ if result[0] == 1:
|
|
|
|
+ return True
|
|
|
|
|
|
- def update(self, interface: int | None = None) -> None:
|
|
+ return False
|
|
|
|
+
|
|
|
|
+ def update(self, interface: int | None = None, passive: bool = False) -> None:
|
|
"""Update position, battery percent and light level of device."""
|
|
"""Update position, battery percent and light level of device."""
|
|
- self.get_device_data(retry=self._retry_count, interface=interface)
|
|
+ self.get_device_data(
|
|
|
|
+ retry=self._retry_count, interface=interface, passive=passive
|
|
|
|
+ )
|
|
|
|
|
|
def get_position(self) -> Any:
|
|
def get_position(self) -> Any:
|
|
"""Return cached position (0-100) of Curtain."""
|
|
"""Return cached position (0-100) of Curtain."""
|
|
|
|
|
|
- if not self._switchbot_device_data:
|
|
+ if not self._sb_adv_data.get("data"):
|
|
return None
|
|
return None
|
|
- return self._switchbot_device_data["data"]["position"]
|
|
+ return self._sb_adv_data["data"].get("position")
|
|
|
|
|
|
- def get_basic_info(self) -> dict[str, Any]:
|
|
+ def get_basic_info(self) -> dict[str, Any] | None:
|
|
"""Get device basic settings."""
|
|
"""Get device basic settings."""
|
|
- settings_data = self._get_basic_info()
|
|
+ _data = self._sendcommand(
|
|
- self._settings["battery"] = settings_data[1]
|
|
+ key=DEVICE_GET_BASIC_SETTINGS_KEY, retry=self._retry_count
|
|
- self._settings["firmware"] = settings_data[2] / 10.0
|
|
+ )
|
|
|
|
|
|
- self._settings["chainLength"] = settings_data[3]
|
|
+ if _data in (b"\x07", b"\x00"):
|
|
|
|
+ _LOGGER.error("Unsuccessfull, please try again")
|
|
|
|
+ return None
|
|
|
|
|
|
- self._settings["openDirection"] = (
|
|
+ _position = max(min(_data[6], 100), 0)
|
|
- "right_to_left" if settings_data[4] & 0b10000000 == 128 else "left_to_right"
|
|
+
|
|
- )
|
|
+ self._settings = {
|
|
|
|
+ "battery": _data[1],
|
|
|
|
+ "firmware": _data[2] / 10.0,
|
|
|
|
+ "chainLength": _data[3],
|
|
|
|
+ "openDirection": (
|
|
|
|
+ "right_to_left" if _data[4] & 0b10000000 == 128 else "left_to_right"
|
|
|
|
+ ),
|
|
|
|
+ "touchToOpen": bool(_data[4] & 0b01000000),
|
|
|
|
+ "light": bool(_data[4] & 0b00100000),
|
|
|
|
+ "fault": bool(_data[4] & 0b00001000),
|
|
|
|
+ "solarPanel": bool(_data[5] & 0b00001000),
|
|
|
|
+ "calibrated": bool(_data[5] & 0b00000100),
|
|
|
|
+ "inMotion": bool(_data[5] & 0b01000011),
|
|
|
|
+ "position": (100 - _position) if self._reverse else _position,
|
|
|
|
+ "timers": _data[7],
|
|
|
|
+ }
|
|
|
|
|
|
- self._settings["touchToOpen"] = bool(settings_data[4] & 0b01000000)
|
|
+ return self._settings
|
|
- self._settings["light"] = bool(settings_data[4] & 0b00100000)
|
|
|
|
- self._settings["fault"] = bool(settings_data[4] & 0b00001000)
|
|
|
|
|
|
|
|
- self._settings["solarPanel"] = bool(settings_data[5] & 0b00001000)
|
|
+ def get_extended_info_summary(self) -> dict[str, Any] | None:
|
|
- self._settings["calibrated"] = bool(settings_data[5] & 0b00000100)
|
|
+ """Get basic info for all devices in chain."""
|
|
- self._settings["inMotion"] = bool(settings_data[5] & 0b01000011)
|
|
+ _data = self._sendcommand(key=CURTAIN_EXT_SUM_KEY, retry=self._retry_count)
|
|
|
|
|
|
- _position = max(min(settings_data[6], 100), 0)
|
|
+ if _data in (b"\x07", b"\x00"):
|
|
- self._settings["position"] = (100 - _position) if self._reverse else _position
|
|
+ _LOGGER.error("Unsuccessfull, please try again")
|
|
|
|
+ return None
|
|
|
|
|
|
- self._settings["timers"] = settings_data[7]
|
|
+ self.ext_info_sum["device0"] = {
|
|
|
|
+ "openDirectionDefault": not bool(_data[1] & 0b10000000),
|
|
|
|
+ "touchToOpen": bool(_data[1] & 0b01000000),
|
|
|
|
+ "light": bool(_data[1] & 0b00100000),
|
|
|
|
+ "openDirection": (
|
|
|
|
+ "left_to_right" if _data[1] & 0b00010000 == 1 else "right_to_left"
|
|
|
|
+ ),
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ if _data[2] != 0:
|
|
|
|
+ self.ext_info_sum["device1"] = {
|
|
|
|
+ "openDirectionDefault": not bool(_data[1] & 0b10000000),
|
|
|
|
+ "touchToOpen": bool(_data[1] & 0b01000000),
|
|
|
|
+ "light": bool(_data[1] & 0b00100000),
|
|
|
|
+ "openDirection": (
|
|
|
|
+ "left_to_right" if _data[1] & 0b00010000 else "right_to_left"
|
|
|
|
+ ),
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return self.ext_info_sum
|
|
|
|
+
|
|
|
|
+ def get_extended_info_adv(self) -> dict[str, Any] | None:
|
|
|
|
+ """Get advance page info for device chain."""
|
|
|
|
+ _data = self._sendcommand(key=CURTAIN_EXT_ADV_KEY, retry=self._retry_count)
|
|
|
|
+
|
|
|
|
+ if _data in (b"\x07", b"\x00"):
|
|
|
|
+ _LOGGER.error("Unsuccessfull, please try again")
|
|
|
|
+ return None
|
|
|
|
|
|
- return self._settings
|
|
+ _state_of_charge = [
|
|
|
|
+ "not_charging",
|
|
|
|
+ "charging_by_adapter",
|
|
|
|
+ "charging_by_solar",
|
|
|
|
+ "fully_charged",
|
|
|
|
+ "solar_not_charging",
|
|
|
|
+ "charging_error",
|
|
|
|
+ ]
|
|
|
|
+
|
|
|
|
+ self.ext_info_adv["device0"] = {
|
|
|
|
+ "battery": _data[1],
|
|
|
|
+ "firmware": _data[2] / 10.0,
|
|
|
|
+ "stateOfCharge": _state_of_charge[_data[3]],
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ if _data[4]:
|
|
|
|
+ self.ext_info_adv["device1"] = {
|
|
|
|
+ "battery": _data[4],
|
|
|
|
+ "firmware": _data[5] / 10.0,
|
|
|
|
+ "stateOfCharge": _state_of_charge[_data[6]],
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return self.ext_info_adv
|
|
|
|
|
|
def get_light_level(self) -> Any:
|
|
def get_light_level(self) -> Any:
|
|
"""Return cached light level."""
|
|
"""Return cached light level."""
|
|
|
|
|
|
- if not self._switchbot_device_data:
|
|
+ if not self._sb_adv_data.get("data"):
|
|
return None
|
|
return None
|
|
- return self._switchbot_device_data["data"]["lightLevel"]
|
|
+ return self._sb_adv_data["data"].get("lightLevel")
|
|
|
|
|
|
def is_reversed(self) -> bool:
|
|
def is_reversed(self) -> bool:
|
|
- """Return True if the curtain open from left to right."""
|
|
+ """Return True if curtain position is opposite from SB data."""
|
|
return self._reverse
|
|
return self._reverse
|
|
|
|
|
|
def is_calibrated(self) -> Any:
|
|
def is_calibrated(self) -> Any:
|
|
"""Return True curtain is calibrated."""
|
|
"""Return True curtain is calibrated."""
|
|
|
|
|
|
- if not self._switchbot_device_data:
|
|
+ if not self._sb_adv_data.get("data"):
|
|
return None
|
|
return None
|
|
- return self._switchbot_device_data["data"]["calibration"]
|
|
+ return self._sb_adv_data["data"].get("calibration")
|