Browse Source

Refactor to make it easier to add new devices (#61)

J. Nick Koston 1 year ago
parent
commit
aca522f0dc

+ 16 - 16
setup.py

@@ -1,21 +1,21 @@
 from setuptools import setup
 
 setup(
-    name = 'PySwitchbot',
-    packages = ['switchbot'],
-    install_requires=['async_timeout>=4.0.1', 'bleak', 'bleak-retry-connector>=1.2.0'],
-    version = '0.17.3',
-    description = 'A library to communicate with Switchbot',
-    author='Daniel Hjelseth Hoyer',
-    url='https://github.com/Danielhiversen/pySwitchbot/',
-    license='MIT',
+    name="PySwitchbot",
+    packages=["switchbot", "switchbot.devices", "switchbot.adv_parsers"],
+    install_requires=["async_timeout>=4.0.1", "bleak", "bleak-retry-connector>=1.2.0"],
+    version="0.17.3",
+    description="A library to communicate with Switchbot",
+    author="Daniel Hjelseth Hoyer",
+    url="https://github.com/Danielhiversen/pySwitchbot/",
+    license="MIT",
     classifiers=[
-        'Development Status :: 3 - Alpha',
-        'Environment :: Other Environment',
-        'Intended Audience :: Developers',
-        'Operating System :: OS Independent',
-        'Programming Language :: Python',
-        'Topic :: Home Automation',
-        'Topic :: Software Development :: Libraries :: Python Modules'
-        ]
+        "Development Status :: 3 - Alpha",
+        "Environment :: Other Environment",
+        "Intended Audience :: Developers",
+        "Operating System :: OS Independent",
+        "Programming Language :: Python",
+        "Topic :: Home Automation",
+        "Topic :: Software Development :: Libraries :: Python Modules",
+    ],
 )

+ 17 - 803
switchbot/__init__.py

@@ -1,806 +1,20 @@
 """Library to handle connection with Switchbot."""
 from __future__ import annotations
 
-import asyncio
-import binascii
-import logging
-from dataclasses import dataclass
-from typing import Any
-from uuid import UUID
-
-import async_timeout
-import bleak
-from bleak.backends.device import BLEDevice
-from bleak.backends.scanner import AdvertisementData
-from bleak_retry_connector import BleakClient, establish_connection
-
-DEFAULT_RETRY_COUNT = 3
-DEFAULT_RETRY_TIMEOUT = 1
-DEFAULT_SCAN_TIMEOUT = 5
-
-# Keys common to all device types
-DEVICE_GET_BASIC_SETTINGS_KEY = "5702"
-DEVICE_SET_MODE_KEY = "5703"
-DEVICE_SET_EXTENDED_KEY = "570f"
-
-# Plug Mini keys
-PLUG_ON_KEY = "570f50010180"
-PLUG_OFF_KEY = "570f50010100"
-
-# Bot keys
-PRESS_KEY = "570100"
-ON_KEY = "570101"
-OFF_KEY = "570102"
-DOWN_KEY = "570103"
-UP_KEY = "570104"
-
-# Curtain keys
-OPEN_KEY = "570f450105ff00"  # 570F4501010100
-CLOSE_KEY = "570f450105ff64"  # 570F4501010164
-POSITION_KEY = "570F450105ff"  # +actual_position ex: 570F450105ff32 for 50%
-STOP_KEY = "570F450100ff"
-CURTAIN_EXT_SUM_KEY = "570f460401"
-CURTAIN_EXT_ADV_KEY = "570f460402"
-CURTAIN_EXT_CHAIN_INFO_KEY = "570f468101"
-
-# Base key when encryption is set
-KEY_PASSWORD_PREFIX = "571"
-
-_LOGGER = logging.getLogger(__name__)
-CONNECT_LOCK = asyncio.Lock()
-
-
-def _sb_uuid(comms_type: str = "service") -> UUID | str:
-    """Return Switchbot UUID."""
-
-    _uuid = {"tx": "002", "rx": "003", "service": "d00"}
-
-    if comms_type in _uuid:
-        return UUID(f"cba20{_uuid[comms_type]}-224d-11e6-9fb8-0002a5d5c51b")
-
-    return "Incorrect type, choose between: tx, rx or service"
-
-
-def _process_wohand(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
-    """Process woHand/Bot services data."""
-    _switch_mode = bool(data[1] & 0b10000000)
-
-    _bot_data = {
-        "switchMode": _switch_mode,
-        "isOn": not bool(data[1] & 0b01000000) if _switch_mode else False,
-        "battery": data[2] & 0b01111111,
-    }
-
-    return _bot_data
-
-
-def _process_wocurtain(
-    data: bytes, mfr_data: bytes | None, reverse: bool = True
-) -> dict[str, bool | int]:
-    """Process woCurtain/Curtain services data."""
-
-    _position = max(min(data[3] & 0b01111111, 100), 0)
-
-    _curtain_data = {
-        "calibration": bool(data[1] & 0b01000000),
-        "battery": data[2] & 0b01111111,
-        "inMotion": bool(data[3] & 0b10000000),
-        "position": (100 - _position) if reverse else _position,
-        "lightLevel": (data[4] >> 4) & 0b00001111,
-        "deviceChain": data[4] & 0b00000111,
-    }
-
-    return _curtain_data
-
-
-def _process_wosensorth(data: bytes, mfr_data: bytes | None) -> dict[str, object]:
-    """Process woSensorTH/Temp sensor services data."""
-
-    _temp_sign = 1 if data[4] & 0b10000000 else -1
-    _temp_c = _temp_sign * ((data[4] & 0b01111111) + ((data[3] & 0b00001111) / 10))
-    _temp_f = (_temp_c * 9 / 5) + 32
-    _temp_f = (_temp_f * 10) / 10
-
-    _wosensorth_data = {
-        "temp": {"c": _temp_c, "f": _temp_f},
-        "fahrenheit": bool(data[5] & 0b10000000),
-        "humidity": data[5] & 0b01111111,
-        "battery": data[2] & 0b01111111,
-    }
-
-    return _wosensorth_data
-
-
-def _process_wocontact(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
-    """Process woContact Sensor services data."""
-    return {
-        "tested": bool(data[1] & 0b10000000),
-        "motion_detected": bool(data[1] & 0b01000000),
-        "battery": data[2] & 0b01111111,
-        "contact_open": data[3] & 0b00000010 == 0b00000010,
-        "contact_timeout": data[3] & 0b00000110 == 0b00000110,
-        "is_light": bool(data[3] & 0b00000001),
-        "button_count": (data[7] & 0b11110000) >> 4,
-    }
-
-
-def _process_wopresence(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
-    """Process WoPresence Sensor services data."""
-    return {
-        "tested": bool(data[1] & 0b10000000),
-        "motion_detected": bool(data[1] & 0b01000000),
-        "battery": data[2] & 0b01111111,
-        "led": (data[5] & 0b00100000) >> 5,
-        "iot": (data[5] & 0b00010000) >> 4,
-        "sense_distance": (data[5] & 0b00001100) >> 2,
-        "light_intensity": data[5] & 0b00000011,
-        "is_light": bool(data[5] & 0b00000010),
-    }
-
-
-def _process_woplugmini(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
-    """Process plug mini."""
-    return {
-        "switchMode": True,
-        "isOn": mfr_data[7] == 0x80,
-        "wifi_rssi": -mfr_data[9],
-    }
-
-
-def _process_color_bulb(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
-    """Process WoBulb services data."""
-    return {
-        "sequence_number": mfr_data[6],
-        "isOn": bool(mfr_data[7] & 0b10000000),
-        "brightness": mfr_data[7] & 0b01111111,
-        "delay": bool(mfr_data[8] & 0b10000000),
-        "preset": bool(mfr_data[8] & 0b00001000),
-        "light_state": mfr_data[8] & 0b00000111,
-        "speed": mfr_data[9] & 0b01111111,
-        "loop_index": mfr_data[10] & 0b11111110,
-    }
-
-
-@dataclass
-class SwitchBotAdvertisement:
-    """Switchbot advertisement."""
-
-    address: str
-    data: dict[str, Any]
-    device: BLEDevice
-
-
-def parse_advertisement_data(
-    device: BLEDevice, advertisement_data: AdvertisementData
-) -> SwitchBotAdvertisement | None:
-    """Parse advertisement data."""
-    _services = list(advertisement_data.service_data.values())
-    _mgr_datas = list(advertisement_data.manufacturer_data.values())
-
-    if not _services:
-        return
-    _service_data = _services[0]
-    _mfr_data = _mgr_datas[0] if _mgr_datas else None
-    _model = chr(_service_data[0] & 0b01111111)
-
-    supported_types: dict[str, dict[str, Any]] = {
-        "d": {"modelName": "WoContact", "func": _process_wocontact},
-        "H": {"modelName": "WoHand", "func": _process_wohand},
-        "s": {"modelName": "WoPresence", "func": _process_wopresence},
-        "c": {"modelName": "WoCurtain", "func": _process_wocurtain},
-        "T": {"modelName": "WoSensorTH", "func": _process_wosensorth},
-        "i": {"modelName": "WoSensorTH", "func": _process_wosensorth},
-        "g": {"modelName": "WoPlug", "func": _process_woplugmini},
-        "u": {"modelName": "WoBulb", "func": _process_color_bulb},
-    }
-
-    data = {
-        "address": device.address,  # MacOS uses UUIDs
-        "rawAdvData": list(advertisement_data.service_data.values())[0],
-        "data": {
-            "rssi": device.rssi,
-        },
-    }
-
-    if _model in supported_types:
-
-        data.update(
-            {
-                "isEncrypted": bool(_service_data[0] & 0b10000000),
-                "model": _model,
-                "modelName": supported_types[_model]["modelName"],
-                "data": supported_types[_model]["func"](_service_data, _mfr_data),
-            }
-        )
-
-        data["data"]["rssi"] = device.rssi
-
-    return SwitchBotAdvertisement(device.address, data, device)
-
-
-class GetSwitchbotDevices:
-    """Scan for all Switchbot devices and return by type."""
-
-    def __init__(self, interface: int = 0) -> None:
-        """Get switchbot devices class constructor."""
-        self._interface = f"hci{interface}"
-        self._adv_data: dict[str, SwitchBotAdvertisement] = {}
-
-    def detection_callback(
-        self,
-        device: BLEDevice,
-        advertisement_data: AdvertisementData,
-    ) -> None:
-        discovery = parse_advertisement_data(device, advertisement_data)
-        if discovery:
-            self._adv_data[discovery.address] = discovery
-
-    async def discover(
-        self, retry: int = DEFAULT_RETRY_COUNT, scan_timeout: int = DEFAULT_SCAN_TIMEOUT
-    ) -> dict:
-        """Find switchbot devices and their advertisement data."""
-
-        devices = None
-        devices = bleak.BleakScanner(
-            # TODO: Find new UUIDs to filter on. For example, see
-            # https://github.com/OpenWonderLabs/SwitchBotAPI-BLE/blob/4ad138bb09f0fbbfa41b152ca327a78c1d0b6ba9/devicetypes/meter.md
-            adapter=self._interface,
-        )
-        devices.register_detection_callback(self.detection_callback)
-
-        async with CONNECT_LOCK:
-            await devices.start()
-            await asyncio.sleep(scan_timeout)
-            await devices.stop()
-
-        if devices is None:
-            if retry < 1:
-                _LOGGER.error(
-                    "Scanning for Switchbot devices failed. Stop trying", exc_info=True
-                )
-                return self._adv_data
-
-            _LOGGER.warning(
-                "Error scanning for Switchbot devices. Retrying (remaining: %d)",
-                retry,
-            )
-            await asyncio.sleep(DEFAULT_RETRY_TIMEOUT)
-            return await self.discover(retry - 1, scan_timeout)
-
-        return self._adv_data
-
-    async def _get_devices_by_model(
-        self,
-        model: str,
-    ) -> dict:
-        """Get switchbot devices by type."""
-        if not self._adv_data:
-            await self.discover()
-
-        return {
-            address: adv
-            for address, adv in self._adv_data.items()
-            if adv.data.get("model") == model
-        }
-
-    async def get_curtains(self) -> dict[str, SwitchBotAdvertisement]:
-        """Return all WoCurtain/Curtains devices with services data."""
-        return await self._get_devices_by_model("c")
-
-    async def get_bots(self) -> dict[str, SwitchBotAdvertisement]:
-        """Return all WoHand/Bot devices with services data."""
-        return await self._get_devices_by_model("H")
-
-    async def get_tempsensors(self) -> dict[str, SwitchBotAdvertisement]:
-        """Return all WoSensorTH/Temp sensor devices with services data."""
-        base_meters = await self._get_devices_by_model("T")
-        plus_meters = await self._get_devices_by_model("i")
-        return {**base_meters, **plus_meters}
-
-    async def get_contactsensors(self) -> dict[str, SwitchBotAdvertisement]:
-        """Return all WoContact/Contact sensor devices with services data."""
-        return await self._get_devices_by_model("d")
-
-    async def get_device_data(
-        self, address: str
-    ) -> dict[str, SwitchBotAdvertisement] | None:
-        """Return data for specific device."""
-        if not self._adv_data:
-            await self.discover()
-
-        _switchbot_data = {
-            device: data
-            for device, data in self._adv_data.items()
-            # MacOS uses UUIDs instead of MAC addresses
-            if data.get("address") == address
-        }
-
-        return _switchbot_data
-
-
-class SwitchbotDevice:
-    """Base Representation of a Switchbot Device."""
-
-    def __init__(
-        self,
-        device: BLEDevice,
-        password: str | None = None,
-        interface: int = 0,
-        **kwargs: Any,
-    ) -> None:
-        """Switchbot base class constructor."""
-        self._interface = f"hci{interface}"
-        self._device = device
-        self._sb_adv_data: SwitchBotAdvertisement | None = None
-        self._scan_timeout: int = kwargs.pop("scan_timeout", DEFAULT_SCAN_TIMEOUT)
-        self._retry_count: int = kwargs.pop("retry_count", DEFAULT_RETRY_COUNT)
-        if password is None or password == "":
-            self._password_encoded = None
-        else:
-            self._password_encoded = "%x" % (
-                binascii.crc32(password.encode("ascii")) & 0xFFFFFFFF
-            )
-
-    def _commandkey(self, key: str) -> str:
-        """Add password to key if set."""
-        if self._password_encoded is None:
-            return key
-        key_action = key[3]
-        key_suffix = key[4:]
-        return KEY_PASSWORD_PREFIX + key_action + self._password_encoded + key_suffix
-
-    async def _sendcommand(self, key: str, retry: int) -> bytes:
-        """Send command to device and read response."""
-        command = bytearray.fromhex(self._commandkey(key))
-        _LOGGER.debug("Sending command to switchbot %s", command)
-        max_attempts = retry + 1
-        async with CONNECT_LOCK:
-            for attempt in range(max_attempts):
-                try:
-                    return await self._send_command_locked(key, command)
-                except (bleak.BleakError, asyncio.exceptions.TimeoutError):
-                    if attempt == retry:
-                        _LOGGER.error(
-                            "Switchbot communication failed. Stopping trying",
-                            exc_info=True,
-                        )
-                        return b"\x00"
-
-                    _LOGGER.debug("Switchbot communication failed with:", exc_info=True)
-
-        raise RuntimeError("Unreachable")
-
-    @property
-    def name(self) -> str:
-        """Return device name."""
-        return f"{self._device.name} ({self._device.address})"
-
-    async def _send_command_locked(self, key: str, command: bytes) -> bytes:
-        """Send command to device and read response."""
-        client: BleakClient | None = None
-        try:
-            _LOGGER.debug("%s: Connnecting to switchbot", self.name)
-            client = await establish_connection(
-                BleakClient, self._device, self.name, max_attempts=1
-            )
-            _LOGGER.debug(
-                "%s: Connnected to switchbot: %s", self.name, client.is_connected
-            )
-            future: asyncio.Future[bytearray] = asyncio.Future()
-
-            def _notification_handler(sender: int, data: bytearray) -> None:
-                """Handle notification responses."""
-                if future.done():
-                    _LOGGER.debug("%s: Notification handler already done", self.name)
-                    return
-                future.set_result(data)
-
-            _LOGGER.debug("%s: Subscribe to notifications", self.name)
-            await client.start_notify(_sb_uuid(comms_type="rx"), _notification_handler)
-
-            _LOGGER.debug("%s: Sending command, %s", self.name, key)
-            await client.write_gatt_char(_sb_uuid(comms_type="tx"), command, False)
-
-            async with async_timeout.timeout(5):
-                notify_msg = await future
-            _LOGGER.info("%s: Notification received: %s", self.name, notify_msg)
-
-            _LOGGER.debug("%s: UnSubscribe to notifications", self.name)
-            await client.stop_notify(_sb_uuid(comms_type="rx"))
-
-        finally:
-            if client:
-                await client.disconnect()
-
-        if notify_msg == b"\x07":
-            _LOGGER.error("Password required")
-        elif notify_msg == b"\t":
-            _LOGGER.error("Password incorrect")
-        return notify_msg
-
-    def get_address(self) -> str:
-        """Return address of device."""
-        return self._device.address
-
-    def _get_adv_value(self, key: str) -> Any:
-        """Return value from advertisement data."""
-        if not self._sb_adv_data:
-            return None
-        return self._sb_adv_data.data["data"][key]
-
-    def get_battery_percent(self) -> Any:
-        """Return device battery level in percent."""
-        return self._get_adv_value("battery")
-
-    def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
-        """Update device data from advertisement."""
-        self._sb_adv_data = advertisement
-        self._device = advertisement.device
-
-    async def get_device_data(
-        self, retry: int = DEFAULT_RETRY_COUNT, interface: int | None = None
-    ) -> dict | None:
-        """Find switchbot devices and their advertisement data."""
-        if interface:
-            _interface: int = interface
-        else:
-            _interface = int(self._interface.replace("hci", ""))
-
-        _data = await GetSwitchbotDevices(interface=_interface).discover(
-            retry=retry, scan_timeout=self._scan_timeout
-        )
-
-        if self._device.address in _data:
-            self._sb_adv_data = _data[self._device.address]
-
-        return self._sb_adv_data
-
-
-class Switchbot(SwitchbotDevice):
-    """Representation of a Switchbot."""
-
-    def __init__(self, *args: Any, **kwargs: Any) -> None:
-        """Switchbot Bot/WoHand constructor."""
-        super().__init__(*args, **kwargs)
-        self._inverse: bool = kwargs.pop("inverse_mode", False)
-        self._settings: dict[str, Any] = {}
-
-    async def update(self, interface: int | None = None) -> None:
-        """Update mode, battery percent and state of device."""
-        await self.get_device_data(retry=self._retry_count, interface=interface)
-
-    async def turn_on(self) -> bool:
-        """Turn device on."""
-        result = await self._sendcommand(ON_KEY, self._retry_count)
-
-        if result[0] == 1:
-            return True
-
-        if result[0] == 5:
-            _LOGGER.debug("Bot is in press mode and doesn't have on state")
-            return True
-
-        return False
-
-    async def turn_off(self) -> bool:
-        """Turn device off."""
-        result = await self._sendcommand(OFF_KEY, self._retry_count)
-        if result[0] == 1:
-            return True
-
-        if result[0] == 5:
-            _LOGGER.debug("Bot is in press mode and doesn't have off state")
-            return True
-
-        return False
-
-    async def hand_up(self) -> bool:
-        """Raise device arm."""
-        result = await self._sendcommand(UP_KEY, self._retry_count)
-        if result[0] == 1:
-            return True
-
-        if result[0] == 5:
-            _LOGGER.debug("Bot is in press mode")
-            return True
-
-        return False
-
-    async def hand_down(self) -> bool:
-        """Lower device arm."""
-        result = await self._sendcommand(DOWN_KEY, self._retry_count)
-        if result[0] == 1:
-            return True
-
-        if result[0] == 5:
-            _LOGGER.debug("Bot is in press mode")
-            return True
-
-        return False
-
-    async def press(self) -> bool:
-        """Press command to device."""
-        result = await self._sendcommand(PRESS_KEY, self._retry_count)
-        if result[0] == 1:
-            return True
-
-        if result[0] == 5:
-            _LOGGER.debug("Bot is in switch mode")
-            return True
-
-        return False
-
-    async def set_switch_mode(
-        self, switch_mode: bool = False, strength: int = 100, inverse: bool = False
-    ) -> bool:
-        """Change bot mode."""
-        mode_key = format(switch_mode, "b") + format(inverse, "b")
-        strength_key = f"{strength:0{2}x}"  # to hex with padding to double digit
-
-        result = await self._sendcommand(
-            DEVICE_SET_MODE_KEY + strength_key + mode_key, self._retry_count
-        )
-
-        if result[0] == 1:
-            return True
-
-        return False
-
-    async def set_long_press(self, duration: int = 0) -> bool:
-        """Set bot long press duration."""
-        duration_key = f"{duration:0{2}x}"  # to hex with padding to double digit
-
-        result = await self._sendcommand(
-            DEVICE_SET_EXTENDED_KEY + "08" + duration_key, self._retry_count
-        )
-
-        if result[0] == 1:
-            return True
-
-        return False
-
-    async def get_basic_info(self) -> dict[str, Any] | None:
-        """Get device basic settings."""
-        _data = await self._sendcommand(
-            key=DEVICE_GET_BASIC_SETTINGS_KEY, retry=self._retry_count
-        )
-
-        if _data in (b"\x07", b"\x00"):
-            _LOGGER.error("Unsuccessfull, please try again")
-            return None
-
-        self._settings = {
-            "battery": _data[1],
-            "firmware": _data[2] / 10.0,
-            "strength": _data[3],
-            "timers": _data[8],
-            "switchMode": bool(_data[9] & 16),
-            "inverseDirection": bool(_data[9] & 1),
-            "holdSeconds": _data[10],
-        }
-
-        return self._settings
-
-    def switch_mode(self) -> Any:
-        """Return true or false from cache."""
-        # To get actual position call update() first.
-        return self._get_adv_value("switchMode")
-
-    def is_on(self) -> Any:
-        """Return switch state from cache."""
-        # To get actual position call update() first.
-        value = self._get_adv_value("isOn")
-        if value is None:
-            return None
-
-        if self._inverse:
-            return not value
-        return value
-
-
-class SwitchbotCurtain(SwitchbotDevice):
-    """Representation of a Switchbot Curtain."""
-
-    def __init__(self, *args: Any, **kwargs: Any) -> None:
-        """Switchbot Curtain/WoCurtain constructor."""
-
-        # The position of the curtain is saved returned with 0 = open and 100 = closed.
-        # This is independent of the calibration of the curtain bot (Open left to right/
-        # Open right to left/Open from the middle).
-        # The parameter 'reverse_mode' reverse these values,
-        # if 'reverse_mode' = True, position = 0 equals close
-        # and position = 100 equals open. The parameter is default set to True so that
-        # the definition of position is the same as in Home Assistant.
-
-        super().__init__(*args, **kwargs)
-        self._reverse: bool = kwargs.pop("reverse_mode", True)
-        self._settings: dict[str, Any] = {}
-        self.ext_info_sum: dict[str, Any] = {}
-        self.ext_info_adv: dict[str, Any] = {}
-
-    async def open(self) -> bool:
-        """Send open command."""
-        result = await self._sendcommand(OPEN_KEY, self._retry_count)
-        if result[0] == 1:
-            return True
-
-        return False
-
-    async def close(self) -> bool:
-        """Send close command."""
-        result = await self._sendcommand(CLOSE_KEY, self._retry_count)
-        if result[0] == 1:
-            return True
-
-        return False
-
-    async def stop(self) -> bool:
-        """Send stop command to device."""
-        result = await self._sendcommand(STOP_KEY, self._retry_count)
-        if result[0] == 1:
-            return True
-
-        return False
-
-    async def set_position(self, position: int) -> bool:
-        """Send position command (0-100) to device."""
-        position = (100 - position) if self._reverse else position
-        hex_position = "%0.2X" % position
-        result = await self._sendcommand(POSITION_KEY + hex_position, self._retry_count)
-        if result[0] == 1:
-            return True
-
-        return False
-
-    async def update(self, interface: int | None = None) -> None:
-        """Update position, battery percent and light level of device."""
-        await self.get_device_data(retry=self._retry_count, interface=interface)
-
-    def get_position(self) -> Any:
-        """Return cached position (0-100) of Curtain."""
-        # To get actual position call update() first.
-        return self._get_adv_value("position")
-
-    async def get_basic_info(self) -> dict[str, Any] | None:
-        """Get device basic settings."""
-        _data = await self._sendcommand(
-            key=DEVICE_GET_BASIC_SETTINGS_KEY, retry=self._retry_count
-        )
-
-        if _data in (b"\x07", b"\x00"):
-            _LOGGER.error("Unsuccessfull, please try again")
-            return None
-
-        _position = max(min(_data[6], 100), 0)
-
-        self._settings = {
-            "battery": _data[1],
-            "firmware": _data[2] / 10.0,
-            "chainLength": _data[3],
-            "openDirection": (
-                "right_to_left" if _data[4] & 0b10000000 == 128 else "left_to_right"
-            ),
-            "touchToOpen": bool(_data[4] & 0b01000000),
-            "light": bool(_data[4] & 0b00100000),
-            "fault": bool(_data[4] & 0b00001000),
-            "solarPanel": bool(_data[5] & 0b00001000),
-            "calibrated": bool(_data[5] & 0b00000100),
-            "inMotion": bool(_data[5] & 0b01000011),
-            "position": (100 - _position) if self._reverse else _position,
-            "timers": _data[7],
-        }
-
-        return self._settings
-
-    async def get_extended_info_summary(self) -> dict[str, Any] | None:
-        """Get basic info for all devices in chain."""
-        _data = await self._sendcommand(
-            key=CURTAIN_EXT_SUM_KEY, retry=self._retry_count
-        )
-
-        if _data in (b"\x07", b"\x00"):
-            _LOGGER.error("Unsuccessfull, please try again")
-            return None
-
-        self.ext_info_sum["device0"] = {
-            "openDirectionDefault": not bool(_data[1] & 0b10000000),
-            "touchToOpen": bool(_data[1] & 0b01000000),
-            "light": bool(_data[1] & 0b00100000),
-            "openDirection": (
-                "left_to_right" if _data[1] & 0b00010000 == 1 else "right_to_left"
-            ),
-        }
-
-        # if grouped curtain device present.
-        if _data[2] != 0:
-            self.ext_info_sum["device1"] = {
-                "openDirectionDefault": not bool(_data[1] & 0b10000000),
-                "touchToOpen": bool(_data[1] & 0b01000000),
-                "light": bool(_data[1] & 0b00100000),
-                "openDirection": (
-                    "left_to_right" if _data[1] & 0b00010000 else "right_to_left"
-                ),
-            }
-
-        return self.ext_info_sum
-
-    async def get_extended_info_adv(self) -> dict[str, Any] | None:
-        """Get advance page info for device chain."""
-
-        _data = await self._sendcommand(
-            key=CURTAIN_EXT_ADV_KEY, retry=self._retry_count
-        )
-
-        if _data in (b"\x07", b"\x00"):
-            _LOGGER.error("Unsuccessfull, please try again")
-            return None
-
-        _state_of_charge = [
-            "not_charging",
-            "charging_by_adapter",
-            "charging_by_solar",
-            "fully_charged",
-            "solar_not_charging",
-            "charging_error",
-        ]
-
-        self.ext_info_adv["device0"] = {
-            "battery": _data[1],
-            "firmware": _data[2] / 10.0,
-            "stateOfCharge": _state_of_charge[_data[3]],
-        }
-
-        # If grouped curtain device present.
-        if _data[4]:
-            self.ext_info_adv["device1"] = {
-                "battery": _data[4],
-                "firmware": _data[5] / 10.0,
-                "stateOfCharge": _state_of_charge[_data[6]],
-            }
-
-        return self.ext_info_adv
-
-    def get_light_level(self) -> Any:
-        """Return cached light level."""
-        # To get actual light level call update() first.
-        return self._get_adv_value("lightLevel")
-
-    def is_reversed(self) -> bool:
-        """Return True if curtain position is opposite from SB data."""
-        return self._reverse
-
-    def is_calibrated(self) -> Any:
-        """Return True curtain is calibrated."""
-        # To get actual light level call update() first.
-        return self._get_adv_value("calibration")
-
-
-class SwitchbotPlugMini(SwitchbotDevice):
-    """Representation of a Switchbot plug mini."""
-
-    def __init__(self, *args: Any, **kwargs: Any) -> None:
-        """Switchbot plug mini constructor."""
-        super().__init__(*args, **kwargs)
-        self._settings: dict[str, Any] = {}
-
-    async def update(self, interface: int | None = None) -> None:
-        """Update state of device."""
-        await self.get_device_data(retry=self._retry_count, interface=interface)
-
-    async def turn_on(self) -> bool:
-        """Turn device on."""
-        result = await self._sendcommand(PLUG_ON_KEY, self._retry_count)
-        return result[1] == 0x80
-
-    async def turn_off(self) -> bool:
-        """Turn device off."""
-        result = await self._sendcommand(PLUG_OFF_KEY, self._retry_count)
-        return result[1] == 0x00
-
-    def is_on(self) -> Any:
-        """Return switch state from cache."""
-        # To get actual position call update() first.
-        value = self._get_adv_value("isOn")
-        if value is None:
-            return None
-        return value
+from .adv_parser import parse_advertisement_data
+from .devices.bot import Switchbot
+from .devices.curtain import SwitchbotCurtain
+from .devices.device import SwitchbotDevice
+from .devices.plug import SwitchbotPlugMini
+from .discovery import GetSwitchbotDevices
+from .models import SwitchBotAdvertisement
+
+__all__ = [
+    "parse_advertisement_data",
+    "GetSwitchbotDevices",
+    "SwitchBotAdvertisement",
+    "SwitchbotDevice",
+    "SwitchbotCurtain",
+    "Switchbot",
+    "SwitchbotPlugMini",
+]

+ 101 - 0
switchbot/adv_parser.py

@@ -0,0 +1,101 @@
+"""Library to handle connection with Switchbot."""
+from __future__ import annotations
+
+from collections.abc import Callable
+from typing import TypedDict
+
+from bleak.backends.device import BLEDevice
+from bleak.backends.scanner import AdvertisementData
+
+from .adv_parsers.bot import process_wohand
+from .adv_parsers.bulb import process_color_bulb
+from .adv_parsers.contact import process_wocontact
+from .adv_parsers.curtain import process_wocurtain
+from .adv_parsers.meter import process_wosensorth
+from .adv_parsers.motion import process_wopresence
+from .adv_parsers.plug import process_woplugmini
+from .models import SwitchBotAdvertisement
+
+
+class SwitchbotSupportedType(TypedDict):
+    """Supported type of Switchbot."""
+
+    modelName: str
+    modelFriendlyName: str
+    func: Callable[[bytes, bytes | None], dict[str, bool | int]]
+
+
+SUPPORTED_TYPES: dict[str, SwitchbotSupportedType] = {
+    "d": {
+        "modelName": "WoContact",
+        "modelFriendlyName": "Contact Sensor",
+        "func": process_wocontact,
+    },
+    "H": {"modelName": "WoHand", "modelFriendlyName": "Bot", "func": process_wohand},
+    "s": {
+        "modelName": "WoPresence",
+        "modelFriendlyName": "Motion Sensor",
+        "func": process_wopresence,
+    },
+    "c": {
+        "modelName": "WoCurtain",
+        "modelFriendlyName": "Curtain",
+        "func": process_wocurtain,
+    },
+    "T": {
+        "modelName": "WoSensorTH",
+        "modelFriendlyName": "Meter",
+        "func": process_wosensorth,
+    },
+    "i": {
+        "modelName": "WoSensorTH",
+        "modelFriendlyName": "Meter Plus",
+        "func": process_wosensorth,
+    },
+    "g": {
+        "modelName": "WoPlug",
+        "modelFriendlyName": "Plug Mini",
+        "func": process_woplugmini,
+    },
+    "u": {
+        "modelName": "WoBulb",
+        "modelFriendlyName": "Color Bulb",
+        "func": process_color_bulb,
+    },
+}
+
+
+def parse_advertisement_data(
+    device: BLEDevice, advertisement_data: AdvertisementData
+) -> SwitchBotAdvertisement | None:
+    """Parse advertisement data."""
+    _services = list(advertisement_data.service_data.values())
+    _mgr_datas = list(advertisement_data.manufacturer_data.values())
+
+    if not _services:
+        return None
+    _service_data = _services[0]
+    _mfr_data = _mgr_datas[0] if _mgr_datas else None
+    _model = chr(_service_data[0] & 0b01111111)
+
+    data = {
+        "address": device.address,  # MacOS uses UUIDs
+        "rawAdvData": list(advertisement_data.service_data.values())[0],
+        "data": {},
+    }
+
+    type_data = SUPPORTED_TYPES.get(_model)
+    if type_data:
+        data.update(
+            {
+                "isEncrypted": bool(_service_data[0] & 0b10000000),
+                "model": _model,
+                "model_friendly_name": type_data["modelFriendlyName"],
+                "modelName": type_data["modelName"],
+                "data": type_data["func"](_service_data, _mfr_data),
+            }
+        )
+
+    data["data"]["rssi"] = device.rssi
+
+    return SwitchBotAdvertisement(device.address, data, device)

+ 1 - 0
switchbot/adv_parsers/__init__.py

@@ -0,0 +1 @@
+"""Switchbot Advertisement Parser Library."""

+ 13 - 0
switchbot/adv_parsers/bot.py

@@ -0,0 +1,13 @@
+"""Library to handle connection with Switchbot."""
+from __future__ import annotations
+
+
+def process_wohand(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
+    """Process woHand/Bot services data."""
+    _switch_mode = bool(data[1] & 0b10000000)
+
+    return {
+        "switchMode": _switch_mode,
+        "isOn": not bool(data[1] & 0b01000000) if _switch_mode else False,
+        "battery": data[2] & 0b01111111,
+    }

+ 15 - 0
switchbot/adv_parsers/bulb.py

@@ -0,0 +1,15 @@
+"""Bulb parser."""
+
+
+def process_color_bulb(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
+    """Process WoBulb services data."""
+    return {
+        "sequence_number": mfr_data[6],
+        "isOn": bool(mfr_data[7] & 0b10000000),
+        "brightness": mfr_data[7] & 0b01111111,
+        "delay": bool(mfr_data[8] & 0b10000000),
+        "preset": bool(mfr_data[8] & 0b00001000),
+        "light_state": mfr_data[8] & 0b00000111,
+        "speed": mfr_data[9] & 0b01111111,
+        "loop_index": mfr_data[10] & 0b11111110,
+    }

+ 14 - 0
switchbot/adv_parsers/contact.py

@@ -0,0 +1,14 @@
+"""Contact sensor parser."""
+
+
+def process_wocontact(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
+    """Process woContact Sensor services data."""
+    return {
+        "tested": bool(data[1] & 0b10000000),
+        "motion_detected": bool(data[1] & 0b01000000),
+        "battery": data[2] & 0b01111111,
+        "contact_open": data[3] & 0b00000010 == 0b00000010,
+        "contact_timeout": data[3] & 0b00000110 == 0b00000110,
+        "is_light": bool(data[3] & 0b00000001),
+        "button_count": (data[7] & 0b11110000) >> 4,
+    }

+ 19 - 0
switchbot/adv_parsers/curtain.py

@@ -0,0 +1,19 @@
+"""Library to handle connection with Switchbot."""
+from __future__ import annotations
+
+
+def process_wocurtain(
+    data: bytes, mfr_data: bytes | None, reverse: bool = True
+) -> dict[str, bool | int]:
+    """Process woCurtain/Curtain services data."""
+
+    _position = max(min(data[3] & 0b01111111, 100), 0)
+
+    return {
+        "calibration": bool(data[1] & 0b01000000),
+        "battery": data[2] & 0b01111111,
+        "inMotion": bool(data[3] & 0b10000000),
+        "position": (100 - _position) if reverse else _position,
+        "lightLevel": (data[4] >> 4) & 0b00001111,
+        "deviceChain": data[4] & 0b00000111,
+    }

+ 19 - 0
switchbot/adv_parsers/meter.py

@@ -0,0 +1,19 @@
+"""Meter parser."""
+
+
+def process_wosensorth(data: bytes, mfr_data: bytes | None) -> dict[str, object]:
+    """Process woSensorTH/Temp sensor services data."""
+
+    _temp_sign = 1 if data[4] & 0b10000000 else -1
+    _temp_c = _temp_sign * ((data[4] & 0b01111111) + ((data[3] & 0b00001111) / 10))
+    _temp_f = (_temp_c * 9 / 5) + 32
+    _temp_f = (_temp_f * 10) / 10
+
+    _wosensorth_data = {
+        "temp": {"c": _temp_c, "f": _temp_f},
+        "fahrenheit": bool(data[5] & 0b10000000),
+        "humidity": data[5] & 0b01111111,
+        "battery": data[2] & 0b01111111,
+    }
+
+    return _wosensorth_data

+ 15 - 0
switchbot/adv_parsers/motion.py

@@ -0,0 +1,15 @@
+"""Motion sensor parser."""
+
+
+def process_wopresence(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
+    """Process WoPresence Sensor services data."""
+    return {
+        "tested": bool(data[1] & 0b10000000),
+        "motion_detected": bool(data[1] & 0b01000000),
+        "battery": data[2] & 0b01111111,
+        "led": (data[5] & 0b00100000) >> 5,
+        "iot": (data[5] & 0b00010000) >> 4,
+        "sense_distance": (data[5] & 0b00001100) >> 2,
+        "light_intensity": data[5] & 0b00000011,
+        "is_light": bool(data[5] & 0b00000010),
+    }

+ 10 - 0
switchbot/adv_parsers/plug.py

@@ -0,0 +1,10 @@
+"""Library to handle connection with Switchbot."""
+
+
+def process_woplugmini(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
+    """Process plug mini."""
+    return {
+        "switchMode": True,
+        "isOn": mfr_data[7] == 0x80,
+        "wifi_rssi": -mfr_data[9],
+    }

+ 6 - 0
switchbot/const.py

@@ -0,0 +1,6 @@
+"""Library to handle connection with Switchbot."""
+from __future__ import annotations
+
+DEFAULT_RETRY_COUNT = 3
+DEFAULT_RETRY_TIMEOUT = 1
+DEFAULT_SCAN_TIMEOUT = 5

+ 1 - 0
switchbot/devices/__init__.py

@@ -0,0 +1 @@
+"""Switchbot Device Library."""

+ 151 - 0
switchbot/devices/bot.py

@@ -0,0 +1,151 @@
+"""Library to handle connection with Switchbot."""
+from __future__ import annotations
+
+import logging
+from typing import Any
+
+from .device import DEVICE_SET_EXTENDED_KEY, DEVICE_SET_MODE_KEY, SwitchbotDevice
+
+# Bot keys
+PRESS_KEY = "570100"
+ON_KEY = "570101"
+OFF_KEY = "570102"
+DOWN_KEY = "570103"
+UP_KEY = "570104"
+
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class Switchbot(SwitchbotDevice):
+    """Representation of a Switchbot."""
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        """Switchbot Bot/WoHand constructor."""
+        super().__init__(*args, **kwargs)
+        self._inverse: bool = kwargs.pop("inverse_mode", False)
+        self._settings: dict[str, Any] = {}
+
+    async def update(self, interface: int | None = None) -> None:
+        """Update mode, battery percent and state of device."""
+        await self.get_device_data(retry=self._retry_count, interface=interface)
+
+    async def turn_on(self) -> bool:
+        """Turn device on."""
+        result = await self._sendcommand(ON_KEY, self._retry_count)
+
+        if result[0] == 1:
+            return True
+
+        if result[0] == 5:
+            _LOGGER.debug("Bot is in press mode and doesn't have on state")
+            return True
+
+        return False
+
+    async def turn_off(self) -> bool:
+        """Turn device off."""
+        result = await self._sendcommand(OFF_KEY, self._retry_count)
+        if result[0] == 1:
+            return True
+
+        if result[0] == 5:
+            _LOGGER.debug("Bot is in press mode and doesn't have off state")
+            return True
+
+        return False
+
+    async def hand_up(self) -> bool:
+        """Raise device arm."""
+        result = await self._sendcommand(UP_KEY, self._retry_count)
+        if result[0] == 1:
+            return True
+
+        if result[0] == 5:
+            _LOGGER.debug("Bot is in press mode")
+            return True
+
+        return False
+
+    async def hand_down(self) -> bool:
+        """Lower device arm."""
+        result = await self._sendcommand(DOWN_KEY, self._retry_count)
+        if result[0] == 1:
+            return True
+
+        if result[0] == 5:
+            _LOGGER.debug("Bot is in press mode")
+            return True
+
+        return False
+
+    async def press(self) -> bool:
+        """Press command to device."""
+        result = await self._sendcommand(PRESS_KEY, self._retry_count)
+        if result[0] == 1:
+            return True
+
+        if result[0] == 5:
+            _LOGGER.debug("Bot is in switch mode")
+            return True
+
+        return False
+
+    async def set_switch_mode(
+        self, switch_mode: bool = False, strength: int = 100, inverse: bool = False
+    ) -> bool:
+        """Change bot mode."""
+        mode_key = format(switch_mode, "b") + format(inverse, "b")
+        strength_key = f"{strength:0{2}x}"  # to hex with padding to double digit
+
+        result = await self._sendcommand(
+            DEVICE_SET_MODE_KEY + strength_key + mode_key, self._retry_count
+        )
+
+        if result[0] == 1:
+            return True
+
+        return False
+
+    async def set_long_press(self, duration: int = 0) -> bool:
+        """Set bot long press duration."""
+        duration_key = f"{duration:0{2}x}"  # to hex with padding to double digit
+
+        result = await self._sendcommand(
+            DEVICE_SET_EXTENDED_KEY + "08" + duration_key, self._retry_count
+        )
+
+        if result[0] == 1:
+            return True
+
+        return False
+
+    async def get_basic_info(self) -> dict[str, Any] | None:
+        """Get device basic settings."""
+        if not (_data := await self._get_basic_info()):
+            return None
+        return {
+            "battery": _data[1],
+            "firmware": _data[2] / 10.0,
+            "strength": _data[3],
+            "timers": _data[8],
+            "switchMode": bool(_data[9] & 16),
+            "inverseDirection": bool(_data[9] & 1),
+            "holdSeconds": _data[10],
+        }
+
+    def switch_mode(self) -> Any:
+        """Return true or false from cache."""
+        # To get actual position call update() first.
+        return self._get_adv_value("switchMode")
+
+    def is_on(self) -> Any:
+        """Return switch state from cache."""
+        # To get actual position call update() first.
+        value = self._get_adv_value("isOn")
+        if value is None:
+            return None
+
+        if self._inverse:
+            return not value
+        return value

+ 0 - 0
switchbot/devices/bulb.py


+ 0 - 0
switchbot/devices/contact.py


+ 188 - 0
switchbot/devices/curtain.py

@@ -0,0 +1,188 @@
+"""Library to handle connection with Switchbot."""
+from __future__ import annotations
+
+import logging
+from typing import Any
+
+from .device import SwitchbotDevice
+
+# Curtain keys
+OPEN_KEY = "570f450105ff00"  # 570F4501010100
+CLOSE_KEY = "570f450105ff64"  # 570F4501010164
+POSITION_KEY = "570F450105ff"  # +actual_position ex: 570F450105ff32 for 50%
+STOP_KEY = "570F450100ff"
+CURTAIN_EXT_SUM_KEY = "570f460401"
+CURTAIN_EXT_ADV_KEY = "570f460402"
+CURTAIN_EXT_CHAIN_INFO_KEY = "570f468101"
+
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class SwitchbotCurtain(SwitchbotDevice):
+    """Representation of a Switchbot Curtain."""
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        """Switchbot Curtain/WoCurtain constructor."""
+
+        # The position of the curtain is saved returned with 0 = open and 100 = closed.
+        # This is independent of the calibration of the curtain bot (Open left to right/
+        # Open right to left/Open from the middle).
+        # The parameter 'reverse_mode' reverse these values,
+        # if 'reverse_mode' = True, position = 0 equals close
+        # and position = 100 equals open. The parameter is default set to True so that
+        # the definition of position is the same as in Home Assistant.
+
+        super().__init__(*args, **kwargs)
+        self._reverse: bool = kwargs.pop("reverse_mode", True)
+        self._settings: dict[str, Any] = {}
+        self.ext_info_sum: dict[str, Any] = {}
+        self.ext_info_adv: dict[str, Any] = {}
+
+    async def open(self) -> bool:
+        """Send open command."""
+        result = await self._sendcommand(OPEN_KEY, self._retry_count)
+        if result[0] == 1:
+            return True
+
+        return False
+
+    async def close(self) -> bool:
+        """Send close command."""
+        result = await self._sendcommand(CLOSE_KEY, self._retry_count)
+        if result[0] == 1:
+            return True
+
+        return False
+
+    async def stop(self) -> bool:
+        """Send stop command to device."""
+        result = await self._sendcommand(STOP_KEY, self._retry_count)
+        if result[0] == 1:
+            return True
+
+        return False
+
+    async def set_position(self, position: int) -> bool:
+        """Send position command (0-100) to device."""
+        position = (100 - position) if self._reverse else position
+        hex_position = "%0.2X" % position
+        result = await self._sendcommand(POSITION_KEY + hex_position, self._retry_count)
+        if result[0] == 1:
+            return True
+
+        return False
+
+    async def update(self, interface: int | None = None) -> None:
+        """Update position, battery percent and light level of device."""
+        await self.get_device_data(retry=self._retry_count, interface=interface)
+
+    def get_position(self) -> Any:
+        """Return cached position (0-100) of Curtain."""
+        # To get actual position call update() first.
+        return self._get_adv_value("position")
+
+    async def get_basic_info(self) -> dict[str, Any] | None:
+        """Get device basic settings."""
+        if not (_data := await self._get_basic_info()):
+            return None
+
+        _position = max(min(_data[6], 100), 0)
+        return {
+            "battery": _data[1],
+            "firmware": _data[2] / 10.0,
+            "chainLength": _data[3],
+            "openDirection": (
+                "right_to_left" if _data[4] & 0b10000000 == 128 else "left_to_right"
+            ),
+            "touchToOpen": bool(_data[4] & 0b01000000),
+            "light": bool(_data[4] & 0b00100000),
+            "fault": bool(_data[4] & 0b00001000),
+            "solarPanel": bool(_data[5] & 0b00001000),
+            "calibrated": bool(_data[5] & 0b00000100),
+            "inMotion": bool(_data[5] & 0b01000011),
+            "position": (100 - _position) if self._reverse else _position,
+            "timers": _data[7],
+        }
+
+    async def get_extended_info_summary(self) -> dict[str, Any] | None:
+        """Get basic info for all devices in chain."""
+        _data = await self._sendcommand(
+            key=CURTAIN_EXT_SUM_KEY, retry=self._retry_count
+        )
+
+        if _data in (b"\x07", b"\x00"):
+            _LOGGER.error("Unsuccessfull, please try again")
+            return None
+
+        self.ext_info_sum["device0"] = {
+            "openDirectionDefault": not bool(_data[1] & 0b10000000),
+            "touchToOpen": bool(_data[1] & 0b01000000),
+            "light": bool(_data[1] & 0b00100000),
+            "openDirection": (
+                "left_to_right" if _data[1] & 0b00010000 == 1 else "right_to_left"
+            ),
+        }
+
+        # if grouped curtain device present.
+        if _data[2] != 0:
+            self.ext_info_sum["device1"] = {
+                "openDirectionDefault": not bool(_data[1] & 0b10000000),
+                "touchToOpen": bool(_data[1] & 0b01000000),
+                "light": bool(_data[1] & 0b00100000),
+                "openDirection": (
+                    "left_to_right" if _data[1] & 0b00010000 else "right_to_left"
+                ),
+            }
+
+        return self.ext_info_sum
+
+    async def get_extended_info_adv(self) -> dict[str, Any] | None:
+        """Get advance page info for device chain."""
+
+        _data = await self._sendcommand(
+            key=CURTAIN_EXT_ADV_KEY, retry=self._retry_count
+        )
+
+        if _data in (b"\x07", b"\x00"):
+            _LOGGER.error("Unsuccessfull, please try again")
+            return None
+
+        _state_of_charge = [
+            "not_charging",
+            "charging_by_adapter",
+            "charging_by_solar",
+            "fully_charged",
+            "solar_not_charging",
+            "charging_error",
+        ]
+
+        self.ext_info_adv["device0"] = {
+            "battery": _data[1],
+            "firmware": _data[2] / 10.0,
+            "stateOfCharge": _state_of_charge[_data[3]],
+        }
+
+        # If grouped curtain device present.
+        if _data[4]:
+            self.ext_info_adv["device1"] = {
+                "battery": _data[4],
+                "firmware": _data[5] / 10.0,
+                "stateOfCharge": _state_of_charge[_data[6]],
+            }
+
+        return self.ext_info_adv
+
+    def get_light_level(self) -> Any:
+        """Return cached light level."""
+        # To get actual light level call update() first.
+        return self._get_adv_value("lightLevel")
+
+    def is_reversed(self) -> bool:
+        """Return True if curtain position is opposite from SB data."""
+        return self._reverse
+
+    def is_calibrated(self) -> Any:
+        """Return True curtain is calibrated."""
+        # To get actual light level call update() first.
+        return self._get_adv_value("calibration")

+ 189 - 0
switchbot/devices/device.py

@@ -0,0 +1,189 @@
+"""Library to handle connection with Switchbot."""
+from __future__ import annotations
+
+import asyncio
+import binascii
+import logging
+from typing import Any
+from uuid import UUID
+
+import async_timeout
+import bleak
+from bleak.backends.device import BLEDevice
+from bleak_retry_connector import BleakClient, establish_connection
+
+from ..const import DEFAULT_RETRY_COUNT, DEFAULT_SCAN_TIMEOUT
+from ..discovery import GetSwitchbotDevices
+from ..models import SwitchBotAdvertisement
+
+_LOGGER = logging.getLogger(__name__)
+
+# Keys common to all device types
+DEVICE_GET_BASIC_SETTINGS_KEY = "5702"
+DEVICE_SET_MODE_KEY = "5703"
+DEVICE_SET_EXTENDED_KEY = "570f"
+
+# Base key when encryption is set
+KEY_PASSWORD_PREFIX = "571"
+
+
+def _sb_uuid(comms_type: str = "service") -> UUID | str:
+    """Return Switchbot UUID."""
+
+    _uuid = {"tx": "002", "rx": "003", "service": "d00"}
+
+    if comms_type in _uuid:
+        return UUID(f"cba20{_uuid[comms_type]}-224d-11e6-9fb8-0002a5d5c51b")
+
+    return "Incorrect type, choose between: tx, rx or service"
+
+
+class SwitchbotDevice:
+    """Base Representation of a Switchbot Device."""
+
+    def __init__(
+        self,
+        device: BLEDevice,
+        password: str | None = None,
+        interface: int = 0,
+        **kwargs: Any,
+    ) -> None:
+        """Switchbot base class constructor."""
+        self._interface = f"hci{interface}"
+        self._device = device
+        self._sb_adv_data: SwitchBotAdvertisement | None = None
+        self._scan_timeout: int = kwargs.pop("scan_timeout", DEFAULT_SCAN_TIMEOUT)
+        self._retry_count: int = kwargs.pop("retry_count", DEFAULT_RETRY_COUNT)
+        self._connect_lock = asyncio.Lock()
+        if password is None or password == "":
+            self._password_encoded = None
+        else:
+            self._password_encoded = "%x" % (
+                binascii.crc32(password.encode("ascii")) & 0xFFFFFFFF
+            )
+
+    def _commandkey(self, key: str) -> str:
+        """Add password to key if set."""
+        if self._password_encoded is None:
+            return key
+        key_action = key[3]
+        key_suffix = key[4:]
+        return KEY_PASSWORD_PREFIX + key_action + self._password_encoded + key_suffix
+
+    async def _sendcommand(self, key: str, retry: int) -> bytes:
+        """Send command to device and read response."""
+        command = bytearray.fromhex(self._commandkey(key))
+        _LOGGER.debug("Sending command to switchbot %s", command)
+        max_attempts = retry + 1
+        async with self._connect_lock:
+            for attempt in range(max_attempts):
+                try:
+                    return await self._send_command_locked(key, command)
+                except (bleak.BleakError, asyncio.exceptions.TimeoutError):
+                    if attempt == retry:
+                        _LOGGER.error(
+                            "Switchbot communication failed. Stopping trying",
+                            exc_info=True,
+                        )
+                        return b"\x00"
+
+                    _LOGGER.debug("Switchbot communication failed with:", exc_info=True)
+
+        raise RuntimeError("Unreachable")
+
+    @property
+    def name(self) -> str:
+        """Return device name."""
+        return f"{self._device.name} ({self._device.address})"
+
+    async def _send_command_locked(self, key: str, command: bytes) -> bytes:
+        """Send command to device and read response."""
+        client: BleakClient | None = None
+        try:
+            _LOGGER.debug("%s: Connnecting to switchbot", self.name)
+            client = await establish_connection(
+                BleakClient, self._device, self.name, max_attempts=1
+            )
+            _LOGGER.debug(
+                "%s: Connnected to switchbot: %s", self.name, client.is_connected
+            )
+            future: asyncio.Future[bytearray] = asyncio.Future()
+
+            def _notification_handler(_sender: int, data: bytearray) -> None:
+                """Handle notification responses."""
+                if future.done():
+                    _LOGGER.debug("%s: Notification handler already done", self.name)
+                    return
+                future.set_result(data)
+
+            _LOGGER.debug("%s: Subscribe to notifications", self.name)
+            await client.start_notify(_sb_uuid(comms_type="rx"), _notification_handler)
+
+            _LOGGER.debug("%s: Sending command, %s", self.name, key)
+            await client.write_gatt_char(_sb_uuid(comms_type="tx"), command, False)
+
+            async with async_timeout.timeout(5):
+                notify_msg = await future
+            _LOGGER.info("%s: Notification received: %s", self.name, notify_msg)
+
+            _LOGGER.debug("%s: UnSubscribe to notifications", self.name)
+            await client.stop_notify(_sb_uuid(comms_type="rx"))
+
+        finally:
+            if client:
+                await client.disconnect()
+
+        if notify_msg == b"\x07":
+            _LOGGER.error("Password required")
+        elif notify_msg == b"\t":
+            _LOGGER.error("Password incorrect")
+        return notify_msg
+
+    def get_address(self) -> str:
+        """Return address of device."""
+        return self._device.address
+
+    def _get_adv_value(self, key: str) -> Any:
+        """Return value from advertisement data."""
+        if not self._sb_adv_data:
+            return None
+        return self._sb_adv_data.data["data"][key]
+
+    def get_battery_percent(self) -> Any:
+        """Return device battery level in percent."""
+        return self._get_adv_value("battery")
+
+    def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
+        """Update device data from advertisement."""
+        self._sb_adv_data = advertisement
+        self._device = advertisement.device
+
+    async def get_device_data(
+        self, retry: int = DEFAULT_RETRY_COUNT, interface: int | None = None
+    ) -> dict | None:
+        """Find switchbot devices and their advertisement data."""
+        if interface:
+            _interface: int = interface
+        else:
+            _interface = int(self._interface.replace("hci", ""))
+
+        _data = await GetSwitchbotDevices(interface=_interface).discover(
+            retry=retry, scan_timeout=self._scan_timeout
+        )
+
+        if self._device.address in _data:
+            self._sb_adv_data = _data[self._device.address]
+
+        return self._sb_adv_data
+
+    async def _get_basic_info(self) -> dict | None:
+        """Return basic info of device."""
+        _data = await self._sendcommand(
+            key=DEVICE_GET_BASIC_SETTINGS_KEY, retry=self._retry_count
+        )
+
+        if _data in (b"\x07", b"\x00"):
+            _LOGGER.error("Unsuccessful, please try again")
+            return None
+
+        return _data

+ 0 - 0
switchbot/devices/meter.py


+ 0 - 0
switchbot/devices/motion.py


+ 39 - 0
switchbot/devices/plug.py

@@ -0,0 +1,39 @@
+"""Library to handle connection with Switchbot."""
+from typing import Any
+
+from .device import SwitchbotDevice
+
+# Plug Mini keys
+PLUG_ON_KEY = "570f50010180"
+PLUG_OFF_KEY = "570f50010100"
+
+
+class SwitchbotPlugMini(SwitchbotDevice):
+    """Representation of a Switchbot plug mini."""
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        """Switchbot plug mini constructor."""
+        super().__init__(*args, **kwargs)
+        self._settings: dict[str, Any] = {}
+
+    async def update(self, interface: int | None = None) -> None:
+        """Update state of device."""
+        await self.get_device_data(retry=self._retry_count, interface=interface)
+
+    async def turn_on(self) -> bool:
+        """Turn device on."""
+        result = await self._sendcommand(PLUG_ON_KEY, self._retry_count)
+        return result[1] == 0x80
+
+    async def turn_off(self) -> bool:
+        """Turn device off."""
+        result = await self._sendcommand(PLUG_OFF_KEY, self._retry_count)
+        return result[1] == 0x00
+
+    def is_on(self) -> Any:
+        """Return switch state from cache."""
+        # To get actual position call update() first.
+        value = self._get_adv_value("isOn")
+        if value is None:
+            return None
+        return value

+ 116 - 0
switchbot/discovery.py

@@ -0,0 +1,116 @@
+"""Discover switchbot devices."""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+
+import bleak
+from bleak.backends.device import BLEDevice
+from bleak.backends.scanner import AdvertisementData
+
+from .adv_parser import parse_advertisement_data
+from .const import DEFAULT_RETRY_COUNT, DEFAULT_RETRY_TIMEOUT, DEFAULT_SCAN_TIMEOUT
+from .models import SwitchBotAdvertisement
+
+_LOGGER = logging.getLogger(__name__)
+CONNECT_LOCK = asyncio.Lock()
+
+
+class GetSwitchbotDevices:
+    """Scan for all Switchbot devices and return by type."""
+
+    def __init__(self, interface: int = 0) -> None:
+        """Get switchbot devices class constructor."""
+        self._interface = f"hci{interface}"
+        self._adv_data: dict[str, SwitchBotAdvertisement] = {}
+
+    def detection_callback(
+        self,
+        device: BLEDevice,
+        advertisement_data: AdvertisementData,
+    ) -> None:
+        """Callback for device detection."""
+        discovery = parse_advertisement_data(device, advertisement_data)
+        if discovery:
+            self._adv_data[discovery.address] = discovery
+
+    async def discover(
+        self, retry: int = DEFAULT_RETRY_COUNT, scan_timeout: int = DEFAULT_SCAN_TIMEOUT
+    ) -> dict:
+        """Find switchbot devices and their advertisement data."""
+
+        devices = None
+        devices = bleak.BleakScanner(
+            # TODO: Find new UUIDs to filter on. For example, see
+            # https://github.com/OpenWonderLabs/SwitchBotAPI-BLE/blob/4ad138bb09f0fbbfa41b152ca327a78c1d0b6ba9/devicetypes/meter.md
+            adapter=self._interface,
+        )
+        devices.register_detection_callback(self.detection_callback)
+
+        async with CONNECT_LOCK:
+            await devices.start()
+            await asyncio.sleep(scan_timeout)
+            await devices.stop()
+
+        if devices is None:
+            if retry < 1:
+                _LOGGER.error(
+                    "Scanning for Switchbot devices failed. Stop trying", exc_info=True
+                )
+                return self._adv_data
+
+            _LOGGER.warning(
+                "Error scanning for Switchbot devices. Retrying (remaining: %d)",
+                retry,
+            )
+            await asyncio.sleep(DEFAULT_RETRY_TIMEOUT)
+            return await self.discover(retry - 1, scan_timeout)
+
+        return self._adv_data
+
+    async def _get_devices_by_model(
+        self,
+        model: str,
+    ) -> dict:
+        """Get switchbot devices by type."""
+        if not self._adv_data:
+            await self.discover()
+
+        return {
+            address: adv
+            for address, adv in self._adv_data.items()
+            if adv.data.get("model") == model
+        }
+
+    async def get_curtains(self) -> dict[str, SwitchBotAdvertisement]:
+        """Return all WoCurtain/Curtains devices with services data."""
+        return await self._get_devices_by_model("c")
+
+    async def get_bots(self) -> dict[str, SwitchBotAdvertisement]:
+        """Return all WoHand/Bot devices with services data."""
+        return await self._get_devices_by_model("H")
+
+    async def get_tempsensors(self) -> dict[str, SwitchBotAdvertisement]:
+        """Return all WoSensorTH/Temp sensor devices with services data."""
+        base_meters = await self._get_devices_by_model("T")
+        plus_meters = await self._get_devices_by_model("i")
+        return {**base_meters, **plus_meters}
+
+    async def get_contactsensors(self) -> dict[str, SwitchBotAdvertisement]:
+        """Return all WoContact/Contact sensor devices with services data."""
+        return await self._get_devices_by_model("d")
+
+    async def get_device_data(
+        self, address: str
+    ) -> dict[str, SwitchBotAdvertisement] | None:
+        """Return data for specific device."""
+        if not self._adv_data:
+            await self.discover()
+
+        return {
+            device: data
+            for device, data in self._adv_data.items()
+            # MacOS uses UUIDs instead of MAC addresses
+            if data.get("address") == address
+        }

+ 16 - 0
switchbot/models.py

@@ -0,0 +1,16 @@
+"""Library to handle connection with Switchbot."""
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import Any
+
+from bleak.backends.device import BLEDevice
+
+
+@dataclass
+class SwitchBotAdvertisement:
+    """Switchbot advertisement."""
+
+    address: str
+    data: dict[str, Any]
+    device: BLEDevice