|
@@ -1,13 +1,13 @@
|
|
|
"""Library to handle connection with Switchbot."""
|
|
|
from __future__ import annotations
|
|
|
|
|
|
+import asyncio
|
|
|
import binascii
|
|
|
import logging
|
|
|
-from threading import Lock
|
|
|
-import time
|
|
|
from typing import Any
|
|
|
+from uuid import UUID
|
|
|
|
|
|
-import bluepy
|
|
|
+import bleak
|
|
|
|
|
|
DEFAULT_RETRY_COUNT = 3
|
|
|
DEFAULT_RETRY_TIMEOUT = 1
|
|
@@ -38,16 +38,16 @@ CURTAIN_EXT_CHAIN_INFO_KEY = "570f468101"
|
|
|
KEY_PASSWORD_PREFIX = "571"
|
|
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
-CONNECT_LOCK = Lock()
|
|
|
+CONNECT_LOCK = asyncio.Lock()
|
|
|
|
|
|
|
|
|
-def _sb_uuid(comms_type: str = "service") -> bluepy.btle.UUID:
|
|
|
+def _sb_uuid(comms_type: str = "service") -> UUID | str:
|
|
|
"""Return Switchbot UUID."""
|
|
|
|
|
|
_uuid = {"tx": "002", "rx": "003", "service": "d00"}
|
|
|
|
|
|
if comms_type in _uuid:
|
|
|
- return bluepy.btle.UUID(f"cba20{_uuid[comms_type]}-224d-11e6-9fb8-0002a5d5c51b")
|
|
|
+ return UUID(f"cba20{_uuid[comms_type]}-224d-11e6-9fb8-0002a5d5c51b")
|
|
|
|
|
|
return "Incorrect type, choose between: tx, rx or service"
|
|
|
|
|
@@ -100,55 +100,68 @@ def _process_wosensorth(data: bytes) -> dict[str, object]:
|
|
|
return _wosensorth_data
|
|
|
|
|
|
|
|
|
-def _process_btle_adv_data(dev: bluepy.btle.ScanEntry) -> dict[str, Any]:
|
|
|
- """Process bt le adv data."""
|
|
|
- _adv_data = {"mac_address": dev.addr}
|
|
|
- _data = dev.getValue(22)[2:]
|
|
|
-
|
|
|
- supported_types: dict[str, dict[str, Any]] = {
|
|
|
- "H": {"modelName": "WoHand", "func": _process_wohand},
|
|
|
- "c": {"modelName": "WoCurtain", "func": _process_wocurtain},
|
|
|
- "T": {"modelName": "WoSensorTH", "func": _process_wosensorth},
|
|
|
- }
|
|
|
-
|
|
|
- _model = chr(_data[0] & 0b01111111)
|
|
|
- _adv_data["isEncrypted"] = bool(_data[0] & 0b10000000)
|
|
|
- _adv_data["model"] = _model
|
|
|
- if _model in supported_types:
|
|
|
- _adv_data["data"] = supported_types[_model]["func"](_data)
|
|
|
- _adv_data["data"]["rssi"] = dev.rssi
|
|
|
- _adv_data["modelName"] = supported_types[_model]["modelName"]
|
|
|
- else:
|
|
|
- _adv_data["rawAdvData"] = dev.getValueText(22)
|
|
|
-
|
|
|
- return _adv_data
|
|
|
-
|
|
|
-
|
|
|
class GetSwitchbotDevices:
|
|
|
"""Scan for all Switchbot devices and return by type."""
|
|
|
|
|
|
def __init__(self, interface: int = 0) -> None:
|
|
|
"""Get switchbot devices class constructor."""
|
|
|
- self._interface = interface
|
|
|
+ self._interface = f"hci{interface}"
|
|
|
self._adv_data: dict[str, Any] = {}
|
|
|
|
|
|
- def discover(
|
|
|
+ def detection_callback(
|
|
|
self,
|
|
|
- retry: int = DEFAULT_RETRY_COUNT,
|
|
|
- scan_timeout: int = DEFAULT_SCAN_TIMEOUT,
|
|
|
- passive: bool = False,
|
|
|
- mac: str | None = None,
|
|
|
- ) -> dict[str, Any]:
|
|
|
+ device: bleak.backends.device.BLEDevice,
|
|
|
+ advertisement_data: bleak.backends.scanner.AdvertisementData,
|
|
|
+ ) -> None:
|
|
|
+ """BTLE adv scan callback."""
|
|
|
+ _device = device.address.replace(":", "").lower()
|
|
|
+ _service_data = list(advertisement_data.service_data.values())[0]
|
|
|
+ _model = chr(_service_data[0] & 0b01111111)
|
|
|
+
|
|
|
+ supported_types: dict[str, dict[str, Any]] = {
|
|
|
+ "H": {"modelName": "WoHand", "func": _process_wohand},
|
|
|
+ "c": {"modelName": "WoCurtain", "func": _process_wocurtain},
|
|
|
+ "T": {"modelName": "WoSensorTH", "func": _process_wosensorth},
|
|
|
+ }
|
|
|
+
|
|
|
+ self._adv_data[_device] = {
|
|
|
+ "mac_address": device.address.lower(),
|
|
|
+ "rawAdvData": list(advertisement_data.service_data.values())[0],
|
|
|
+ "data": {
|
|
|
+ "rssi": device.rssi,
|
|
|
+ },
|
|
|
+ }
|
|
|
+
|
|
|
+ if _model in supported_types:
|
|
|
+
|
|
|
+ self._adv_data[_device].update(
|
|
|
+ {
|
|
|
+ "isEncrypted": bool(_service_data[0] & 0b10000000),
|
|
|
+ "model": _model,
|
|
|
+ "modelName": supported_types[_model]["modelName"],
|
|
|
+ "data": supported_types[_model]["func"](_service_data),
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ self._adv_data[_device]["data"]["rssi"] = device.rssi
|
|
|
+
|
|
|
+ async def discover(
|
|
|
+ self, retry: int = DEFAULT_RETRY_COUNT, scan_timeout: int = DEFAULT_SCAN_TIMEOUT
|
|
|
+ ) -> dict:
|
|
|
"""Find switchbot devices and their advertisement data."""
|
|
|
+
|
|
|
devices = None
|
|
|
|
|
|
- with CONNECT_LOCK:
|
|
|
- try:
|
|
|
- devices = bluepy.btle.Scanner(self._interface).scan(
|
|
|
- scan_timeout, passive
|
|
|
- )
|
|
|
- except bluepy.btle.BTLEManagementError:
|
|
|
- _LOGGER.error("Error scanning for switchbot devices", exc_info=True)
|
|
|
+ devices = bleak.BleakScanner(
|
|
|
+ filters={"UUIDs": [str(_sb_uuid())]},
|
|
|
+ adapter=self._interface,
|
|
|
+ )
|
|
|
+ devices.register_detection_callback(self.detection_callback)
|
|
|
+
|
|
|
+ async with CONNECT_LOCK:
|
|
|
+ await devices.start()
|
|
|
+ await asyncio.sleep(scan_timeout)
|
|
|
+ await devices.stop()
|
|
|
|
|
|
if devices is None:
|
|
|
if retry < 1:
|
|
@@ -161,29 +174,15 @@ class GetSwitchbotDevices:
|
|
|
"Error scanning for Switchbot devices. Retrying (remaining: %d)",
|
|
|
retry,
|
|
|
)
|
|
|
- time.sleep(DEFAULT_RETRY_TIMEOUT)
|
|
|
- return self.discover(
|
|
|
- retry=retry - 1,
|
|
|
- scan_timeout=scan_timeout,
|
|
|
- passive=passive,
|
|
|
- mac=mac,
|
|
|
- )
|
|
|
-
|
|
|
- for dev in devices:
|
|
|
- if dev.getValueText(7) == str(_sb_uuid()):
|
|
|
- dev_id = dev.addr.replace(":", "")
|
|
|
- if mac:
|
|
|
- if dev.addr.lower() == mac.lower():
|
|
|
- self._adv_data[dev_id] = _process_btle_adv_data(dev)
|
|
|
- else:
|
|
|
- self._adv_data[dev_id] = _process_btle_adv_data(dev)
|
|
|
+ await asyncio.sleep(DEFAULT_RETRY_TIMEOUT)
|
|
|
+ return await self.discover(retry - 1, scan_timeout)
|
|
|
|
|
|
return self._adv_data
|
|
|
|
|
|
- def get_curtains(self) -> dict:
|
|
|
+ async def get_curtains(self) -> dict:
|
|
|
"""Return all WoCurtain/Curtains devices with services data."""
|
|
|
if not self._adv_data:
|
|
|
- self.discover()
|
|
|
+ await self.discover()
|
|
|
|
|
|
_curtain_devices = {
|
|
|
device: data
|
|
@@ -193,10 +192,10 @@ class GetSwitchbotDevices:
|
|
|
|
|
|
return _curtain_devices
|
|
|
|
|
|
- def get_bots(self) -> dict:
|
|
|
+ async def get_bots(self) -> dict[str, Any] | None:
|
|
|
"""Return all WoHand/Bot devices with services data."""
|
|
|
if not self._adv_data:
|
|
|
- self.discover()
|
|
|
+ await self.discover()
|
|
|
|
|
|
_bot_devices = {
|
|
|
device: data
|
|
@@ -206,10 +205,10 @@ class GetSwitchbotDevices:
|
|
|
|
|
|
return _bot_devices
|
|
|
|
|
|
- def get_tempsensors(self) -> dict:
|
|
|
+ async def get_tempsensors(self) -> dict[str, Any] | None:
|
|
|
"""Return all WoSensorTH/Temp sensor devices with services data."""
|
|
|
if not self._adv_data:
|
|
|
- self.discover()
|
|
|
+ await self.discover()
|
|
|
|
|
|
_bot_temp = {
|
|
|
device: data
|
|
@@ -219,10 +218,10 @@ class GetSwitchbotDevices:
|
|
|
|
|
|
return _bot_temp
|
|
|
|
|
|
- def get_device_data(self, mac: str) -> dict:
|
|
|
+ async def get_device_data(self, mac: str) -> dict[str, Any] | None:
|
|
|
"""Return data for specific device."""
|
|
|
if not self._adv_data:
|
|
|
- self.discover()
|
|
|
+ await self.discover()
|
|
|
|
|
|
_switchbot_data = {
|
|
|
device: data
|
|
@@ -233,7 +232,7 @@ class GetSwitchbotDevices:
|
|
|
return _switchbot_data
|
|
|
|
|
|
|
|
|
-class SwitchbotDevice(bluepy.btle.Peripheral):
|
|
|
+class SwitchbotDevice:
|
|
|
"""Base Representation of a Switchbot Device."""
|
|
|
|
|
|
def __init__(
|
|
@@ -244,13 +243,7 @@ class SwitchbotDevice(bluepy.btle.Peripheral):
|
|
|
**kwargs: Any,
|
|
|
) -> None:
|
|
|
"""Switchbot base class constructor."""
|
|
|
- bluepy.btle.Peripheral.__init__(
|
|
|
- self,
|
|
|
- deviceAddr=None,
|
|
|
- addrType=bluepy.btle.ADDR_TYPE_RANDOM,
|
|
|
- iface=interface,
|
|
|
- )
|
|
|
- self._interface = interface
|
|
|
+ self._interface = f"hci{interface}"
|
|
|
self._mac = mac.replace("-", ":").lower()
|
|
|
self._sb_adv_data: dict[str, Any] = {}
|
|
|
self._scan_timeout: int = kwargs.pop("scan_timeout", DEFAULT_SCAN_TIMEOUT)
|
|
@@ -261,179 +254,80 @@ class SwitchbotDevice(bluepy.btle.Peripheral):
|
|
|
self._password_encoded = "%x" % (
|
|
|
binascii.crc32(password.encode("ascii")) & 0xFFFFFFFF
|
|
|
)
|
|
|
+ self._last_notification = bytearray()
|
|
|
|
|
|
-
|
|
|
- def _connect(self, retry: int, timeout: int | None = None) -> None:
|
|
|
- _LOGGER.debug("Connecting to Switchbot")
|
|
|
-
|
|
|
- if retry < 1:
|
|
|
- self._stopHelper()
|
|
|
- raise bluepy.btle.BTLEDisconnectError(
|
|
|
- "Failed to connect to peripheral %s" % self._mac
|
|
|
- )
|
|
|
-
|
|
|
- if self._helper is None:
|
|
|
- self._startHelper(self._interface)
|
|
|
-
|
|
|
- self._writeCmd("conn %s %s\n" % (self._mac, bluepy.btle.ADDR_TYPE_RANDOM))
|
|
|
-
|
|
|
- rsp = self._getResp(["stat", "err"], timeout)
|
|
|
-
|
|
|
- while rsp and rsp["state"][0] in [
|
|
|
- "tryconn",
|
|
|
- "scan",
|
|
|
- ]:
|
|
|
- rsp = self._getResp(["stat", "err"], timeout)
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- if rsp and rsp["state"][0] == "disc":
|
|
|
- _LOGGER.warning("Bluepy busy, waiting before retry")
|
|
|
- self._stopHelper()
|
|
|
- time.sleep(self._scan_timeout)
|
|
|
- return self._connect(retry - 1, timeout)
|
|
|
-
|
|
|
- if rsp and rsp["rsp"][0] == "err":
|
|
|
- errcode = rsp["code"][0]
|
|
|
- _LOGGER.debug(
|
|
|
- "Error trying to connect to peripheral %s, error: %s",
|
|
|
- self._mac,
|
|
|
- errcode,
|
|
|
- )
|
|
|
-
|
|
|
- if errcode == "connfail":
|
|
|
- return self._connect(retry - 1, timeout)
|
|
|
-
|
|
|
- if errcode == "nomgmt":
|
|
|
- raise bluepy.btle.BTLEManagementError(
|
|
|
- "Management not available (permissions problem?)", rsp
|
|
|
- )
|
|
|
- if errcode == "atterr":
|
|
|
- raise bluepy.btle.BTLEGattError("Bluetooth command failed", rsp)
|
|
|
-
|
|
|
- raise bluepy.btle.BTLEException(
|
|
|
- "Error from bluepy-helper (%s)" % errcode, rsp
|
|
|
- )
|
|
|
-
|
|
|
- if rsp is None or rsp["state"][0] != "conn":
|
|
|
- self._stopHelper()
|
|
|
-
|
|
|
- if rsp is None:
|
|
|
- _LOGGER.warning(
|
|
|
- "Timed out while trying to connect to peripheral %s", self._mac
|
|
|
- )
|
|
|
-
|
|
|
- _LOGGER.warning("Bluehelper returned unable to connect state: %s", rsp)
|
|
|
-
|
|
|
- raise bluepy.btle.BTLEDisconnectError(
|
|
|
- "Failed to connect to peripheral %s, rsp: %s" % (self._mac, rsp)
|
|
|
- )
|
|
|
+ async def _notification_handler(self, sender: int, data: bytearray) -> None:
|
|
|
+ """Handle notification responses."""
|
|
|
+ self._last_notification = data
|
|
|
|
|
|
def _commandkey(self, key: str) -> str:
|
|
|
+ """Add password to key if set."""
|
|
|
if self._password_encoded is None:
|
|
|
return key
|
|
|
key_action = key[3]
|
|
|
key_suffix = key[4:]
|
|
|
return KEY_PASSWORD_PREFIX + key_action + self._password_encoded + key_suffix
|
|
|
|
|
|
- def _writekey(self, key: str) -> bool:
|
|
|
- _LOGGER.debug("Prepare to send")
|
|
|
- if self._helper is None or self.getState() == "disc":
|
|
|
- return False
|
|
|
- try:
|
|
|
- hand = self.getCharacteristics(uuid=_sb_uuid("tx"))[0]
|
|
|
- _LOGGER.debug("Sending command, %s", key)
|
|
|
- hand.write(bytes.fromhex(key), withResponse=False)
|
|
|
- except bluepy.btle.BTLEException:
|
|
|
- _LOGGER.warning("Error sending command to Switchbot", exc_info=True)
|
|
|
- raise
|
|
|
- else:
|
|
|
- _LOGGER.info("Successfully sent command to Switchbot (MAC: %s)", self._mac)
|
|
|
-
|
|
|
- return True
|
|
|
-
|
|
|
- def _subscribe(self) -> bool:
|
|
|
- _LOGGER.debug("Subscribe to notifications")
|
|
|
- if self._helper is None or self.getState() == "disc":
|
|
|
- return False
|
|
|
- enable_notify_flag = b"\x01\x00"
|
|
|
- try:
|
|
|
- handle = self.getCharacteristics(uuid=_sb_uuid("rx"))[0]
|
|
|
- notify_handle = handle.getHandle() + 1
|
|
|
- self.writeCharacteristic(
|
|
|
- notify_handle, enable_notify_flag, withResponse=False
|
|
|
- )
|
|
|
- except bluepy.btle.BTLEException:
|
|
|
- _LOGGER.warning(
|
|
|
- "Error while enabling notifications on Switchbot", exc_info=True
|
|
|
- )
|
|
|
- raise
|
|
|
+ async def _sendcommand(self, key: str, retry: int) -> bytes:
|
|
|
+ """Send command to device and read response."""
|
|
|
+ command = bytearray.fromhex(self._commandkey(key))
|
|
|
+ notify_msg = b""
|
|
|
+ _LOGGER.debug("Sending command to switchbot %s", command)
|
|
|
|
|
|
- return True
|
|
|
+ if len(self._mac.split(":")) != 6:
|
|
|
+ raise ValueError("Expected MAC address, got %s" % repr(self._mac))
|
|
|
|
|
|
- def _readkey(self) -> bytes:
|
|
|
- _LOGGER.debug("Prepare to read notification from switchbot")
|
|
|
- if self._helper is None:
|
|
|
- return b"\x00"
|
|
|
- try:
|
|
|
- receive_handle = self.getCharacteristics(uuid=_sb_uuid("rx"))
|
|
|
- except bluepy.btle.BTLEException:
|
|
|
- _LOGGER.warning(
|
|
|
- "Error while reading notifications from Switchbot", exc_info=True
|
|
|
- )
|
|
|
- else:
|
|
|
- for char in receive_handle:
|
|
|
- read_result: bytes = char.read()
|
|
|
- return read_result
|
|
|
+ async with CONNECT_LOCK:
|
|
|
+ try:
|
|
|
+ async with bleak.BleakClient(
|
|
|
+ address_or_ble_device=self._mac, timeout=float(self._scan_timeout)
|
|
|
+ ) as client:
|
|
|
+ _LOGGER.debug("Connnected to switchbot: %s", client.is_connected)
|
|
|
+
|
|
|
+ _LOGGER.debug("Subscribe to notifications")
|
|
|
+ await client.start_notify(
|
|
|
+ _sb_uuid(comms_type="rx"), self._notification_handler
|
|
|
+ )
|
|
|
|
|
|
-
|
|
|
- if self._helper and self.getState() == "disc":
|
|
|
- return b"\x01"
|
|
|
+ _LOGGER.debug("Sending command, %s", key)
|
|
|
+ await client.write_gatt_char(
|
|
|
+ _sb_uuid(comms_type="tx"), command, False
|
|
|
+ )
|
|
|
|
|
|
- return b"\x00"
|
|
|
+ await asyncio.sleep(
|
|
|
+ 1.0
|
|
|
+ )
|
|
|
|
|
|
- def _sendcommand(self, key: str, retry: int, timeout: int | None = 40) -> bytes:
|
|
|
- command = self._commandkey(key)
|
|
|
- send_success = False
|
|
|
- notify_msg = None
|
|
|
- _LOGGER.debug("Sending command to switchbot %s", command)
|
|
|
+ notify_msg = self._last_notification
|
|
|
+ _LOGGER.info("Notification received: %s", notify_msg)
|
|
|
|
|
|
- if len(self._mac.split(":")) != 6:
|
|
|
- raise ValueError("Expected MAC address, got %s" % repr(self._mac))
|
|
|
+ _LOGGER.debug("UnSubscribe to notifications")
|
|
|
+ await client.stop_notify(_sb_uuid(comms_type="rx"))
|
|
|
|
|
|
- with CONNECT_LOCK:
|
|
|
- try:
|
|
|
- self._connect(retry, timeout)
|
|
|
- send_success = self._subscribe()
|
|
|
- except bluepy.btle.BTLEException:
|
|
|
- _LOGGER.warning("Error connecting to Switchbot", exc_info=True)
|
|
|
- else:
|
|
|
- try:
|
|
|
- send_success = self._writekey(command)
|
|
|
- except bluepy.btle.BTLEException:
|
|
|
- _LOGGER.warning(
|
|
|
- "Error sending commands to Switchbot", exc_info=True
|
|
|
+ except (bleak.BleakError, asyncio.exceptions.TimeoutError):
|
|
|
+
|
|
|
+ if retry < 1:
|
|
|
+ _LOGGER.error(
|
|
|
+ "Switchbot communication failed. Stopping trying", exc_info=True
|
|
|
)
|
|
|
- else:
|
|
|
- notify_msg = self._readkey()
|
|
|
- finally:
|
|
|
- self.disconnect()
|
|
|
+ return b"\x00"
|
|
|
|
|
|
- if notify_msg and send_success:
|
|
|
+ _LOGGER.debug("Switchbot communication failed with:", exc_info=True)
|
|
|
+
|
|
|
+ if notify_msg:
|
|
|
if notify_msg == b"\x07":
|
|
|
_LOGGER.error("Password required")
|
|
|
elif notify_msg == b"\t":
|
|
|
_LOGGER.error("Password incorrect")
|
|
|
return notify_msg
|
|
|
|
|
|
- if retry < 1:
|
|
|
- _LOGGER.error(
|
|
|
- "Switchbot communication failed. Stopping trying", exc_info=True
|
|
|
- )
|
|
|
- return b"\x00"
|
|
|
_LOGGER.warning("Cannot connect to Switchbot. Retrying (remaining: %d)", retry)
|
|
|
- time.sleep(DEFAULT_RETRY_TIMEOUT)
|
|
|
- return self._sendcommand(key, retry - 1)
|
|
|
+
|
|
|
+ if retry < 1:
|
|
|
+ return b"\x00"
|
|
|
+
|
|
|
+ await asyncio.sleep(DEFAULT_RETRY_TIMEOUT)
|
|
|
+ return await self._sendcommand(key, retry - 1)
|
|
|
|
|
|
def get_mac(self) -> str:
|
|
|
"""Return mac address of device."""
|
|
@@ -441,26 +335,23 @@ class SwitchbotDevice(bluepy.btle.Peripheral):
|
|
|
|
|
|
def get_battery_percent(self) -> Any:
|
|
|
"""Return device battery level in percent."""
|
|
|
- if not self._sb_adv_data.get("data"):
|
|
|
+ if not self._sb_adv_data:
|
|
|
return None
|
|
|
- return self._sb_adv_data["data"].get("battery")
|
|
|
+ return self._sb_adv_data["data"]["battery"]
|
|
|
|
|
|
- def get_device_data(
|
|
|
- self,
|
|
|
- retry: int = DEFAULT_RETRY_COUNT,
|
|
|
- interface: int | None = None,
|
|
|
- passive: bool = False,
|
|
|
- ) -> dict[str, Any]:
|
|
|
+ async def get_device_data(
|
|
|
+ self, retry: int = DEFAULT_RETRY_COUNT, interface: int | None = None
|
|
|
+ ) -> dict | None:
|
|
|
"""Find switchbot devices and their advertisement data."""
|
|
|
- _interface: int = interface if interface else self._interface
|
|
|
+ if interface:
|
|
|
+ _interface: int = interface
|
|
|
+ else:
|
|
|
+ _interface = int(self._interface.replace("hci", ""))
|
|
|
|
|
|
dev_id = self._mac.replace(":", "")
|
|
|
|
|
|
- _data = GetSwitchbotDevices(interface=_interface).discover(
|
|
|
- retry=retry,
|
|
|
- scan_timeout=self._scan_timeout,
|
|
|
- passive=passive,
|
|
|
- mac=self._mac,
|
|
|
+ _data = await GetSwitchbotDevices(interface=_interface).discover(
|
|
|
+ retry=retry, scan_timeout=self._scan_timeout
|
|
|
)
|
|
|
|
|
|
if _data.get(dev_id):
|
|
@@ -478,15 +369,13 @@ class Switchbot(SwitchbotDevice):
|
|
|
self._inverse: bool = kwargs.pop("inverse_mode", False)
|
|
|
self._settings: dict[str, Any] = {}
|
|
|
|
|
|
- def update(self, interface: int | None = None, passive: bool = False) -> None:
|
|
|
+ async def update(self, interface: int | None = None) -> None:
|
|
|
"""Update mode, battery percent and state of device."""
|
|
|
- self.get_device_data(
|
|
|
- retry=self._retry_count, interface=interface, passive=passive
|
|
|
- )
|
|
|
+ await self.get_device_data(retry=self._retry_count, interface=interface)
|
|
|
|
|
|
- def turn_on(self) -> bool:
|
|
|
+ async def turn_on(self) -> bool:
|
|
|
"""Turn device on."""
|
|
|
- result = self._sendcommand(ON_KEY, self._retry_count)
|
|
|
+ result = await self._sendcommand(ON_KEY, self._retry_count)
|
|
|
|
|
|
if result[0] == 1:
|
|
|
return True
|
|
@@ -497,9 +386,9 @@ class Switchbot(SwitchbotDevice):
|
|
|
|
|
|
return False
|
|
|
|
|
|
- def turn_off(self) -> bool:
|
|
|
+ async def turn_off(self) -> bool:
|
|
|
"""Turn device off."""
|
|
|
- result = self._sendcommand(OFF_KEY, self._retry_count)
|
|
|
+ result = await self._sendcommand(OFF_KEY, self._retry_count)
|
|
|
if result[0] == 1:
|
|
|
return True
|
|
|
|
|
@@ -509,9 +398,9 @@ class Switchbot(SwitchbotDevice):
|
|
|
|
|
|
return False
|
|
|
|
|
|
- def hand_up(self) -> bool:
|
|
|
+ async def hand_up(self) -> bool:
|
|
|
"""Raise device arm."""
|
|
|
- result = self._sendcommand(UP_KEY, self._retry_count)
|
|
|
+ result = await self._sendcommand(UP_KEY, self._retry_count)
|
|
|
if result[0] == 1:
|
|
|
return True
|
|
|
|
|
@@ -521,9 +410,9 @@ class Switchbot(SwitchbotDevice):
|
|
|
|
|
|
return False
|
|
|
|
|
|
- def hand_down(self) -> bool:
|
|
|
+ async def hand_down(self) -> bool:
|
|
|
"""Lower device arm."""
|
|
|
- result = self._sendcommand(DOWN_KEY, self._retry_count)
|
|
|
+ result = await self._sendcommand(DOWN_KEY, self._retry_count)
|
|
|
if result[0] == 1:
|
|
|
return True
|
|
|
|
|
@@ -533,9 +422,9 @@ class Switchbot(SwitchbotDevice):
|
|
|
|
|
|
return False
|
|
|
|
|
|
- def press(self) -> bool:
|
|
|
+ async def press(self) -> bool:
|
|
|
"""Press command to device."""
|
|
|
- result = self._sendcommand(PRESS_KEY, self._retry_count)
|
|
|
+ result = await self._sendcommand(PRESS_KEY, self._retry_count)
|
|
|
if result[0] == 1:
|
|
|
return True
|
|
|
|
|
@@ -545,14 +434,14 @@ class Switchbot(SwitchbotDevice):
|
|
|
|
|
|
return False
|
|
|
|
|
|
- def set_switch_mode(
|
|
|
+ async def set_switch_mode(
|
|
|
self, switch_mode: bool = False, strength: int = 100, inverse: bool = False
|
|
|
) -> bool:
|
|
|
"""Change bot mode."""
|
|
|
mode_key = format(switch_mode, "b") + format(inverse, "b")
|
|
|
strength_key = f"{strength:0{2}x}"
|
|
|
|
|
|
- result = self._sendcommand(
|
|
|
+ result = await self._sendcommand(
|
|
|
DEVICE_SET_MODE_KEY + strength_key + mode_key, self._retry_count
|
|
|
)
|
|
|
|
|
@@ -561,11 +450,11 @@ class Switchbot(SwitchbotDevice):
|
|
|
|
|
|
return False
|
|
|
|
|
|
- def set_long_press(self, duration: int = 0) -> bool:
|
|
|
+ async def set_long_press(self, duration: int = 0) -> bool:
|
|
|
"""Set bot long press duration."""
|
|
|
duration_key = f"{duration:0{2}x}"
|
|
|
|
|
|
- result = self._sendcommand(
|
|
|
+ result = await self._sendcommand(
|
|
|
DEVICE_SET_EXTENDED_KEY + "08" + duration_key, self._retry_count
|
|
|
)
|
|
|
|
|
@@ -574,9 +463,9 @@ class Switchbot(SwitchbotDevice):
|
|
|
|
|
|
return False
|
|
|
|
|
|
- def get_basic_info(self) -> dict[str, Any] | None:
|
|
|
+ async def get_basic_info(self) -> dict[str, Any] | None:
|
|
|
"""Get device basic settings."""
|
|
|
- _data = self._sendcommand(
|
|
|
+ _data = await self._sendcommand(
|
|
|
key=DEVICE_GET_BASIC_SETTINGS_KEY, retry=self._retry_count
|
|
|
)
|
|
|
|
|
@@ -601,7 +490,7 @@ class Switchbot(SwitchbotDevice):
|
|
|
|
|
|
if not self._sb_adv_data.get("data"):
|
|
|
return None
|
|
|
- return self._sb_adv_data.get("switchMode")
|
|
|
+ return self._sb_adv_data["data"].get("switchMode")
|
|
|
|
|
|
def is_on(self) -> Any:
|
|
|
"""Return switch state from cache."""
|
|
@@ -635,45 +524,43 @@ class SwitchbotCurtain(SwitchbotDevice):
|
|
|
self.ext_info_sum: dict[str, Any] = {}
|
|
|
self.ext_info_adv: dict[str, Any] = {}
|
|
|
|
|
|
- def open(self) -> bool:
|
|
|
+ async def open(self) -> bool:
|
|
|
"""Send open command."""
|
|
|
- result = self._sendcommand(OPEN_KEY, self._retry_count)
|
|
|
+ result = await self._sendcommand(OPEN_KEY, self._retry_count)
|
|
|
if result[0] == 1:
|
|
|
return True
|
|
|
|
|
|
return False
|
|
|
|
|
|
- def close(self) -> bool:
|
|
|
+ async def close(self) -> bool:
|
|
|
"""Send close command."""
|
|
|
- result = self._sendcommand(CLOSE_KEY, self._retry_count)
|
|
|
+ result = await self._sendcommand(CLOSE_KEY, self._retry_count)
|
|
|
if result[0] == 1:
|
|
|
return True
|
|
|
|
|
|
return False
|
|
|
|
|
|
- def stop(self) -> bool:
|
|
|
+ async def stop(self) -> bool:
|
|
|
"""Send stop command to device."""
|
|
|
- result = self._sendcommand(STOP_KEY, self._retry_count)
|
|
|
+ result = await self._sendcommand(STOP_KEY, self._retry_count)
|
|
|
if result[0] == 1:
|
|
|
return True
|
|
|
|
|
|
return False
|
|
|
|
|
|
- def set_position(self, position: int) -> bool:
|
|
|
+ async def set_position(self, position: int) -> bool:
|
|
|
"""Send position command (0-100) to device."""
|
|
|
position = (100 - position) if self._reverse else position
|
|
|
hex_position = "%0.2X" % position
|
|
|
- result = self._sendcommand(POSITION_KEY + hex_position, self._retry_count)
|
|
|
+ result = await self._sendcommand(POSITION_KEY + hex_position, self._retry_count)
|
|
|
if result[0] == 1:
|
|
|
return True
|
|
|
|
|
|
return False
|
|
|
|
|
|
- def update(self, interface: int | None = None, passive: bool = False) -> None:
|
|
|
+ async def update(self, interface: int | None = None) -> None:
|
|
|
"""Update position, battery percent and light level of device."""
|
|
|
- self.get_device_data(
|
|
|
- retry=self._retry_count, interface=interface, passive=passive
|
|
|
- )
|
|
|
+ await self.get_device_data(retry=self._retry_count, interface=interface)
|
|
|
|
|
|
def get_position(self) -> Any:
|
|
|
"""Return cached position (0-100) of Curtain."""
|
|
@@ -682,9 +569,9 @@ class SwitchbotCurtain(SwitchbotDevice):
|
|
|
return None
|
|
|
return self._sb_adv_data["data"].get("position")
|
|
|
|
|
|
- def get_basic_info(self) -> dict[str, Any] | None:
|
|
|
+ async def get_basic_info(self) -> dict[str, Any] | None:
|
|
|
"""Get device basic settings."""
|
|
|
- _data = self._sendcommand(
|
|
|
+ _data = await self._sendcommand(
|
|
|
key=DEVICE_GET_BASIC_SETTINGS_KEY, retry=self._retry_count
|
|
|
)
|
|
|
|
|
@@ -713,9 +600,11 @@ class SwitchbotCurtain(SwitchbotDevice):
|
|
|
|
|
|
return self._settings
|
|
|
|
|
|
- def get_extended_info_summary(self) -> dict[str, Any] | None:
|
|
|
+ async def get_extended_info_summary(self) -> dict[str, Any] | None:
|
|
|
"""Get basic info for all devices in chain."""
|
|
|
- _data = self._sendcommand(key=CURTAIN_EXT_SUM_KEY, retry=self._retry_count)
|
|
|
+ _data = await self._sendcommand(
|
|
|
+ key=CURTAIN_EXT_SUM_KEY, retry=self._retry_count
|
|
|
+ )
|
|
|
|
|
|
if _data in (b"\x07", b"\x00"):
|
|
|
_LOGGER.error("Unsuccessfull, please try again")
|
|
@@ -743,9 +632,12 @@ class SwitchbotCurtain(SwitchbotDevice):
|
|
|
|
|
|
return self.ext_info_sum
|
|
|
|
|
|
- def get_extended_info_adv(self) -> dict[str, Any] | None:
|
|
|
+ async def get_extended_info_adv(self) -> dict[str, Any] | None:
|
|
|
"""Get advance page info for device chain."""
|
|
|
- _data = self._sendcommand(key=CURTAIN_EXT_ADV_KEY, retry=self._retry_count)
|
|
|
+
|
|
|
+ _data = await self._sendcommand(
|
|
|
+ key=CURTAIN_EXT_ADV_KEY, retry=self._retry_count
|
|
|
+ )
|
|
|
|
|
|
if _data in (b"\x07", b"\x00"):
|
|
|
_LOGGER.error("Unsuccessfull, please try again")
|