"""Library to handle connection with Switchbot.""" from __future__ import annotations import asyncio import binascii import logging from dataclasses import dataclass from typing import Any from uuid import UUID import async_timeout import bleak from bleak.backends.device import BLEDevice from bleak.backends.scanner import AdvertisementData from bleak_retry_connector import BleakClient, establish_connection DEFAULT_RETRY_COUNT = 3 DEFAULT_RETRY_TIMEOUT = 1 DEFAULT_SCAN_TIMEOUT = 5 # Keys common to all device types DEVICE_GET_BASIC_SETTINGS_KEY = "5702" DEVICE_SET_MODE_KEY = "5703" DEVICE_SET_EXTENDED_KEY = "570f" # Plug Mini keys PLUG_ON_KEY = "570f50010180" PLUG_OFF_KEY = "570f50010100" # Bot keys PRESS_KEY = "570100" ON_KEY = "570101" OFF_KEY = "570102" DOWN_KEY = "570103" UP_KEY = "570104" # Curtain keys OPEN_KEY = "570f450105ff00" # 570F4501010100 CLOSE_KEY = "570f450105ff64" # 570F4501010164 POSITION_KEY = "570F450105ff" # +actual_position ex: 570F450105ff32 for 50% STOP_KEY = "570F450100ff" CURTAIN_EXT_SUM_KEY = "570f460401" CURTAIN_EXT_ADV_KEY = "570f460402" CURTAIN_EXT_CHAIN_INFO_KEY = "570f468101" # Base key when encryption is set KEY_PASSWORD_PREFIX = "571" _LOGGER = logging.getLogger(__name__) CONNECT_LOCK = asyncio.Lock() def _sb_uuid(comms_type: str = "service") -> UUID | str: """Return Switchbot UUID.""" _uuid = {"tx": "002", "rx": "003", "service": "d00"} if comms_type in _uuid: return UUID(f"cba20{_uuid[comms_type]}-224d-11e6-9fb8-0002a5d5c51b") return "Incorrect type, choose between: tx, rx or service" def _process_wohand(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]: """Process woHand/Bot services data.""" _switch_mode = bool(data[1] & 0b10000000) _bot_data = { "switchMode": _switch_mode, "isOn": not bool(data[1] & 0b01000000) if _switch_mode else False, "battery": data[2] & 0b01111111, } return _bot_data def _process_wocurtain( data: bytes, mfr_data: bytes | None, reverse: bool = True ) -> dict[str, bool | int]: """Process woCurtain/Curtain services data.""" _position = max(min(data[3] & 0b01111111, 100), 0) _curtain_data = { "calibration": bool(data[1] & 0b01000000), "battery": data[2] & 0b01111111, "inMotion": bool(data[3] & 0b10000000), "position": (100 - _position) if reverse else _position, "lightLevel": (data[4] >> 4) & 0b00001111, "deviceChain": data[4] & 0b00000111, } return _curtain_data def _process_wosensorth(data: bytes, mfr_data: bytes | None) -> dict[str, object]: """Process woSensorTH/Temp sensor services data.""" _temp_sign = 1 if data[4] & 0b10000000 else -1 _temp_c = _temp_sign * ((data[4] & 0b01111111) + ((data[3] & 0b00001111) / 10)) _temp_f = (_temp_c * 9 / 5) + 32 _temp_f = (_temp_f * 10) / 10 _wosensorth_data = { "temp": {"c": _temp_c, "f": _temp_f}, "fahrenheit": bool(data[5] & 0b10000000), "humidity": data[5] & 0b01111111, "battery": data[2] & 0b01111111, } return _wosensorth_data def _process_wocontact(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]: """Process woContact Sensor services data.""" return { "tested": bool(data[1] & 0b10000000), "motion_detected": bool(data[1] & 0b01000000), "battery": data[2] & 0b01111111, "contact_open": data[3] & 0b00000010 == 0b00000010, "contact_timeout": data[3] & 0b00000110 == 0b00000110, "is_light": bool(data[3] & 0b00000001), "button_count": (data[7] & 0b11110000) >> 4, } def _process_wopresence(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]: """Process WoPresence Sensor services data.""" return { "tested": bool(data[1] & 0b10000000), "motion_detected": bool(data[1] & 0b01000000), "battery": data[2] & 0b01111111, "led": (data[5] & 0b00100000) >> 5, "iot": (data[5] & 0b00010000) >> 4, "sense_distance": (data[5] & 0b00001100) >> 2, "light_intensity": data[5] & 0b00000011, "is_light": bool(data[5] & 0b00000010), } def _process_woplugmini(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]: """Process plug mini.""" return { "switchMode": True, "isOn": mfr_data[7] == 0x80, "wifi_rssi": -mfr_data[9], } def _process_color_bulb(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]: """Process WoBulb services data.""" return { "sequence_number": mfr_data[6], "isOn": bool(mfr_data[7] & 0b10000000), "brightness": mfr_data[7] & 0b01111111, "delay": bool(mfr_data[8] & 0b10000000), "preset": bool(mfr_data[8] & 0b00001000), "light_state": mfr_data[8] & 0b00000111, "speed": mfr_data[9] & 0b01111111, "loop_index": mfr_data[10] & 0b11111110, } @dataclass class SwitchBotAdvertisement: """Switchbot advertisement.""" address: str data: dict[str, Any] device: BLEDevice def parse_advertisement_data( device: BLEDevice, advertisement_data: AdvertisementData ) -> SwitchBotAdvertisement | None: """Parse advertisement data.""" _services = list(advertisement_data.service_data.values()) _mgr_datas = list(advertisement_data.manufacturer_data.values()) if not _services: return _service_data = _services[0] _mfr_data = _mgr_datas[0] if _mgr_datas else None _model = chr(_service_data[0] & 0b01111111) supported_types: dict[str, dict[str, Any]] = { "d": {"modelName": "WoContact", "func": _process_wocontact}, "H": {"modelName": "WoHand", "func": _process_wohand}, "s": {"modelName": "WoPresence", "func": _process_wopresence}, "c": {"modelName": "WoCurtain", "func": _process_wocurtain}, "T": {"modelName": "WoSensorTH", "func": _process_wosensorth}, "i": {"modelName": "WoSensorTH", "func": _process_wosensorth}, "g": {"modelName": "WoPlug", "func": _process_woplugmini}, "u": {"modelName": "WoBulb", "func": _process_color_bulb}, } data = { "address": device.address, # MacOS uses UUIDs "rawAdvData": list(advertisement_data.service_data.values())[0], "data": { "rssi": device.rssi, }, } if _model in supported_types: data.update( { "isEncrypted": bool(_service_data[0] & 0b10000000), "model": _model, "modelName": supported_types[_model]["modelName"], "data": supported_types[_model]["func"](_service_data, _mfr_data), } ) data["data"]["rssi"] = device.rssi return SwitchBotAdvertisement(device.address, data, device) class GetSwitchbotDevices: """Scan for all Switchbot devices and return by type.""" def __init__(self, interface: int = 0) -> None: """Get switchbot devices class constructor.""" self._interface = f"hci{interface}" self._adv_data: dict[str, SwitchBotAdvertisement] = {} def detection_callback( self, device: BLEDevice, advertisement_data: AdvertisementData, ) -> None: discovery = parse_advertisement_data(device, advertisement_data) if discovery: self._adv_data[discovery.address] = discovery async def discover( self, retry: int = DEFAULT_RETRY_COUNT, scan_timeout: int = DEFAULT_SCAN_TIMEOUT ) -> dict: """Find switchbot devices and their advertisement data.""" devices = None devices = bleak.BleakScanner( # TODO: Find new UUIDs to filter on. For example, see # https://github.com/OpenWonderLabs/SwitchBotAPI-BLE/blob/4ad138bb09f0fbbfa41b152ca327a78c1d0b6ba9/devicetypes/meter.md adapter=self._interface, ) devices.register_detection_callback(self.detection_callback) async with CONNECT_LOCK: await devices.start() await asyncio.sleep(scan_timeout) await devices.stop() if devices is None: if retry < 1: _LOGGER.error( "Scanning for Switchbot devices failed. Stop trying", exc_info=True ) return self._adv_data _LOGGER.warning( "Error scanning for Switchbot devices. Retrying (remaining: %d)", retry, ) await asyncio.sleep(DEFAULT_RETRY_TIMEOUT) return await self.discover(retry - 1, scan_timeout) return self._adv_data async def _get_devices_by_model( self, model: str, ) -> dict: """Get switchbot devices by type.""" if not self._adv_data: await self.discover() return { address: adv for address, adv in self._adv_data.items() if adv.data.get("model") == model } async def get_curtains(self) -> dict[str, SwitchBotAdvertisement]: """Return all WoCurtain/Curtains devices with services data.""" return await self._get_devices_by_model("c") async def get_bots(self) -> dict[str, SwitchBotAdvertisement]: """Return all WoHand/Bot devices with services data.""" return await self._get_devices_by_model("H") async def get_tempsensors(self) -> dict[str, SwitchBotAdvertisement]: """Return all WoSensorTH/Temp sensor devices with services data.""" base_meters = await self._get_devices_by_model("T") plus_meters = await self._get_devices_by_model("i") return {**base_meters, **plus_meters} async def get_contactsensors(self) -> dict[str, SwitchBotAdvertisement]: """Return all WoContact/Contact sensor devices with services data.""" return await self._get_devices_by_model("d") async def get_device_data( self, address: str ) -> dict[str, SwitchBotAdvertisement] | None: """Return data for specific device.""" if not self._adv_data: await self.discover() _switchbot_data = { device: data for device, data in self._adv_data.items() # MacOS uses UUIDs instead of MAC addresses if data.get("address") == address } return _switchbot_data class SwitchbotDevice: """Base Representation of a Switchbot Device.""" def __init__( self, device: BLEDevice, password: str | None = None, interface: int = 0, **kwargs: Any, ) -> None: """Switchbot base class constructor.""" self._interface = f"hci{interface}" self._device = device self._sb_adv_data: SwitchBotAdvertisement | None = None self._scan_timeout: int = kwargs.pop("scan_timeout", DEFAULT_SCAN_TIMEOUT) self._retry_count: int = kwargs.pop("retry_count", DEFAULT_RETRY_COUNT) if password is None or password == "": self._password_encoded = None else: self._password_encoded = "%x" % ( binascii.crc32(password.encode("ascii")) & 0xFFFFFFFF ) def _commandkey(self, key: str) -> str: """Add password to key if set.""" if self._password_encoded is None: return key key_action = key[3] key_suffix = key[4:] return KEY_PASSWORD_PREFIX + key_action + self._password_encoded + key_suffix async def _sendcommand(self, key: str, retry: int) -> bytes: """Send command to device and read response.""" command = bytearray.fromhex(self._commandkey(key)) _LOGGER.debug("Sending command to switchbot %s", command) max_attempts = retry + 1 async with CONNECT_LOCK: for attempt in range(max_attempts): try: return await self._send_command_locked(key, command) except (bleak.BleakError, asyncio.exceptions.TimeoutError): if attempt == retry: _LOGGER.error( "Switchbot communication failed. Stopping trying", exc_info=True, ) return b"\x00" _LOGGER.debug("Switchbot communication failed with:", exc_info=True) raise RuntimeError("Unreachable") @property def name(self) -> str: """Return device name.""" return f"{self._device.name} ({self._device.address})" async def _send_command_locked(self, key: str, command: bytes) -> bytes: """Send command to device and read response.""" client: BleakClient | None = None try: _LOGGER.debug("%s: Connnecting to switchbot", self.name) client = await establish_connection( BleakClient, self._device, self.name, max_attempts=1 ) _LOGGER.debug( "%s: Connnected to switchbot: %s", self.name, client.is_connected ) future: asyncio.Future[bytearray] = asyncio.Future() def _notification_handler(sender: int, data: bytearray) -> None: """Handle notification responses.""" if future.done(): _LOGGER.debug("%s: Notification handler already done", self.name) return future.set_result(data) _LOGGER.debug("%s: Subscribe to notifications", self.name) await client.start_notify(_sb_uuid(comms_type="rx"), _notification_handler) _LOGGER.debug("%s: Sending command, %s", self.name, key) await client.write_gatt_char(_sb_uuid(comms_type="tx"), command, False) async with async_timeout.timeout(5): notify_msg = await future _LOGGER.info("%s: Notification received: %s", self.name, notify_msg) _LOGGER.debug("%s: UnSubscribe to notifications", self.name) await client.stop_notify(_sb_uuid(comms_type="rx")) finally: if client: await client.disconnect() if notify_msg == b"\x07": _LOGGER.error("Password required") elif notify_msg == b"\t": _LOGGER.error("Password incorrect") return notify_msg def get_address(self) -> str: """Return address of device.""" return self._device.address def _get_adv_value(self, key: str) -> Any: """Return value from advertisement data.""" if not self._sb_adv_data: return None return self._sb_adv_data.data["data"][key] def get_battery_percent(self) -> Any: """Return device battery level in percent.""" return self._get_adv_value("battery") def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None: """Update device data from advertisement.""" self._sb_adv_data = advertisement self._device = advertisement.device async def get_device_data( self, retry: int = DEFAULT_RETRY_COUNT, interface: int | None = None ) -> dict | None: """Find switchbot devices and their advertisement data.""" if interface: _interface: int = interface else: _interface = int(self._interface.replace("hci", "")) _data = await GetSwitchbotDevices(interface=_interface).discover( retry=retry, scan_timeout=self._scan_timeout ) if self._device.address in _data: self._sb_adv_data = _data[self._device.address] return self._sb_adv_data class Switchbot(SwitchbotDevice): """Representation of a Switchbot.""" def __init__(self, *args: Any, **kwargs: Any) -> None: """Switchbot Bot/WoHand constructor.""" super().__init__(*args, **kwargs) self._inverse: bool = kwargs.pop("inverse_mode", False) self._settings: dict[str, Any] = {} async def update(self, interface: int | None = None) -> None: """Update mode, battery percent and state of device.""" await self.get_device_data(retry=self._retry_count, interface=interface) async def turn_on(self) -> bool: """Turn device on.""" result = await self._sendcommand(ON_KEY, self._retry_count) if result[0] == 1: return True if result[0] == 5: _LOGGER.debug("Bot is in press mode and doesn't have on state") return True return False async def turn_off(self) -> bool: """Turn device off.""" result = await self._sendcommand(OFF_KEY, self._retry_count) if result[0] == 1: return True if result[0] == 5: _LOGGER.debug("Bot is in press mode and doesn't have off state") return True return False async def hand_up(self) -> bool: """Raise device arm.""" result = await self._sendcommand(UP_KEY, self._retry_count) if result[0] == 1: return True if result[0] == 5: _LOGGER.debug("Bot is in press mode") return True return False async def hand_down(self) -> bool: """Lower device arm.""" result = await self._sendcommand(DOWN_KEY, self._retry_count) if result[0] == 1: return True if result[0] == 5: _LOGGER.debug("Bot is in press mode") return True return False async def press(self) -> bool: """Press command to device.""" result = await self._sendcommand(PRESS_KEY, self._retry_count) if result[0] == 1: return True if result[0] == 5: _LOGGER.debug("Bot is in switch mode") return True return False async def set_switch_mode( self, switch_mode: bool = False, strength: int = 100, inverse: bool = False ) -> bool: """Change bot mode.""" mode_key = format(switch_mode, "b") + format(inverse, "b") strength_key = f"{strength:0{2}x}" # to hex with padding to double digit result = await self._sendcommand( DEVICE_SET_MODE_KEY + strength_key + mode_key, self._retry_count ) if result[0] == 1: return True return False async def set_long_press(self, duration: int = 0) -> bool: """Set bot long press duration.""" duration_key = f"{duration:0{2}x}" # to hex with padding to double digit result = await self._sendcommand( DEVICE_SET_EXTENDED_KEY + "08" + duration_key, self._retry_count ) if result[0] == 1: return True return False async def get_basic_info(self) -> dict[str, Any] | None: """Get device basic settings.""" _data = await self._sendcommand( key=DEVICE_GET_BASIC_SETTINGS_KEY, retry=self._retry_count ) if _data in (b"\x07", b"\x00"): _LOGGER.error("Unsuccessfull, please try again") return None self._settings = { "battery": _data[1], "firmware": _data[2] / 10.0, "strength": _data[3], "timers": _data[8], "switchMode": bool(_data[9] & 16), "inverseDirection": bool(_data[9] & 1), "holdSeconds": _data[10], } return self._settings def switch_mode(self) -> Any: """Return true or false from cache.""" # To get actual position call update() first. return self._get_adv_value("switchMode") def is_on(self) -> Any: """Return switch state from cache.""" # To get actual position call update() first. value = self._get_adv_value("isOn") if value is None: return None if self._inverse: return not value return value class SwitchbotCurtain(SwitchbotDevice): """Representation of a Switchbot Curtain.""" def __init__(self, *args: Any, **kwargs: Any) -> None: """Switchbot Curtain/WoCurtain constructor.""" # The position of the curtain is saved returned with 0 = open and 100 = closed. # This is independent of the calibration of the curtain bot (Open left to right/ # Open right to left/Open from the middle). # The parameter 'reverse_mode' reverse these values, # if 'reverse_mode' = True, position = 0 equals close # and position = 100 equals open. The parameter is default set to True so that # the definition of position is the same as in Home Assistant. super().__init__(*args, **kwargs) self._reverse: bool = kwargs.pop("reverse_mode", True) self._settings: dict[str, Any] = {} self.ext_info_sum: dict[str, Any] = {} self.ext_info_adv: dict[str, Any] = {} async def open(self) -> bool: """Send open command.""" result = await self._sendcommand(OPEN_KEY, self._retry_count) if result[0] == 1: return True return False async def close(self) -> bool: """Send close command.""" result = await self._sendcommand(CLOSE_KEY, self._retry_count) if result[0] == 1: return True return False async def stop(self) -> bool: """Send stop command to device.""" result = await self._sendcommand(STOP_KEY, self._retry_count) if result[0] == 1: return True return False async def set_position(self, position: int) -> bool: """Send position command (0-100) to device.""" position = (100 - position) if self._reverse else position hex_position = "%0.2X" % position result = await self._sendcommand(POSITION_KEY + hex_position, self._retry_count) if result[0] == 1: return True return False async def update(self, interface: int | None = None) -> None: """Update position, battery percent and light level of device.""" await self.get_device_data(retry=self._retry_count, interface=interface) def get_position(self) -> Any: """Return cached position (0-100) of Curtain.""" # To get actual position call update() first. return self._get_adv_value("position") async def get_basic_info(self) -> dict[str, Any] | None: """Get device basic settings.""" _data = await self._sendcommand( key=DEVICE_GET_BASIC_SETTINGS_KEY, retry=self._retry_count ) if _data in (b"\x07", b"\x00"): _LOGGER.error("Unsuccessfull, please try again") return None _position = max(min(_data[6], 100), 0) self._settings = { "battery": _data[1], "firmware": _data[2] / 10.0, "chainLength": _data[3], "openDirection": ( "right_to_left" if _data[4] & 0b10000000 == 128 else "left_to_right" ), "touchToOpen": bool(_data[4] & 0b01000000), "light": bool(_data[4] & 0b00100000), "fault": bool(_data[4] & 0b00001000), "solarPanel": bool(_data[5] & 0b00001000), "calibrated": bool(_data[5] & 0b00000100), "inMotion": bool(_data[5] & 0b01000011), "position": (100 - _position) if self._reverse else _position, "timers": _data[7], } return self._settings async def get_extended_info_summary(self) -> dict[str, Any] | None: """Get basic info for all devices in chain.""" _data = await self._sendcommand( key=CURTAIN_EXT_SUM_KEY, retry=self._retry_count ) if _data in (b"\x07", b"\x00"): _LOGGER.error("Unsuccessfull, please try again") return None self.ext_info_sum["device0"] = { "openDirectionDefault": not bool(_data[1] & 0b10000000), "touchToOpen": bool(_data[1] & 0b01000000), "light": bool(_data[1] & 0b00100000), "openDirection": ( "left_to_right" if _data[1] & 0b00010000 == 1 else "right_to_left" ), } # if grouped curtain device present. if _data[2] != 0: self.ext_info_sum["device1"] = { "openDirectionDefault": not bool(_data[1] & 0b10000000), "touchToOpen": bool(_data[1] & 0b01000000), "light": bool(_data[1] & 0b00100000), "openDirection": ( "left_to_right" if _data[1] & 0b00010000 else "right_to_left" ), } return self.ext_info_sum async def get_extended_info_adv(self) -> dict[str, Any] | None: """Get advance page info for device chain.""" _data = await self._sendcommand( key=CURTAIN_EXT_ADV_KEY, retry=self._retry_count ) if _data in (b"\x07", b"\x00"): _LOGGER.error("Unsuccessfull, please try again") return None _state_of_charge = [ "not_charging", "charging_by_adapter", "charging_by_solar", "fully_charged", "solar_not_charging", "charging_error", ] self.ext_info_adv["device0"] = { "battery": _data[1], "firmware": _data[2] / 10.0, "stateOfCharge": _state_of_charge[_data[3]], } # If grouped curtain device present. if _data[4]: self.ext_info_adv["device1"] = { "battery": _data[4], "firmware": _data[5] / 10.0, "stateOfCharge": _state_of_charge[_data[6]], } return self.ext_info_adv def get_light_level(self) -> Any: """Return cached light level.""" # To get actual light level call update() first. return self._get_adv_value("lightLevel") def is_reversed(self) -> bool: """Return True if curtain position is opposite from SB data.""" return self._reverse def is_calibrated(self) -> Any: """Return True curtain is calibrated.""" # To get actual light level call update() first. return self._get_adv_value("calibration") class SwitchbotPlugMini(SwitchbotDevice): """Representation of a Switchbot plug mini.""" def __init__(self, *args: Any, **kwargs: Any) -> None: """Switchbot plug mini constructor.""" super().__init__(*args, **kwargs) self._settings: dict[str, Any] = {} async def update(self, interface: int | None = None) -> None: """Update state of device.""" await self.get_device_data(retry=self._retry_count, interface=interface) async def turn_on(self) -> bool: """Turn device on.""" result = await self._sendcommand(PLUG_ON_KEY, self._retry_count) return result[1] == 0x80 async def turn_off(self) -> bool: """Turn device off.""" result = await self._sendcommand(PLUG_OFF_KEY, self._retry_count) return result[1] == 0x00 def is_on(self) -> Any: """Return switch state from cache.""" # To get actual position call update() first. value = self._get_adv_value("isOn") if value is None: return None return value