|
@@ -1,806 +1,20 @@
|
|
|
"""Library to handle connection with Switchbot."""
|
|
|
from __future__ import annotations
|
|
|
|
|
|
-import asyncio
|
|
|
-import binascii
|
|
|
-import logging
|
|
|
-from dataclasses import dataclass
|
|
|
-from typing import Any
|
|
|
-from uuid import UUID
|
|
|
-
|
|
|
-import async_timeout
|
|
|
-import bleak
|
|
|
-from bleak.backends.device import BLEDevice
|
|
|
-from bleak.backends.scanner import AdvertisementData
|
|
|
-from bleak_retry_connector import BleakClient, establish_connection
|
|
|
-
|
|
|
-DEFAULT_RETRY_COUNT = 3
|
|
|
-DEFAULT_RETRY_TIMEOUT = 1
|
|
|
-DEFAULT_SCAN_TIMEOUT = 5
|
|
|
-
|
|
|
-
|
|
|
-DEVICE_GET_BASIC_SETTINGS_KEY = "5702"
|
|
|
-DEVICE_SET_MODE_KEY = "5703"
|
|
|
-DEVICE_SET_EXTENDED_KEY = "570f"
|
|
|
-
|
|
|
-
|
|
|
-PLUG_ON_KEY = "570f50010180"
|
|
|
-PLUG_OFF_KEY = "570f50010100"
|
|
|
-
|
|
|
-
|
|
|
-PRESS_KEY = "570100"
|
|
|
-ON_KEY = "570101"
|
|
|
-OFF_KEY = "570102"
|
|
|
-DOWN_KEY = "570103"
|
|
|
-UP_KEY = "570104"
|
|
|
-
|
|
|
-
|
|
|
-OPEN_KEY = "570f450105ff00"
|
|
|
-CLOSE_KEY = "570f450105ff64"
|
|
|
-POSITION_KEY = "570F450105ff"
|
|
|
-STOP_KEY = "570F450100ff"
|
|
|
-CURTAIN_EXT_SUM_KEY = "570f460401"
|
|
|
-CURTAIN_EXT_ADV_KEY = "570f460402"
|
|
|
-CURTAIN_EXT_CHAIN_INFO_KEY = "570f468101"
|
|
|
-
|
|
|
-
|
|
|
-KEY_PASSWORD_PREFIX = "571"
|
|
|
-
|
|
|
-_LOGGER = logging.getLogger(__name__)
|
|
|
-CONNECT_LOCK = asyncio.Lock()
|
|
|
-
|
|
|
-
|
|
|
-def _sb_uuid(comms_type: str = "service") -> UUID | str:
|
|
|
- """Return Switchbot UUID."""
|
|
|
-
|
|
|
- _uuid = {"tx": "002", "rx": "003", "service": "d00"}
|
|
|
-
|
|
|
- if comms_type in _uuid:
|
|
|
- return UUID(f"cba20{_uuid[comms_type]}-224d-11e6-9fb8-0002a5d5c51b")
|
|
|
-
|
|
|
- return "Incorrect type, choose between: tx, rx or service"
|
|
|
-
|
|
|
-
|
|
|
-def _process_wohand(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
|
|
|
- """Process woHand/Bot services data."""
|
|
|
- _switch_mode = bool(data[1] & 0b10000000)
|
|
|
-
|
|
|
- _bot_data = {
|
|
|
- "switchMode": _switch_mode,
|
|
|
- "isOn": not bool(data[1] & 0b01000000) if _switch_mode else False,
|
|
|
- "battery": data[2] & 0b01111111,
|
|
|
- }
|
|
|
-
|
|
|
- return _bot_data
|
|
|
-
|
|
|
-
|
|
|
-def _process_wocurtain(
|
|
|
- data: bytes, mfr_data: bytes | None, reverse: bool = True
|
|
|
-) -> dict[str, bool | int]:
|
|
|
- """Process woCurtain/Curtain services data."""
|
|
|
-
|
|
|
- _position = max(min(data[3] & 0b01111111, 100), 0)
|
|
|
-
|
|
|
- _curtain_data = {
|
|
|
- "calibration": bool(data[1] & 0b01000000),
|
|
|
- "battery": data[2] & 0b01111111,
|
|
|
- "inMotion": bool(data[3] & 0b10000000),
|
|
|
- "position": (100 - _position) if reverse else _position,
|
|
|
- "lightLevel": (data[4] >> 4) & 0b00001111,
|
|
|
- "deviceChain": data[4] & 0b00000111,
|
|
|
- }
|
|
|
-
|
|
|
- return _curtain_data
|
|
|
-
|
|
|
-
|
|
|
-def _process_wosensorth(data: bytes, mfr_data: bytes | None) -> dict[str, object]:
|
|
|
- """Process woSensorTH/Temp sensor services data."""
|
|
|
-
|
|
|
- _temp_sign = 1 if data[4] & 0b10000000 else -1
|
|
|
- _temp_c = _temp_sign * ((data[4] & 0b01111111) + ((data[3] & 0b00001111) / 10))
|
|
|
- _temp_f = (_temp_c * 9 / 5) + 32
|
|
|
- _temp_f = (_temp_f * 10) / 10
|
|
|
-
|
|
|
- _wosensorth_data = {
|
|
|
- "temp": {"c": _temp_c, "f": _temp_f},
|
|
|
- "fahrenheit": bool(data[5] & 0b10000000),
|
|
|
- "humidity": data[5] & 0b01111111,
|
|
|
- "battery": data[2] & 0b01111111,
|
|
|
- }
|
|
|
-
|
|
|
- return _wosensorth_data
|
|
|
-
|
|
|
-
|
|
|
-def _process_wocontact(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
|
|
|
- """Process woContact Sensor services data."""
|
|
|
- return {
|
|
|
- "tested": bool(data[1] & 0b10000000),
|
|
|
- "motion_detected": bool(data[1] & 0b01000000),
|
|
|
- "battery": data[2] & 0b01111111,
|
|
|
- "contact_open": data[3] & 0b00000010 == 0b00000010,
|
|
|
- "contact_timeout": data[3] & 0b00000110 == 0b00000110,
|
|
|
- "is_light": bool(data[3] & 0b00000001),
|
|
|
- "button_count": (data[7] & 0b11110000) >> 4,
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
-def _process_wopresence(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
|
|
|
- """Process WoPresence Sensor services data."""
|
|
|
- return {
|
|
|
- "tested": bool(data[1] & 0b10000000),
|
|
|
- "motion_detected": bool(data[1] & 0b01000000),
|
|
|
- "battery": data[2] & 0b01111111,
|
|
|
- "led": (data[5] & 0b00100000) >> 5,
|
|
|
- "iot": (data[5] & 0b00010000) >> 4,
|
|
|
- "sense_distance": (data[5] & 0b00001100) >> 2,
|
|
|
- "light_intensity": data[5] & 0b00000011,
|
|
|
- "is_light": bool(data[5] & 0b00000010),
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
-def _process_woplugmini(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
|
|
|
- """Process plug mini."""
|
|
|
- return {
|
|
|
- "switchMode": True,
|
|
|
- "isOn": mfr_data[7] == 0x80,
|
|
|
- "wifi_rssi": -mfr_data[9],
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
-def _process_color_bulb(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
|
|
|
- """Process WoBulb services data."""
|
|
|
- return {
|
|
|
- "sequence_number": mfr_data[6],
|
|
|
- "isOn": bool(mfr_data[7] & 0b10000000),
|
|
|
- "brightness": mfr_data[7] & 0b01111111,
|
|
|
- "delay": bool(mfr_data[8] & 0b10000000),
|
|
|
- "preset": bool(mfr_data[8] & 0b00001000),
|
|
|
- "light_state": mfr_data[8] & 0b00000111,
|
|
|
- "speed": mfr_data[9] & 0b01111111,
|
|
|
- "loop_index": mfr_data[10] & 0b11111110,
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
-@dataclass
|
|
|
-class SwitchBotAdvertisement:
|
|
|
- """Switchbot advertisement."""
|
|
|
-
|
|
|
- address: str
|
|
|
- data: dict[str, Any]
|
|
|
- device: BLEDevice
|
|
|
-
|
|
|
-
|
|
|
-def parse_advertisement_data(
|
|
|
- device: BLEDevice, advertisement_data: AdvertisementData
|
|
|
-) -> SwitchBotAdvertisement | None:
|
|
|
- """Parse advertisement data."""
|
|
|
- _services = list(advertisement_data.service_data.values())
|
|
|
- _mgr_datas = list(advertisement_data.manufacturer_data.values())
|
|
|
-
|
|
|
- if not _services:
|
|
|
- return
|
|
|
- _service_data = _services[0]
|
|
|
- _mfr_data = _mgr_datas[0] if _mgr_datas else None
|
|
|
- _model = chr(_service_data[0] & 0b01111111)
|
|
|
-
|
|
|
- supported_types: dict[str, dict[str, Any]] = {
|
|
|
- "d": {"modelName": "WoContact", "func": _process_wocontact},
|
|
|
- "H": {"modelName": "WoHand", "func": _process_wohand},
|
|
|
- "s": {"modelName": "WoPresence", "func": _process_wopresence},
|
|
|
- "c": {"modelName": "WoCurtain", "func": _process_wocurtain},
|
|
|
- "T": {"modelName": "WoSensorTH", "func": _process_wosensorth},
|
|
|
- "i": {"modelName": "WoSensorTH", "func": _process_wosensorth},
|
|
|
- "g": {"modelName": "WoPlug", "func": _process_woplugmini},
|
|
|
- "u": {"modelName": "WoBulb", "func": _process_color_bulb},
|
|
|
- }
|
|
|
-
|
|
|
- data = {
|
|
|
- "address": device.address,
|
|
|
- "rawAdvData": list(advertisement_data.service_data.values())[0],
|
|
|
- "data": {
|
|
|
- "rssi": device.rssi,
|
|
|
- },
|
|
|
- }
|
|
|
-
|
|
|
- if _model in supported_types:
|
|
|
-
|
|
|
- data.update(
|
|
|
- {
|
|
|
- "isEncrypted": bool(_service_data[0] & 0b10000000),
|
|
|
- "model": _model,
|
|
|
- "modelName": supported_types[_model]["modelName"],
|
|
|
- "data": supported_types[_model]["func"](_service_data, _mfr_data),
|
|
|
- }
|
|
|
- )
|
|
|
-
|
|
|
- data["data"]["rssi"] = device.rssi
|
|
|
-
|
|
|
- return SwitchBotAdvertisement(device.address, data, device)
|
|
|
-
|
|
|
-
|
|
|
-class GetSwitchbotDevices:
|
|
|
- """Scan for all Switchbot devices and return by type."""
|
|
|
-
|
|
|
- def __init__(self, interface: int = 0) -> None:
|
|
|
- """Get switchbot devices class constructor."""
|
|
|
- self._interface = f"hci{interface}"
|
|
|
- self._adv_data: dict[str, SwitchBotAdvertisement] = {}
|
|
|
-
|
|
|
- def detection_callback(
|
|
|
- self,
|
|
|
- device: BLEDevice,
|
|
|
- advertisement_data: AdvertisementData,
|
|
|
- ) -> None:
|
|
|
- discovery = parse_advertisement_data(device, advertisement_data)
|
|
|
- if discovery:
|
|
|
- self._adv_data[discovery.address] = discovery
|
|
|
-
|
|
|
- async def discover(
|
|
|
- self, retry: int = DEFAULT_RETRY_COUNT, scan_timeout: int = DEFAULT_SCAN_TIMEOUT
|
|
|
- ) -> dict:
|
|
|
- """Find switchbot devices and their advertisement data."""
|
|
|
-
|
|
|
- devices = None
|
|
|
- devices = bleak.BleakScanner(
|
|
|
-
|
|
|
-
|
|
|
- adapter=self._interface,
|
|
|
- )
|
|
|
- devices.register_detection_callback(self.detection_callback)
|
|
|
-
|
|
|
- async with CONNECT_LOCK:
|
|
|
- await devices.start()
|
|
|
- await asyncio.sleep(scan_timeout)
|
|
|
- await devices.stop()
|
|
|
-
|
|
|
- if devices is None:
|
|
|
- if retry < 1:
|
|
|
- _LOGGER.error(
|
|
|
- "Scanning for Switchbot devices failed. Stop trying", exc_info=True
|
|
|
- )
|
|
|
- return self._adv_data
|
|
|
-
|
|
|
- _LOGGER.warning(
|
|
|
- "Error scanning for Switchbot devices. Retrying (remaining: %d)",
|
|
|
- retry,
|
|
|
- )
|
|
|
- await asyncio.sleep(DEFAULT_RETRY_TIMEOUT)
|
|
|
- return await self.discover(retry - 1, scan_timeout)
|
|
|
-
|
|
|
- return self._adv_data
|
|
|
-
|
|
|
- async def _get_devices_by_model(
|
|
|
- self,
|
|
|
- model: str,
|
|
|
- ) -> dict:
|
|
|
- """Get switchbot devices by type."""
|
|
|
- if not self._adv_data:
|
|
|
- await self.discover()
|
|
|
-
|
|
|
- return {
|
|
|
- address: adv
|
|
|
- for address, adv in self._adv_data.items()
|
|
|
- if adv.data.get("model") == model
|
|
|
- }
|
|
|
-
|
|
|
- async def get_curtains(self) -> dict[str, SwitchBotAdvertisement]:
|
|
|
- """Return all WoCurtain/Curtains devices with services data."""
|
|
|
- return await self._get_devices_by_model("c")
|
|
|
-
|
|
|
- async def get_bots(self) -> dict[str, SwitchBotAdvertisement]:
|
|
|
- """Return all WoHand/Bot devices with services data."""
|
|
|
- return await self._get_devices_by_model("H")
|
|
|
-
|
|
|
- async def get_tempsensors(self) -> dict[str, SwitchBotAdvertisement]:
|
|
|
- """Return all WoSensorTH/Temp sensor devices with services data."""
|
|
|
- base_meters = await self._get_devices_by_model("T")
|
|
|
- plus_meters = await self._get_devices_by_model("i")
|
|
|
- return {**base_meters, **plus_meters}
|
|
|
-
|
|
|
- async def get_contactsensors(self) -> dict[str, SwitchBotAdvertisement]:
|
|
|
- """Return all WoContact/Contact sensor devices with services data."""
|
|
|
- return await self._get_devices_by_model("d")
|
|
|
-
|
|
|
- async def get_device_data(
|
|
|
- self, address: str
|
|
|
- ) -> dict[str, SwitchBotAdvertisement] | None:
|
|
|
- """Return data for specific device."""
|
|
|
- if not self._adv_data:
|
|
|
- await self.discover()
|
|
|
-
|
|
|
- _switchbot_data = {
|
|
|
- device: data
|
|
|
- for device, data in self._adv_data.items()
|
|
|
-
|
|
|
- if data.get("address") == address
|
|
|
- }
|
|
|
-
|
|
|
- return _switchbot_data
|
|
|
-
|
|
|
-
|
|
|
-class SwitchbotDevice:
|
|
|
- """Base Representation of a Switchbot Device."""
|
|
|
-
|
|
|
- def __init__(
|
|
|
- self,
|
|
|
- device: BLEDevice,
|
|
|
- password: str | None = None,
|
|
|
- interface: int = 0,
|
|
|
- **kwargs: Any,
|
|
|
- ) -> None:
|
|
|
- """Switchbot base class constructor."""
|
|
|
- self._interface = f"hci{interface}"
|
|
|
- self._device = device
|
|
|
- self._sb_adv_data: SwitchBotAdvertisement | None = None
|
|
|
- self._scan_timeout: int = kwargs.pop("scan_timeout", DEFAULT_SCAN_TIMEOUT)
|
|
|
- self._retry_count: int = kwargs.pop("retry_count", DEFAULT_RETRY_COUNT)
|
|
|
- if password is None or password == "":
|
|
|
- self._password_encoded = None
|
|
|
- else:
|
|
|
- self._password_encoded = "%x" % (
|
|
|
- binascii.crc32(password.encode("ascii")) & 0xFFFFFFFF
|
|
|
- )
|
|
|
-
|
|
|
- def _commandkey(self, key: str) -> str:
|
|
|
- """Add password to key if set."""
|
|
|
- if self._password_encoded is None:
|
|
|
- return key
|
|
|
- key_action = key[3]
|
|
|
- key_suffix = key[4:]
|
|
|
- return KEY_PASSWORD_PREFIX + key_action + self._password_encoded + key_suffix
|
|
|
-
|
|
|
- async def _sendcommand(self, key: str, retry: int) -> bytes:
|
|
|
- """Send command to device and read response."""
|
|
|
- command = bytearray.fromhex(self._commandkey(key))
|
|
|
- _LOGGER.debug("Sending command to switchbot %s", command)
|
|
|
- max_attempts = retry + 1
|
|
|
- async with CONNECT_LOCK:
|
|
|
- for attempt in range(max_attempts):
|
|
|
- try:
|
|
|
- return await self._send_command_locked(key, command)
|
|
|
- except (bleak.BleakError, asyncio.exceptions.TimeoutError):
|
|
|
- if attempt == retry:
|
|
|
- _LOGGER.error(
|
|
|
- "Switchbot communication failed. Stopping trying",
|
|
|
- exc_info=True,
|
|
|
- )
|
|
|
- return b"\x00"
|
|
|
-
|
|
|
- _LOGGER.debug("Switchbot communication failed with:", exc_info=True)
|
|
|
-
|
|
|
- raise RuntimeError("Unreachable")
|
|
|
-
|
|
|
- @property
|
|
|
- def name(self) -> str:
|
|
|
- """Return device name."""
|
|
|
- return f"{self._device.name} ({self._device.address})"
|
|
|
-
|
|
|
- async def _send_command_locked(self, key: str, command: bytes) -> bytes:
|
|
|
- """Send command to device and read response."""
|
|
|
- client: BleakClient | None = None
|
|
|
- try:
|
|
|
- _LOGGER.debug("%s: Connnecting to switchbot", self.name)
|
|
|
- client = await establish_connection(
|
|
|
- BleakClient, self._device, self.name, max_attempts=1
|
|
|
- )
|
|
|
- _LOGGER.debug(
|
|
|
- "%s: Connnected to switchbot: %s", self.name, client.is_connected
|
|
|
- )
|
|
|
- future: asyncio.Future[bytearray] = asyncio.Future()
|
|
|
-
|
|
|
- def _notification_handler(sender: int, data: bytearray) -> None:
|
|
|
- """Handle notification responses."""
|
|
|
- if future.done():
|
|
|
- _LOGGER.debug("%s: Notification handler already done", self.name)
|
|
|
- return
|
|
|
- future.set_result(data)
|
|
|
-
|
|
|
- _LOGGER.debug("%s: Subscribe to notifications", self.name)
|
|
|
- await client.start_notify(_sb_uuid(comms_type="rx"), _notification_handler)
|
|
|
-
|
|
|
- _LOGGER.debug("%s: Sending command, %s", self.name, key)
|
|
|
- await client.write_gatt_char(_sb_uuid(comms_type="tx"), command, False)
|
|
|
-
|
|
|
- async with async_timeout.timeout(5):
|
|
|
- notify_msg = await future
|
|
|
- _LOGGER.info("%s: Notification received: %s", self.name, notify_msg)
|
|
|
-
|
|
|
- _LOGGER.debug("%s: UnSubscribe to notifications", self.name)
|
|
|
- await client.stop_notify(_sb_uuid(comms_type="rx"))
|
|
|
-
|
|
|
- finally:
|
|
|
- if client:
|
|
|
- await client.disconnect()
|
|
|
-
|
|
|
- if notify_msg == b"\x07":
|
|
|
- _LOGGER.error("Password required")
|
|
|
- elif notify_msg == b"\t":
|
|
|
- _LOGGER.error("Password incorrect")
|
|
|
- return notify_msg
|
|
|
-
|
|
|
- def get_address(self) -> str:
|
|
|
- """Return address of device."""
|
|
|
- return self._device.address
|
|
|
-
|
|
|
- def _get_adv_value(self, key: str) -> Any:
|
|
|
- """Return value from advertisement data."""
|
|
|
- if not self._sb_adv_data:
|
|
|
- return None
|
|
|
- return self._sb_adv_data.data["data"][key]
|
|
|
-
|
|
|
- def get_battery_percent(self) -> Any:
|
|
|
- """Return device battery level in percent."""
|
|
|
- return self._get_adv_value("battery")
|
|
|
-
|
|
|
- def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
|
|
|
- """Update device data from advertisement."""
|
|
|
- self._sb_adv_data = advertisement
|
|
|
- self._device = advertisement.device
|
|
|
-
|
|
|
- async def get_device_data(
|
|
|
- self, retry: int = DEFAULT_RETRY_COUNT, interface: int | None = None
|
|
|
- ) -> dict | None:
|
|
|
- """Find switchbot devices and their advertisement data."""
|
|
|
- if interface:
|
|
|
- _interface: int = interface
|
|
|
- else:
|
|
|
- _interface = int(self._interface.replace("hci", ""))
|
|
|
-
|
|
|
- _data = await GetSwitchbotDevices(interface=_interface).discover(
|
|
|
- retry=retry, scan_timeout=self._scan_timeout
|
|
|
- )
|
|
|
-
|
|
|
- if self._device.address in _data:
|
|
|
- self._sb_adv_data = _data[self._device.address]
|
|
|
-
|
|
|
- return self._sb_adv_data
|
|
|
-
|
|
|
-
|
|
|
-class Switchbot(SwitchbotDevice):
|
|
|
- """Representation of a Switchbot."""
|
|
|
-
|
|
|
- def __init__(self, *args: Any, **kwargs: Any) -> None:
|
|
|
- """Switchbot Bot/WoHand constructor."""
|
|
|
- super().__init__(*args, **kwargs)
|
|
|
- self._inverse: bool = kwargs.pop("inverse_mode", False)
|
|
|
- self._settings: dict[str, Any] = {}
|
|
|
-
|
|
|
- async def update(self, interface: int | None = None) -> None:
|
|
|
- """Update mode, battery percent and state of device."""
|
|
|
- await self.get_device_data(retry=self._retry_count, interface=interface)
|
|
|
-
|
|
|
- async def turn_on(self) -> bool:
|
|
|
- """Turn device on."""
|
|
|
- result = await self._sendcommand(ON_KEY, self._retry_count)
|
|
|
-
|
|
|
- if result[0] == 1:
|
|
|
- return True
|
|
|
-
|
|
|
- if result[0] == 5:
|
|
|
- _LOGGER.debug("Bot is in press mode and doesn't have on state")
|
|
|
- return True
|
|
|
-
|
|
|
- return False
|
|
|
-
|
|
|
- async def turn_off(self) -> bool:
|
|
|
- """Turn device off."""
|
|
|
- result = await self._sendcommand(OFF_KEY, self._retry_count)
|
|
|
- if result[0] == 1:
|
|
|
- return True
|
|
|
-
|
|
|
- if result[0] == 5:
|
|
|
- _LOGGER.debug("Bot is in press mode and doesn't have off state")
|
|
|
- return True
|
|
|
-
|
|
|
- return False
|
|
|
-
|
|
|
- async def hand_up(self) -> bool:
|
|
|
- """Raise device arm."""
|
|
|
- result = await self._sendcommand(UP_KEY, self._retry_count)
|
|
|
- if result[0] == 1:
|
|
|
- return True
|
|
|
-
|
|
|
- if result[0] == 5:
|
|
|
- _LOGGER.debug("Bot is in press mode")
|
|
|
- return True
|
|
|
-
|
|
|
- return False
|
|
|
-
|
|
|
- async def hand_down(self) -> bool:
|
|
|
- """Lower device arm."""
|
|
|
- result = await self._sendcommand(DOWN_KEY, self._retry_count)
|
|
|
- if result[0] == 1:
|
|
|
- return True
|
|
|
-
|
|
|
- if result[0] == 5:
|
|
|
- _LOGGER.debug("Bot is in press mode")
|
|
|
- return True
|
|
|
-
|
|
|
- return False
|
|
|
-
|
|
|
- async def press(self) -> bool:
|
|
|
- """Press command to device."""
|
|
|
- result = await self._sendcommand(PRESS_KEY, self._retry_count)
|
|
|
- if result[0] == 1:
|
|
|
- return True
|
|
|
-
|
|
|
- if result[0] == 5:
|
|
|
- _LOGGER.debug("Bot is in switch mode")
|
|
|
- return True
|
|
|
-
|
|
|
- return False
|
|
|
-
|
|
|
- async def set_switch_mode(
|
|
|
- self, switch_mode: bool = False, strength: int = 100, inverse: bool = False
|
|
|
- ) -> bool:
|
|
|
- """Change bot mode."""
|
|
|
- mode_key = format(switch_mode, "b") + format(inverse, "b")
|
|
|
- strength_key = f"{strength:0{2}x}"
|
|
|
-
|
|
|
- result = await self._sendcommand(
|
|
|
- DEVICE_SET_MODE_KEY + strength_key + mode_key, self._retry_count
|
|
|
- )
|
|
|
-
|
|
|
- if result[0] == 1:
|
|
|
- return True
|
|
|
-
|
|
|
- return False
|
|
|
-
|
|
|
- async def set_long_press(self, duration: int = 0) -> bool:
|
|
|
- """Set bot long press duration."""
|
|
|
- duration_key = f"{duration:0{2}x}"
|
|
|
-
|
|
|
- result = await self._sendcommand(
|
|
|
- DEVICE_SET_EXTENDED_KEY + "08" + duration_key, self._retry_count
|
|
|
- )
|
|
|
-
|
|
|
- if result[0] == 1:
|
|
|
- return True
|
|
|
-
|
|
|
- return False
|
|
|
-
|
|
|
- async def get_basic_info(self) -> dict[str, Any] | None:
|
|
|
- """Get device basic settings."""
|
|
|
- _data = await self._sendcommand(
|
|
|
- key=DEVICE_GET_BASIC_SETTINGS_KEY, retry=self._retry_count
|
|
|
- )
|
|
|
-
|
|
|
- if _data in (b"\x07", b"\x00"):
|
|
|
- _LOGGER.error("Unsuccessfull, please try again")
|
|
|
- return None
|
|
|
-
|
|
|
- self._settings = {
|
|
|
- "battery": _data[1],
|
|
|
- "firmware": _data[2] / 10.0,
|
|
|
- "strength": _data[3],
|
|
|
- "timers": _data[8],
|
|
|
- "switchMode": bool(_data[9] & 16),
|
|
|
- "inverseDirection": bool(_data[9] & 1),
|
|
|
- "holdSeconds": _data[10],
|
|
|
- }
|
|
|
-
|
|
|
- return self._settings
|
|
|
-
|
|
|
- def switch_mode(self) -> Any:
|
|
|
- """Return true or false from cache."""
|
|
|
-
|
|
|
- return self._get_adv_value("switchMode")
|
|
|
-
|
|
|
- def is_on(self) -> Any:
|
|
|
- """Return switch state from cache."""
|
|
|
-
|
|
|
- value = self._get_adv_value("isOn")
|
|
|
- if value is None:
|
|
|
- return None
|
|
|
-
|
|
|
- if self._inverse:
|
|
|
- return not value
|
|
|
- return value
|
|
|
-
|
|
|
-
|
|
|
-class SwitchbotCurtain(SwitchbotDevice):
|
|
|
- """Representation of a Switchbot Curtain."""
|
|
|
-
|
|
|
- def __init__(self, *args: Any, **kwargs: Any) -> None:
|
|
|
- """Switchbot Curtain/WoCurtain constructor."""
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- super().__init__(*args, **kwargs)
|
|
|
- self._reverse: bool = kwargs.pop("reverse_mode", True)
|
|
|
- self._settings: dict[str, Any] = {}
|
|
|
- self.ext_info_sum: dict[str, Any] = {}
|
|
|
- self.ext_info_adv: dict[str, Any] = {}
|
|
|
-
|
|
|
- async def open(self) -> bool:
|
|
|
- """Send open command."""
|
|
|
- result = await self._sendcommand(OPEN_KEY, self._retry_count)
|
|
|
- if result[0] == 1:
|
|
|
- return True
|
|
|
-
|
|
|
- return False
|
|
|
-
|
|
|
- async def close(self) -> bool:
|
|
|
- """Send close command."""
|
|
|
- result = await self._sendcommand(CLOSE_KEY, self._retry_count)
|
|
|
- if result[0] == 1:
|
|
|
- return True
|
|
|
-
|
|
|
- return False
|
|
|
-
|
|
|
- async def stop(self) -> bool:
|
|
|
- """Send stop command to device."""
|
|
|
- result = await self._sendcommand(STOP_KEY, self._retry_count)
|
|
|
- if result[0] == 1:
|
|
|
- return True
|
|
|
-
|
|
|
- return False
|
|
|
-
|
|
|
- async def set_position(self, position: int) -> bool:
|
|
|
- """Send position command (0-100) to device."""
|
|
|
- position = (100 - position) if self._reverse else position
|
|
|
- hex_position = "%0.2X" % position
|
|
|
- result = await self._sendcommand(POSITION_KEY + hex_position, self._retry_count)
|
|
|
- if result[0] == 1:
|
|
|
- return True
|
|
|
-
|
|
|
- return False
|
|
|
-
|
|
|
- async def update(self, interface: int | None = None) -> None:
|
|
|
- """Update position, battery percent and light level of device."""
|
|
|
- await self.get_device_data(retry=self._retry_count, interface=interface)
|
|
|
-
|
|
|
- def get_position(self) -> Any:
|
|
|
- """Return cached position (0-100) of Curtain."""
|
|
|
-
|
|
|
- return self._get_adv_value("position")
|
|
|
-
|
|
|
- async def get_basic_info(self) -> dict[str, Any] | None:
|
|
|
- """Get device basic settings."""
|
|
|
- _data = await self._sendcommand(
|
|
|
- key=DEVICE_GET_BASIC_SETTINGS_KEY, retry=self._retry_count
|
|
|
- )
|
|
|
-
|
|
|
- if _data in (b"\x07", b"\x00"):
|
|
|
- _LOGGER.error("Unsuccessfull, please try again")
|
|
|
- return None
|
|
|
-
|
|
|
- _position = max(min(_data[6], 100), 0)
|
|
|
-
|
|
|
- self._settings = {
|
|
|
- "battery": _data[1],
|
|
|
- "firmware": _data[2] / 10.0,
|
|
|
- "chainLength": _data[3],
|
|
|
- "openDirection": (
|
|
|
- "right_to_left" if _data[4] & 0b10000000 == 128 else "left_to_right"
|
|
|
- ),
|
|
|
- "touchToOpen": bool(_data[4] & 0b01000000),
|
|
|
- "light": bool(_data[4] & 0b00100000),
|
|
|
- "fault": bool(_data[4] & 0b00001000),
|
|
|
- "solarPanel": bool(_data[5] & 0b00001000),
|
|
|
- "calibrated": bool(_data[5] & 0b00000100),
|
|
|
- "inMotion": bool(_data[5] & 0b01000011),
|
|
|
- "position": (100 - _position) if self._reverse else _position,
|
|
|
- "timers": _data[7],
|
|
|
- }
|
|
|
-
|
|
|
- return self._settings
|
|
|
-
|
|
|
- async def get_extended_info_summary(self) -> dict[str, Any] | None:
|
|
|
- """Get basic info for all devices in chain."""
|
|
|
- _data = await self._sendcommand(
|
|
|
- key=CURTAIN_EXT_SUM_KEY, retry=self._retry_count
|
|
|
- )
|
|
|
-
|
|
|
- if _data in (b"\x07", b"\x00"):
|
|
|
- _LOGGER.error("Unsuccessfull, please try again")
|
|
|
- return None
|
|
|
-
|
|
|
- self.ext_info_sum["device0"] = {
|
|
|
- "openDirectionDefault": not bool(_data[1] & 0b10000000),
|
|
|
- "touchToOpen": bool(_data[1] & 0b01000000),
|
|
|
- "light": bool(_data[1] & 0b00100000),
|
|
|
- "openDirection": (
|
|
|
- "left_to_right" if _data[1] & 0b00010000 == 1 else "right_to_left"
|
|
|
- ),
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- if _data[2] != 0:
|
|
|
- self.ext_info_sum["device1"] = {
|
|
|
- "openDirectionDefault": not bool(_data[1] & 0b10000000),
|
|
|
- "touchToOpen": bool(_data[1] & 0b01000000),
|
|
|
- "light": bool(_data[1] & 0b00100000),
|
|
|
- "openDirection": (
|
|
|
- "left_to_right" if _data[1] & 0b00010000 else "right_to_left"
|
|
|
- ),
|
|
|
- }
|
|
|
-
|
|
|
- return self.ext_info_sum
|
|
|
-
|
|
|
- async def get_extended_info_adv(self) -> dict[str, Any] | None:
|
|
|
- """Get advance page info for device chain."""
|
|
|
-
|
|
|
- _data = await self._sendcommand(
|
|
|
- key=CURTAIN_EXT_ADV_KEY, retry=self._retry_count
|
|
|
- )
|
|
|
-
|
|
|
- if _data in (b"\x07", b"\x00"):
|
|
|
- _LOGGER.error("Unsuccessfull, please try again")
|
|
|
- return None
|
|
|
-
|
|
|
- _state_of_charge = [
|
|
|
- "not_charging",
|
|
|
- "charging_by_adapter",
|
|
|
- "charging_by_solar",
|
|
|
- "fully_charged",
|
|
|
- "solar_not_charging",
|
|
|
- "charging_error",
|
|
|
- ]
|
|
|
-
|
|
|
- self.ext_info_adv["device0"] = {
|
|
|
- "battery": _data[1],
|
|
|
- "firmware": _data[2] / 10.0,
|
|
|
- "stateOfCharge": _state_of_charge[_data[3]],
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- if _data[4]:
|
|
|
- self.ext_info_adv["device1"] = {
|
|
|
- "battery": _data[4],
|
|
|
- "firmware": _data[5] / 10.0,
|
|
|
- "stateOfCharge": _state_of_charge[_data[6]],
|
|
|
- }
|
|
|
-
|
|
|
- return self.ext_info_adv
|
|
|
-
|
|
|
- def get_light_level(self) -> Any:
|
|
|
- """Return cached light level."""
|
|
|
-
|
|
|
- return self._get_adv_value("lightLevel")
|
|
|
-
|
|
|
- def is_reversed(self) -> bool:
|
|
|
- """Return True if curtain position is opposite from SB data."""
|
|
|
- return self._reverse
|
|
|
-
|
|
|
- def is_calibrated(self) -> Any:
|
|
|
- """Return True curtain is calibrated."""
|
|
|
-
|
|
|
- return self._get_adv_value("calibration")
|
|
|
-
|
|
|
-
|
|
|
-class SwitchbotPlugMini(SwitchbotDevice):
|
|
|
- """Representation of a Switchbot plug mini."""
|
|
|
-
|
|
|
- def __init__(self, *args: Any, **kwargs: Any) -> None:
|
|
|
- """Switchbot plug mini constructor."""
|
|
|
- super().__init__(*args, **kwargs)
|
|
|
- self._settings: dict[str, Any] = {}
|
|
|
-
|
|
|
- async def update(self, interface: int | None = None) -> None:
|
|
|
- """Update state of device."""
|
|
|
- await self.get_device_data(retry=self._retry_count, interface=interface)
|
|
|
-
|
|
|
- async def turn_on(self) -> bool:
|
|
|
- """Turn device on."""
|
|
|
- result = await self._sendcommand(PLUG_ON_KEY, self._retry_count)
|
|
|
- return result[1] == 0x80
|
|
|
-
|
|
|
- async def turn_off(self) -> bool:
|
|
|
- """Turn device off."""
|
|
|
- result = await self._sendcommand(PLUG_OFF_KEY, self._retry_count)
|
|
|
- return result[1] == 0x00
|
|
|
-
|
|
|
- def is_on(self) -> Any:
|
|
|
- """Return switch state from cache."""
|
|
|
-
|
|
|
- value = self._get_adv_value("isOn")
|
|
|
- if value is None:
|
|
|
- return None
|
|
|
- return value
|
|
|
+from .adv_parser import parse_advertisement_data
|
|
|
+from .devices.bot import Switchbot
|
|
|
+from .devices.curtain import SwitchbotCurtain
|
|
|
+from .devices.device import SwitchbotDevice
|
|
|
+from .devices.plug import SwitchbotPlugMini
|
|
|
+from .discovery import GetSwitchbotDevices
|
|
|
+from .models import SwitchBotAdvertisement
|
|
|
+
|
|
|
+__all__ = [
|
|
|
+ "parse_advertisement_data",
|
|
|
+ "GetSwitchbotDevices",
|
|
|
+ "SwitchBotAdvertisement",
|
|
|
+ "SwitchbotDevice",
|
|
|
+ "SwitchbotCurtain",
|
|
|
+ "Switchbot",
|
|
|
+ "SwitchbotPlugMini",
|
|
|
+]
|