|
@@ -5,6 +5,7 @@ import binascii
|
|
|
import logging
|
|
|
import threading
|
|
|
import time
|
|
|
+from typing import Any
|
|
|
|
|
|
import bluepy
|
|
|
|
|
@@ -14,8 +15,10 @@ DEFAULT_SCAN_TIMEOUT = 5
|
|
|
|
|
|
UUID = "cba20d00-224d-11e6-9fb8-0002a5d5c51b"
|
|
|
HANDLE = "cba20002-224d-11e6-9fb8-0002a5d5c51b"
|
|
|
+NOTIFICATION_HANDLE = "cba20003-224d-11e6-9fb8-0002a5d5c51b"
|
|
|
|
|
|
KEY_PASSWORD_PREFIX = "5711"
|
|
|
+KEY_PASSWORD_NOTIFY_PREFIX = "5712"
|
|
|
|
|
|
PRESS_KEY = "570100"
|
|
|
ON_KEY = "570101"
|
|
@@ -25,6 +28,7 @@ OPEN_KEY = "570f450105ff00" # 570F4501010100
|
|
|
CLOSE_KEY = "570f450105ff64"
|
|
|
POSITION_KEY = "570F450105ff"
|
|
|
STOP_KEY = "570F450100ff"
|
|
|
+DEVICE_BASIC_SETTINGS_KEY = "5702"
|
|
|
|
|
|
ON_KEY_SUFFIX = "01"
|
|
|
OFF_KEY_SUFFIX = "02"
|
|
@@ -34,52 +38,48 @@ _LOGGER = logging.getLogger(__name__)
|
|
|
CONNECT_LOCK = threading.Lock()
|
|
|
|
|
|
|
|
|
-def _process_wohand(data) -> dict:
|
|
|
+def _process_wohand(data: bytes) -> dict[str, bool | int]:
|
|
|
"""Process woHand/Bot services data."""
|
|
|
- _bot_data = {}
|
|
|
-
|
|
|
- _sensor_data = binascii.unhexlify(data.encode())
|
|
|
+ _bot_data: dict[str, bool | int] = {}
|
|
|
|
|
|
|
|
|
- _bot_data["switchMode"] = bool(_sensor_data[1] & 0b10000000)
|
|
|
+ _bot_data["switchMode"] = bool(data[1] & 0b10000000)
|
|
|
|
|
|
|
|
|
if _bot_data["switchMode"]:
|
|
|
- _bot_data["isOn"] = not bool(_sensor_data[1] & 0b01000000)
|
|
|
+ _bot_data["isOn"] = not bool(data[1] & 0b01000000)
|
|
|
|
|
|
else:
|
|
|
_bot_data["isOn"] = False
|
|
|
|
|
|
- _bot_data["battery"] = _sensor_data[2] & 0b01111111
|
|
|
+ _bot_data["battery"] = data[2] & 0b01111111
|
|
|
|
|
|
return _bot_data
|
|
|
|
|
|
|
|
|
-def _process_wocurtain(data, reverse=True) -> dict:
|
|
|
+def _process_wocurtain(data: bytes, reverse: bool = True) -> dict[str, bool | int]:
|
|
|
"""Process woCurtain/Curtain services data."""
|
|
|
- _curtain_data = {}
|
|
|
-
|
|
|
- _sensor_data = binascii.unhexlify(data.encode())
|
|
|
+ _curtain_data: dict[str, bool | int] = {}
|
|
|
|
|
|
- _curtain_data["calibration"] = bool(_sensor_data[1] & 0b01000000)
|
|
|
- _curtain_data["battery"] = _sensor_data[2] & 0b01111111
|
|
|
- _position = max(min(_sensor_data[3] & 0b01111111, 100), 0)
|
|
|
+ _curtain_data["calibration"] = bool(data[1] & 0b01000000)
|
|
|
+ _curtain_data["battery"] = data[2] & 0b01111111
|
|
|
+ _curtain_data["inMotion"] = bool(data[3] & 0b10000000)
|
|
|
+ _position = max(min(data[3] & 0b01111111, 100), 0)
|
|
|
_curtain_data["position"] = (100 - _position) if reverse else _position
|
|
|
|
|
|
|
|
|
- _curtain_data["lightLevel"] = (_sensor_data[4] >> 4) & 0b00001111
|
|
|
+ _curtain_data["lightLevel"] = (data[4] >> 4) & 0b00001111
|
|
|
+ _curtain_data["deviceChain"] = data[4] & 0b00000111
|
|
|
|
|
|
return _curtain_data
|
|
|
|
|
|
|
|
|
-def _process_wosensorth(data) -> dict:
|
|
|
+def _process_wosensorth(data: bytes) -> dict[str, Any]:
|
|
|
"""Process woSensorTH/Temp sensor services data."""
|
|
|
- _wosensorth_data = {}
|
|
|
-
|
|
|
- _sensor_data = binascii.unhexlify(data.encode())
|
|
|
+ _wosensorth_data: dict[str, Any] = {}
|
|
|
|
|
|
- _temp_sign = 1 if _sensor_data[4] & 0b10000000 else -1
|
|
|
- _temp_c = _temp_sign * ((_sensor_data[4] & 0b01111111) + (_sensor_data[3] / 10))
|
|
|
+ _temp_sign = 1 if data[4] & 0b10000000 else -1
|
|
|
+ _temp_c = _temp_sign * ((data[4] & 0b01111111) + (data[3] / 10))
|
|
|
_temp_f = (_temp_c * 9 / 5) + 32
|
|
|
_temp_f = (_temp_f * 10) / 10
|
|
|
|
|
@@ -87,9 +87,9 @@ def _process_wosensorth(data) -> dict:
|
|
|
_wosensorth_data["temp"]["c"] = _temp_c
|
|
|
_wosensorth_data["temp"]["f"] = _temp_f
|
|
|
|
|
|
- _wosensorth_data["fahrenheit"] = bool(_sensor_data[5] & 0b10000000)
|
|
|
- _wosensorth_data["humidity"] = _sensor_data[5] & 0b01111111
|
|
|
- _wosensorth_data["battery"] = _sensor_data[2] & 0b01111111
|
|
|
+ _wosensorth_data["fahrenheit"] = bool(data[5] & 0b10000000)
|
|
|
+ _wosensorth_data["humidity"] = data[5] & 0b01111111
|
|
|
+ _wosensorth_data["battery"] = data[2] & 0b01111111
|
|
|
|
|
|
return _wosensorth_data
|
|
|
|
|
@@ -97,13 +97,13 @@ def _process_wosensorth(data) -> dict:
|
|
|
class GetSwitchbotDevices:
|
|
|
"""Scan for all Switchbot devices and return by type."""
|
|
|
|
|
|
- def __init__(self, interface=None) -> None:
|
|
|
+ def __init__(self, interface: int | None = None) -> None:
|
|
|
"""Get switchbot devices class constructor."""
|
|
|
self._interface = interface
|
|
|
- self._all_services_data = {}
|
|
|
+ self._all_services_data: dict[str, Any] = {}
|
|
|
|
|
|
def discover(
|
|
|
- self, retry=DEFAULT_RETRY_COUNT, scan_timeout=DEFAULT_SCAN_TIMEOUT
|
|
|
+ self, retry: int = DEFAULT_RETRY_COUNT, scan_timeout: int = DEFAULT_SCAN_TIMEOUT
|
|
|
) -> dict | None:
|
|
|
"""Find switchbot devices and their advertisement data."""
|
|
|
|
|
@@ -136,26 +136,36 @@ class GetSwitchbotDevices:
|
|
|
self._all_services_data[dev_id]["mac_address"] = dev.addr
|
|
|
for (adtype, desc, value) in dev.getScanData():
|
|
|
if adtype == 22:
|
|
|
- _model = chr(binascii.unhexlify(value.encode())[2] & 0b01111111)
|
|
|
+ _data = bytes.fromhex(value[4:])
|
|
|
+ _model = chr(_data[0] & 0b01111111)
|
|
|
if _model == "H":
|
|
|
self._all_services_data[dev_id]["data"] = _process_wohand(
|
|
|
- value[4:]
|
|
|
+ _data
|
|
|
)
|
|
|
self._all_services_data[dev_id]["data"]["rssi"] = dev.rssi
|
|
|
+ self._all_services_data[dev_id]["isEncrypted"] = bool(
|
|
|
+ _data[0] & 0b10000000
|
|
|
+ )
|
|
|
self._all_services_data[dev_id]["model"] = _model
|
|
|
self._all_services_data[dev_id]["modelName"] = "WoHand"
|
|
|
elif _model == "c":
|
|
|
self._all_services_data[dev_id][
|
|
|
"data"
|
|
|
- ] = _process_wocurtain(value[4:])
|
|
|
+ ] = _process_wocurtain(_data)
|
|
|
self._all_services_data[dev_id]["data"]["rssi"] = dev.rssi
|
|
|
+ self._all_services_data[dev_id]["isEncrypted"] = bool(
|
|
|
+ _data[0] & 0b10000000
|
|
|
+ )
|
|
|
self._all_services_data[dev_id]["model"] = _model
|
|
|
self._all_services_data[dev_id]["modelName"] = "WoCurtain"
|
|
|
elif _model == "T":
|
|
|
self._all_services_data[dev_id][
|
|
|
"data"
|
|
|
- ] = _process_wosensorth(value[4:])
|
|
|
+ ] = _process_wosensorth(_data)
|
|
|
self._all_services_data[dev_id]["data"]["rssi"] = dev.rssi
|
|
|
+ self._all_services_data[dev_id]["isEncrypted"] = bool(
|
|
|
+ _data[0] & 0b10000000
|
|
|
+ )
|
|
|
self._all_services_data[dev_id]["model"] = _model
|
|
|
self._all_services_data[dev_id]["modelName"] = "WoSensorTH"
|
|
|
|
|
@@ -166,42 +176,55 @@ class GetSwitchbotDevices:
|
|
|
|
|
|
return self._all_services_data
|
|
|
|
|
|
- def get_curtains(self) -> dict | None:
|
|
|
+ def get_curtains(self) -> dict:
|
|
|
"""Return all WoCurtain/Curtains devices with services data."""
|
|
|
if not self._all_services_data:
|
|
|
self.discover()
|
|
|
|
|
|
_curtain_devices = {}
|
|
|
|
|
|
- for item in self._all_services_data:
|
|
|
- if self._all_services_data[item]["model"] == "c":
|
|
|
- _curtain_devices[item] = self._all_services_data[item]
|
|
|
+ for device, data in self._all_services_data.items():
|
|
|
+ if data.get("model") == "c":
|
|
|
+ _curtain_devices[device] = data
|
|
|
|
|
|
return _curtain_devices
|
|
|
|
|
|
- def get_bots(self) -> dict | None:
|
|
|
+ def get_bots(self) -> dict:
|
|
|
"""Return all WoHand/Bot devices with services data."""
|
|
|
if not self._all_services_data:
|
|
|
self.discover()
|
|
|
|
|
|
_bot_devices = {}
|
|
|
|
|
|
- for item in self._all_services_data:
|
|
|
- if self._all_services_data[item]["model"] == "H":
|
|
|
- _bot_devices[item] = self._all_services_data[item]
|
|
|
+ for device, data in self._all_services_data.items():
|
|
|
+ if data.get("model") == "H":
|
|
|
+ _bot_devices[device] = data
|
|
|
|
|
|
return _bot_devices
|
|
|
|
|
|
- def get_device_data(self, mac) -> dict | None:
|
|
|
+ def get_tempsensors(self) -> dict:
|
|
|
+ """Return all WoSensorTH/Temp sensor devices with services data."""
|
|
|
+ if not self._all_services_data:
|
|
|
+ self.discover()
|
|
|
+
|
|
|
+ _bot_temp = {}
|
|
|
+
|
|
|
+ for device, data in self._all_services_data.items():
|
|
|
+ if data.get("model") == "T":
|
|
|
+ _bot_temp[device] = data
|
|
|
+
|
|
|
+ return _bot_temp
|
|
|
+
|
|
|
+ def get_device_data(self, mac: str) -> dict:
|
|
|
"""Return data for specific device."""
|
|
|
if not self._all_services_data:
|
|
|
self.discover()
|
|
|
|
|
|
_switchbot_data = {}
|
|
|
|
|
|
- for item in self._all_services_data:
|
|
|
- if self._all_services_data[item]["mac_address"] == mac:
|
|
|
- _switchbot_data = self._all_services_data[item]
|
|
|
+ for device in self._all_services_data.values():
|
|
|
+ if device["mac_address"] == mac:
|
|
|
+ _switchbot_data = device
|
|
|
|
|
|
return _switchbot_data
|
|
|
|
|
@@ -209,14 +232,22 @@ class GetSwitchbotDevices:
|
|
|
class SwitchbotDevice:
|
|
|
"""Base Representation of a Switchbot Device."""
|
|
|
|
|
|
- def __init__(self, mac, password=None, interface=None, **kwargs) -> None:
|
|
|
+ def __init__(
|
|
|
+ self,
|
|
|
+ mac: str,
|
|
|
+ password: str | None = None,
|
|
|
+ interface: int | None = None,
|
|
|
+ **kwargs: Any,
|
|
|
+ ) -> None:
|
|
|
"""Switchbot base class constructor."""
|
|
|
self._interface = interface
|
|
|
self._mac = mac
|
|
|
- self._device = None
|
|
|
- self._switchbot_device_data = {}
|
|
|
- self._scan_timeout = kwargs.pop("scan_timeout", DEFAULT_SCAN_TIMEOUT)
|
|
|
- self._retry_count = kwargs.pop("retry_count", DEFAULT_RETRY_COUNT)
|
|
|
+ self._device = bluepy.btle.Peripheral(
|
|
|
+ deviceAddr=None, addrType=bluepy.btle.ADDR_TYPE_RANDOM, iface=interface
|
|
|
+ )
|
|
|
+ self._switchbot_device_data: dict[str, Any] = {}
|
|
|
+ self._scan_timeout: int = kwargs.pop("scan_timeout", DEFAULT_SCAN_TIMEOUT)
|
|
|
+ self._retry_count: int = kwargs.pop("retry_count", DEFAULT_RETRY_COUNT)
|
|
|
if password is None or password == "":
|
|
|
self._password_encoded = None
|
|
|
else:
|
|
@@ -225,31 +256,24 @@ class SwitchbotDevice:
|
|
|
)
|
|
|
|
|
|
def _connect(self) -> None:
|
|
|
- if self._device is not None:
|
|
|
- return
|
|
|
try:
|
|
|
_LOGGER.debug("Connecting to Switchbot")
|
|
|
- self._device = bluepy.btle.Peripheral(
|
|
|
+ self._device.connect(
|
|
|
self._mac, bluepy.btle.ADDR_TYPE_RANDOM, self._interface
|
|
|
)
|
|
|
_LOGGER.debug("Connected to Switchbot")
|
|
|
except bluepy.btle.BTLEException:
|
|
|
_LOGGER.debug("Failed connecting to Switchbot", exc_info=True)
|
|
|
- self._device = None
|
|
|
raise
|
|
|
|
|
|
def _disconnect(self) -> None:
|
|
|
- if self._device is None:
|
|
|
- return
|
|
|
_LOGGER.debug("Disconnecting")
|
|
|
try:
|
|
|
self._device.disconnect()
|
|
|
except bluepy.btle.BTLEException:
|
|
|
_LOGGER.warning("Error disconnecting from Switchbot", exc_info=True)
|
|
|
- finally:
|
|
|
- self._device = None
|
|
|
|
|
|
- def _commandkey(self, key) -> str:
|
|
|
+ def _commandkey(self, key: str) -> str:
|
|
|
if self._password_encoded is None:
|
|
|
return key
|
|
|
key_suffix = PRESS_KEY_SUFFIX
|
|
@@ -257,14 +281,15 @@ class SwitchbotDevice:
|
|
|
key_suffix = ON_KEY_SUFFIX
|
|
|
elif key == OFF_KEY:
|
|
|
key_suffix = OFF_KEY_SUFFIX
|
|
|
+ elif key == DEVICE_BASIC_SETTINGS_KEY:
|
|
|
+ return KEY_PASSWORD_NOTIFY_PREFIX + self._password_encoded
|
|
|
return KEY_PASSWORD_PREFIX + self._password_encoded + key_suffix
|
|
|
|
|
|
- def _writekey(self, key) -> bool:
|
|
|
+ def _writekey(self, key: str) -> Any:
|
|
|
_LOGGER.debug("Prepare to send")
|
|
|
- hand_service = self._device.getServiceByUUID(UUID)
|
|
|
- hand = hand_service.getCharacteristics(HANDLE)[0]
|
|
|
+ hand = self._device.getCharacteristics(uuid=HANDLE)[0]
|
|
|
_LOGGER.debug("Sending command, %s", key)
|
|
|
- write_result = hand.write(binascii.a2b_hex(key), withResponse=True)
|
|
|
+ write_result = hand.write(binascii.a2b_hex(key), withResponse=False)
|
|
|
if not write_result:
|
|
|
_LOGGER.error(
|
|
|
"Sent command but didn't get a response from Switchbot confirming command was sent."
|
|
@@ -274,7 +299,25 @@ class SwitchbotDevice:
|
|
|
_LOGGER.info("Successfully sent command to Switchbot (MAC: %s)", self._mac)
|
|
|
return write_result
|
|
|
|
|
|
- def _sendcommand(self, key, retry) -> bool:
|
|
|
+ def _subscribe(self, key: str) -> Any:
|
|
|
+ _LOGGER.debug("Subscribe to notifications")
|
|
|
+ handle = self._device.getCharacteristics(uuid=NOTIFICATION_HANDLE)[0]
|
|
|
+ notify_handle = handle.getHandle() + 1
|
|
|
+ response = self._device.writeCharacteristic(
|
|
|
+ notify_handle, binascii.a2b_hex(key), withResponse=False
|
|
|
+ )
|
|
|
+ return response
|
|
|
+
|
|
|
+ def _readkey(self) -> bytes | None:
|
|
|
+ _LOGGER.debug("Prepare to read")
|
|
|
+ receive_handle = self._device.getCharacteristics(uuid=NOTIFICATION_HANDLE)
|
|
|
+ if receive_handle:
|
|
|
+ for char in receive_handle:
|
|
|
+ read_result: bytes = char.read()
|
|
|
+ return read_result
|
|
|
+ return None
|
|
|
+
|
|
|
+ def _sendcommand(self, key: str, retry: int) -> bool:
|
|
|
send_success = False
|
|
|
command = self._commandkey(key)
|
|
|
_LOGGER.debug("Sending command to switchbot %s", command)
|
|
@@ -301,16 +344,18 @@ class SwitchbotDevice:
|
|
|
"""Return mac address of device."""
|
|
|
return self._mac
|
|
|
|
|
|
- def get_battery_percent(self) -> int:
|
|
|
+ def get_battery_percent(self) -> Any:
|
|
|
"""Return device battery level in percent."""
|
|
|
if not self._switchbot_device_data:
|
|
|
return None
|
|
|
return self._switchbot_device_data["data"]["battery"]
|
|
|
|
|
|
- def get_device_data(self, retry=DEFAULT_RETRY_COUNT, interface=None) -> dict | None:
|
|
|
+ def get_device_data(
|
|
|
+ self, retry: int = DEFAULT_RETRY_COUNT, interface: int | None = None
|
|
|
+ ) -> dict | None:
|
|
|
"""Find switchbot devices and their advertisement data."""
|
|
|
if interface:
|
|
|
- _interface = interface
|
|
|
+ _interface: int | None = interface
|
|
|
else:
|
|
|
_interface = self._interface
|
|
|
|
|
@@ -341,26 +386,34 @@ class SwitchbotDevice:
|
|
|
self._switchbot_device_data["mac_address"] = dev.addr
|
|
|
for (adtype, desc, value) in dev.getScanData():
|
|
|
if adtype == 22:
|
|
|
- _model = chr(binascii.unhexlify(value.encode())[2] & 0b01111111)
|
|
|
+ _data = bytes.fromhex(value[4:])
|
|
|
+ _model = chr(_data[0] & 0b01111111)
|
|
|
if _model == "H":
|
|
|
- self._switchbot_device_data["data"] = _process_wohand(
|
|
|
- value[4:]
|
|
|
- )
|
|
|
+ self._switchbot_device_data["data"] = _process_wohand(_data)
|
|
|
self._switchbot_device_data["data"]["rssi"] = dev.rssi
|
|
|
+ self._switchbot_device_data["isEncrypted"] = bool(
|
|
|
+ _data[0] & 0b10000000
|
|
|
+ )
|
|
|
self._switchbot_device_data["model"] = _model
|
|
|
self._switchbot_device_data["modelName"] = "WoHand"
|
|
|
elif _model == "c":
|
|
|
self._switchbot_device_data["data"] = _process_wocurtain(
|
|
|
- value[4:]
|
|
|
+ _data
|
|
|
)
|
|
|
self._switchbot_device_data["data"]["rssi"] = dev.rssi
|
|
|
+ self._switchbot_device_data["isEncrypted"] = bool(
|
|
|
+ _data[0] & 0b10000000
|
|
|
+ )
|
|
|
self._switchbot_device_data["model"] = _model
|
|
|
self._switchbot_device_data["modelName"] = "WoCurtain"
|
|
|
elif _model == "T":
|
|
|
self._switchbot_device_data["data"] = _process_wosensorth(
|
|
|
- value[4:]
|
|
|
+ _data
|
|
|
)
|
|
|
self._switchbot_device_data["data"]["rssi"] = dev.rssi
|
|
|
+ self._switchbot_device_data["isEncrypted"] = bool(
|
|
|
+ _data[0] & 0b10000000
|
|
|
+ )
|
|
|
self._switchbot_device_data["model"] = _model
|
|
|
self._switchbot_device_data["modelName"] = "WoSensorTH"
|
|
|
|
|
@@ -371,16 +424,46 @@ class SwitchbotDevice:
|
|
|
|
|
|
return self._switchbot_device_data
|
|
|
|
|
|
+ def _get_basic_info(self, retry: int = DEFAULT_RETRY_COUNT) -> bytes:
|
|
|
+ """Get device basic settings."""
|
|
|
+ send_success = False
|
|
|
+ command = self._commandkey(DEVICE_BASIC_SETTINGS_KEY)
|
|
|
+ try:
|
|
|
+ self._connect()
|
|
|
+ self._subscribe(command)
|
|
|
+ send_success = self._writekey(command)
|
|
|
+ value = self._readkey()
|
|
|
+ except bluepy.btle.BTLEException:
|
|
|
+ _LOGGER.warning("Error talking to Switchbot", exc_info=True)
|
|
|
+ finally:
|
|
|
+ self._disconnect()
|
|
|
+
|
|
|
+ if send_success and value:
|
|
|
+
|
|
|
+ print("Successfully retrieved data from device " + str(self._mac))
|
|
|
+
|
|
|
+ return value
|
|
|
+
|
|
|
+ if retry < 1:
|
|
|
+ _LOGGER.error(
|
|
|
+ "Switchbot communication failed. Stopping trying", exc_info=True
|
|
|
+ )
|
|
|
+ return bytes(0)
|
|
|
+ _LOGGER.warning("Cannot connect to Switchbot. Retrying (remaining: %d)", retry)
|
|
|
+ time.sleep(DEFAULT_RETRY_TIMEOUT)
|
|
|
+ return self._get_basic_info(retry - 1)
|
|
|
+
|
|
|
|
|
|
class Switchbot(SwitchbotDevice):
|
|
|
"""Representation of a Switchbot."""
|
|
|
|
|
|
- def __init__(self, *args, **kwargs) -> None:
|
|
|
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
|
|
|
"""Switchbot Bot/WoHand constructor."""
|
|
|
super().__init__(*args, **kwargs)
|
|
|
- self._inverse = kwargs.pop("inverse_mode", False)
|
|
|
+ self._inverse: bool = kwargs.pop("inverse_mode", False)
|
|
|
+ self._settings: dict[str, Any] = {}
|
|
|
|
|
|
- def update(self, interface=None) -> None:
|
|
|
+ def update(self, interface: int | None = None) -> None:
|
|
|
"""Update mode, battery percent and state of device."""
|
|
|
self.get_device_data(retry=self._retry_count, interface=interface)
|
|
|
|
|
@@ -396,14 +479,27 @@ class Switchbot(SwitchbotDevice):
|
|
|
"""Press command to device."""
|
|
|
return self._sendcommand(PRESS_KEY, self._retry_count)
|
|
|
|
|
|
- def switch_mode(self) -> str:
|
|
|
+ def get_basic_info(self) -> dict[str, Any]:
|
|
|
+ """Get device basic settings."""
|
|
|
+ settings_data = self._get_basic_info()
|
|
|
+ self._settings["battery"] = settings_data[1]
|
|
|
+ self._settings["firmware"] = settings_data[2] / 10.0
|
|
|
+
|
|
|
+ self._settings["timers"] = settings_data[8]
|
|
|
+ self._settings["dualStateMode"] = bool(settings_data[9] & 16)
|
|
|
+ self._settings["inverseDirection"] = bool(settings_data[9] & 1)
|
|
|
+ self._settings["holdSeconds"] = settings_data[10]
|
|
|
+
|
|
|
+ return self._settings
|
|
|
+
|
|
|
+ def switch_mode(self) -> Any:
|
|
|
"""Return true or false from cache."""
|
|
|
|
|
|
if not self._switchbot_device_data:
|
|
|
return None
|
|
|
return self._switchbot_device_data["data"]["switchMode"]
|
|
|
|
|
|
- def is_on(self) -> bool:
|
|
|
+ def is_on(self) -> Any:
|
|
|
"""Return switch state from cache."""
|
|
|
|
|
|
if not self._switchbot_device_data:
|
|
@@ -418,7 +514,7 @@ class Switchbot(SwitchbotDevice):
|
|
|
class SwitchbotCurtain(SwitchbotDevice):
|
|
|
"""Representation of a Switchbot Curtain."""
|
|
|
|
|
|
- def __init__(self, *args, **kwargs) -> None:
|
|
|
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
|
|
|
"""Switchbot Curtain/WoCurtain constructor."""
|
|
|
|
|
|
|
|
@@ -430,7 +526,8 @@ class SwitchbotCurtain(SwitchbotDevice):
|
|
|
|
|
|
|
|
|
super().__init__(*args, **kwargs)
|
|
|
- self._reverse = kwargs.pop("reverse_mode", True)
|
|
|
+ self._reverse: bool = kwargs.pop("reverse_mode", True)
|
|
|
+ self._settings: dict[str, Any] = {}
|
|
|
|
|
|
def open(self) -> bool:
|
|
|
"""Send open command."""
|
|
@@ -450,18 +547,45 @@ class SwitchbotCurtain(SwitchbotDevice):
|
|
|
hex_position = "%0.2X" % position
|
|
|
return self._sendcommand(POSITION_KEY + hex_position, self._retry_count)
|
|
|
|
|
|
- def update(self, interface=None) -> None:
|
|
|
+ def update(self, interface: int | None = None) -> None:
|
|
|
"""Update position, battery percent and light level of device."""
|
|
|
self.get_device_data(retry=self._retry_count, interface=interface)
|
|
|
|
|
|
- def get_position(self) -> int:
|
|
|
+ def get_position(self) -> Any:
|
|
|
"""Return cached position (0-100) of Curtain."""
|
|
|
|
|
|
if not self._switchbot_device_data:
|
|
|
return None
|
|
|
return self._switchbot_device_data["data"]["position"]
|
|
|
|
|
|
- def get_light_level(self) -> int:
|
|
|
+ def get_basic_info(self) -> dict[str, Any]:
|
|
|
+ """Get device basic settings."""
|
|
|
+ settings_data = self._get_basic_info()
|
|
|
+ self._settings["battery"] = settings_data[1]
|
|
|
+ self._settings["firmware"] = settings_data[2] / 10.0
|
|
|
+
|
|
|
+ self._settings["chainLength"] = settings_data[3]
|
|
|
+
|
|
|
+ self._settings["openDirection"] = (
|
|
|
+ "right_to_left" if settings_data[4] & 0b10000000 == 128 else "left_to_right"
|
|
|
+ )
|
|
|
+
|
|
|
+ self._settings["touchToOpen"] = bool(settings_data[4] & 0b01000000)
|
|
|
+ self._settings["light"] = bool(settings_data[4] & 0b00100000)
|
|
|
+ self._settings["fault"] = bool(settings_data[4] & 0b00001000)
|
|
|
+
|
|
|
+ self._settings["solarPanel"] = bool(settings_data[5] & 0b00001000)
|
|
|
+ self._settings["calibrated"] = bool(settings_data[5] & 0b00000100)
|
|
|
+ self._settings["inMotion"] = bool(settings_data[5] & 0b01000011)
|
|
|
+
|
|
|
+ _position = max(min(settings_data[6], 100), 0)
|
|
|
+ self._settings["position"] = (100 - _position) if self._reverse else _position
|
|
|
+
|
|
|
+ self._settings["timers"] = settings_data[7]
|
|
|
+
|
|
|
+ return self._settings
|
|
|
+
|
|
|
+ def get_light_level(self) -> Any:
|
|
|
"""Return cached light level."""
|
|
|
|
|
|
if not self._switchbot_device_data:
|
|
@@ -472,7 +596,7 @@ class SwitchbotCurtain(SwitchbotDevice):
|
|
|
"""Return True if the curtain open from left to right."""
|
|
|
return self._reverse
|
|
|
|
|
|
- def is_calibrated(self) -> bool:
|
|
|
+ def is_calibrated(self) -> Any:
|
|
|
"""Return True curtain is calibrated."""
|
|
|
|
|
|
if not self._switchbot_device_data:
|