Browse Source

Add support for Evaporative Humidifier (#296)

* Add support for Evaporative Humidifier

* Formatting

* chore(pre-commit.ci): auto fixes

* Fix comments

* Advertisement parser improvements

* Improved device model

* Encryption related methods moved to base class

* chore(pre-commit.ci): auto fixes

* Update

* Add tests

* chore(pre-commit.ci): auto fixes

* chore(pre-commit.ci): auto fixes

* chore: address some of the lint issues

* Extract constants

* Fix sleep command

* Improve tests

* chore(pre-commit.ci): auto fixes

* Fix tests

* Fix return types

* Move humidifier consts to separate module

* Add basic info command

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: J. Nick Koston <nick@koston.org>
Damian Sypniewski 1 month ago
parent
commit
98b42dc4bf

+ 6 - 0
switchbot/__init__.py

@@ -23,6 +23,7 @@ from .devices.bulb import SwitchbotBulb
 from .devices.ceiling_light import SwitchbotCeilingLight
 from .devices.curtain import SwitchbotCurtain
 from .devices.device import ColorMode, SwitchbotDevice, SwitchbotEncryptedDevice
+from .devices.evaporative_humidifier import SwitchbotEvaporativeHumidifier
 from .devices.humidifier import SwitchbotHumidifier
 from .devices.light_strip import SwitchbotLightStrip
 from .devices.lock import SwitchbotLock
@@ -37,6 +38,7 @@ __all__ = [
     "LockStatus",
     "SwitchBotAdvertisement",
     "Switchbot",
+    "Switchbot",
     "SwitchbotAccountConnectionError",
     "SwitchbotApiError",
     "SwitchbotAuthenticationError",
@@ -47,13 +49,17 @@ __all__ = [
     "SwitchbotCurtain",
     "SwitchbotDevice",
     "SwitchbotEncryptedDevice",
+    "SwitchbotEvaporativeHumidifier",
     "SwitchbotHumidifier",
     "SwitchbotLightStrip",
     "SwitchbotLock",
     "SwitchbotModel",
+    "SwitchbotModel",
+    "SwitchbotPlugMini",
     "SwitchbotPlugMini",
     "SwitchbotRelaySwitch",
     "SwitchbotSupportedType",
+    "SwitchbotSupportedType",
     "close_stale_connections",
     "close_stale_connections_by_address",
     "get_device",

+ 7 - 1
switchbot/adv_parser.py

@@ -17,7 +17,7 @@ from .adv_parsers.ceiling_light import process_woceiling
 from .adv_parsers.contact import process_wocontact
 from .adv_parsers.curtain import process_wocurtain
 from .adv_parsers.hub2 import process_wohub2
-from .adv_parsers.humidifier import process_wohumidifier
+from .adv_parsers.humidifier import process_evaporative_humidifier, process_wohumidifier
 from .adv_parsers.keypad import process_wokeypad
 from .adv_parsers.leak import process_leak
 from .adv_parsers.light_strip import process_wostrip
@@ -164,6 +164,12 @@ SUPPORTED_TYPES: dict[str, SwitchbotSupportedType] = {
         "manufacturer_id": 741,
         "manufacturer_data_length": 6,
     },
+    "#": {
+        "modelName": SwitchbotModel.EVAPORATIVE_HUMIDIFIER,
+        "modelFriendlyName": "Evaporative Humidifier",
+        "func": process_evaporative_humidifier,
+        "manufacturer_id": 2409,
+    },
     "o": {
         "modelName": SwitchbotModel.LOCK,
         "modelFriendlyName": "Lock",

+ 60 - 0
switchbot/adv_parsers/humidifier.py

@@ -3,6 +3,14 @@
 from __future__ import annotations
 
 import logging
+from datetime import timedelta
+
+from ..const.evaporative_humidifier import (
+    OVER_HUMIDIFY_PROTECTION_MODES,
+    TARGET_HUMIDITY_MODES,
+    HumidifierMode,
+    HumidifierWaterLevel,
+)
 
 _LOGGER = logging.getLogger(__name__)
 
@@ -31,3 +39,55 @@ def process_wohumidifier(
         "level": data[4],
         "switchMode": True,
     }
+
+
+def process_evaporative_humidifier(
+    data: bytes | None, mfr_data: bytes | None
+) -> dict[str, bool | int]:
+    """Process WoHumi services data."""
+    if mfr_data is None:
+        return {
+            "isOn": None,
+            "mode": None,
+            "target_humidity": None,
+            "child_lock": None,
+            "over_humidify_protection": None,
+            "tank_removed": None,
+            "tilted_alert": None,
+            "filter_missing": None,
+            "humidity": None,
+            "temperature": None,
+            "filter_run_time": None,
+            "filter_alert": None,
+            "water_level": None,
+        }
+
+    is_on = bool(mfr_data[7] & 0b10000000)
+    mode = HumidifierMode(mfr_data[7] & 0b00001111)
+    filter_run_time = timedelta(hours=int.from_bytes(mfr_data[12:14], byteorder="big"))
+    has_humidity = bool(mfr_data[9] & 0b10000000)
+    has_temperature = bool(mfr_data[10] & 0b10000000)
+    is_tank_removed = bool(mfr_data[8] & 0b00000100)
+    return {
+        "isOn": is_on,
+        "mode": mode if is_on else None,
+        "target_humidity": (mfr_data[16] & 0b01111111)
+        if is_on and mode in TARGET_HUMIDITY_MODES
+        else None,
+        "child_lock": bool(mfr_data[8] & 0b00100000),
+        "over_humidify_protection": bool(mfr_data[8] & 0b10000000)
+        if is_on and mode in OVER_HUMIDIFY_PROTECTION_MODES
+        else None,
+        "tank_removed": is_tank_removed,
+        "tilted_alert": bool(mfr_data[8] & 0b00000010),
+        "filter_missing": bool(mfr_data[8] & 0b00000001),
+        "humidity": (mfr_data[9] & 0b01111111) if has_humidity else None,
+        "temperature": float(mfr_data[10] & 0b01111111) + float(mfr_data[11] >> 4) / 10
+        if has_temperature
+        else None,
+        "filter_run_time": filter_run_time,
+        "filter_alert": filter_run_time.days >= 10,
+        "water_level": HumidifierWaterLevel(mfr_data[11] & 0b00000011)
+        if not is_tank_removed
+        else None,
+    }

+ 1 - 0
switchbot/const/__init__.py

@@ -62,3 +62,4 @@ class SwitchbotModel(StrEnum):
     RELAY_SWITCH_1PM = "Relay Switch 1PM"
     RELAY_SWITCH_1 = "Relay Switch 1"
     REMOTE = "WoRemote"
+    EVAPORATIVE_HUMIDIFIER = "Evaporative Humidifier"

+ 34 - 0
switchbot/const/evaporative_humidifier.py

@@ -0,0 +1,34 @@
+from __future__ import annotations
+
+from enum import Enum
+
+
+class HumidifierMode(Enum):
+    HIGH = 1
+    MEDIUM = 2
+    LOW = 3
+    QUIET = 4
+    TARGET_HUMIDITY = 5
+    SLEEP = 6
+    AUTO = 7
+    DRYING_FILTER = 8
+
+
+class HumidifierWaterLevel(Enum):
+    EMPTY = 0
+    LOW = 1
+    MEDIUM = 2
+    HIGH = 3
+
+
+OVER_HUMIDIFY_PROTECTION_MODES = {
+    HumidifierMode.QUIET,
+    HumidifierMode.LOW,
+    HumidifierMode.MEDIUM,
+    HumidifierMode.HIGH,
+}
+
+TARGET_HUMIDITY_MODES = {
+    HumidifierMode.SLEEP,
+    HumidifierMode.TARGET_HUMIDITY,
+}

+ 212 - 0
switchbot/devices/evaporative_humidifier.py

@@ -0,0 +1,212 @@
+import logging
+from typing import Any
+
+from bleak.backends.device import BLEDevice
+
+from ..const import SwitchbotModel
+from ..const.evaporative_humidifier import (
+    TARGET_HUMIDITY_MODES,
+    HumidifierMode,
+    HumidifierWaterLevel,
+)
+from ..models import SwitchBotAdvertisement
+from .device import SwitchbotEncryptedDevice
+
+_LOGGER = logging.getLogger(__name__)
+
+COMMAND_HEADER = "57"
+COMMAND_GET_CK_IV = f"{COMMAND_HEADER}0f2103"
+COMMAND_TURN_ON = f"{COMMAND_HEADER}0f430101"
+COMMAND_TURN_OFF = f"{COMMAND_HEADER}0f430100"
+COMMAND_CHILD_LOCK_ON = f"{COMMAND_HEADER}0f430501"
+COMMAND_CHILD_LOCK_OFF = f"{COMMAND_HEADER}0f430500"
+COMMAND_AUTO_DRY_ON = f"{COMMAND_HEADER}0f430a01"
+COMMAND_AUTO_DRY_OFF = f"{COMMAND_HEADER}0f430a02"
+COMMAND_SET_MODE = f"{COMMAND_HEADER}0f4302"
+COMMAND_GET_BASIC_INFO = f"{COMMAND_HEADER}000300"
+
+MODES_COMMANDS = {
+    HumidifierMode.HIGH: "010100",
+    HumidifierMode.MEDIUM: "010200",
+    HumidifierMode.LOW: "010300",
+    HumidifierMode.QUIET: "010400",
+    HumidifierMode.TARGET_HUMIDITY: "0200",
+    HumidifierMode.SLEEP: "0300",
+    HumidifierMode.AUTO: "040000",
+}
+
+
+class SwitchbotEvaporativeHumidifier(SwitchbotEncryptedDevice):
+    """Representation of a Switchbot Evaporative Humidifier"""
+
+    def __init__(
+        self,
+        device: BLEDevice,
+        key_id: str,
+        encryption_key: str,
+        interface: int = 0,
+        model: SwitchbotModel = SwitchbotModel.EVAPORATIVE_HUMIDIFIER,
+        **kwargs: Any,
+    ) -> None:
+        self._force_next_update = False
+        super().__init__(device, key_id, encryption_key, model, interface, **kwargs)
+
+    @classmethod
+    async def verify_encryption_key(
+        cls,
+        device: BLEDevice,
+        key_id: str,
+        encryption_key: str,
+        model: SwitchbotModel = SwitchbotModel.EVAPORATIVE_HUMIDIFIER,
+        **kwargs: Any,
+    ) -> bool:
+        return await super().verify_encryption_key(
+            device, key_id, encryption_key, model, **kwargs
+        )
+
+    def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
+        """Update device data from advertisement."""
+        super().update_from_advertisement(advertisement)
+        _LOGGER.debug(
+            "%s: update advertisement: %s",
+            self.name,
+            advertisement,
+        )
+
+    async def _get_basic_info(self) -> bytes | None:
+        """Return basic info of device."""
+        _data = await self._send_command(
+            key=COMMAND_GET_BASIC_INFO, retry=self._retry_count
+        )
+
+        if _data in (b"\x07", b"\x00"):
+            _LOGGER.error("Unsuccessful, please try again")
+            return None
+
+        return _data
+
+    async def get_basic_info(self) -> dict[str, Any] | None:
+        """Get device basic settings."""
+        if not (_data := await self._get_basic_info()):
+            return None
+
+        # Not 100% sure about this data, will verify once a firmware update is available
+        return {
+            "firmware": _data[2] / 10.0,
+        }
+
+    async def turn_on(self) -> bool:
+        """Turn device on."""
+        result = await self._send_command(COMMAND_TURN_ON)
+        if ok := self._check_command_result(result, 0, {1}):
+            self._override_state({"isOn": True})
+            self._fire_callbacks()
+        return ok
+
+    async def turn_off(self) -> bool:
+        """Turn device off."""
+        result = await self._send_command(COMMAND_TURN_OFF)
+        if ok := self._check_command_result(result, 0, {1}):
+            self._override_state({"isOn": False})
+            self._fire_callbacks()
+        return ok
+
+    async def set_mode(
+        self, mode: HumidifierMode, target_humidity: int | None = None
+    ) -> bool:
+        """Set device mode."""
+        if mode == HumidifierMode.DRYING_FILTER:
+            return await self.start_drying_filter()
+        elif mode not in MODES_COMMANDS:
+            raise ValueError("Invalid mode")
+
+        command = COMMAND_SET_MODE + MODES_COMMANDS[mode]
+        if mode in TARGET_HUMIDITY_MODES:
+            if target_humidity is None:
+                raise TypeError("target_humidity is required")
+            command += f"{target_humidity:02x}"
+        result = await self._send_command(command)
+        if ok := self._check_command_result(result, 0, {1}):
+            self._override_state({"mode": mode})
+            if mode == HumidifierMode.TARGET_HUMIDITY and target_humidity is not None:
+                self._override_state({"target_humidity": target_humidity})
+            self._fire_callbacks()
+        return ok
+
+    async def set_child_lock(self, enabled: bool) -> bool:
+        """Set child lock."""
+        result = await self._send_command(
+            COMMAND_CHILD_LOCK_ON if enabled else COMMAND_CHILD_LOCK_OFF
+        )
+        if ok := self._check_command_result(result, 0, {1}):
+            self._override_state({"child_lock": enabled})
+            self._fire_callbacks()
+        return ok
+
+    async def start_drying_filter(self):
+        """Start drying filter."""
+        result = await self._send_command(COMMAND_TURN_ON + "08")
+        if ok := self._check_command_result(result, 0, {1}):
+            self._override_state({"mode": HumidifierMode.DRYING_FILTER})
+            self._fire_callbacks()
+        return ok
+
+    async def stop_drying_filter(self):
+        """Stop drying filter."""
+        result = await self._send_command(COMMAND_TURN_OFF)
+        if ok := self._check_command_result(result, 0, {0}):
+            self._override_state({"isOn": False, "mode": None})
+            self._fire_callbacks()
+        return ok
+
+    def is_on(self) -> bool | None:
+        """Return state from cache."""
+        return self._get_adv_value("isOn")
+
+    def get_mode(self) -> HumidifierMode | None:
+        """Return state from cache."""
+        return self._get_adv_value("mode")
+
+    def is_child_lock_enabled(self) -> bool | None:
+        """Return state from cache."""
+        return self._get_adv_value("child_lock")
+
+    def is_over_humidify_protection_enabled(self) -> bool | None:
+        """Return state from cache."""
+        return self._get_adv_value("over_humidify_protection")
+
+    def is_tank_removed(self) -> bool | None:
+        """Return state from cache."""
+        return self._get_adv_value("tank_removed")
+
+    def is_filter_missing(self) -> bool | None:
+        """Return state from cache."""
+        return self._get_adv_value("filter_missing")
+
+    def is_filter_alert_on(self) -> bool | None:
+        """Return state from cache."""
+        return self._get_adv_value("filter_alert")
+
+    def is_tilted_alert_on(self) -> bool | None:
+        """Return state from cache."""
+        return self._get_adv_value("tilted_alert")
+
+    def get_water_level(self) -> HumidifierWaterLevel | None:
+        """Return state from cache."""
+        return self._get_adv_value("water_level")
+
+    def get_filter_run_time(self) -> int | None:
+        """Return state from cache."""
+        return self._get_adv_value("filter_run_time")
+
+    def get_target_humidity(self) -> int | None:
+        """Return state from cache."""
+        return self._get_adv_value("target_humidity")
+
+    def get_humidity(self) -> int | None:
+        """Return state from cache."""
+        return self._get_adv_value("humidity")
+
+    def get_temperature(self) -> float | None:
+        """Return state from cache."""
+        return self._get_adv_value("temperature")

+ 6 - 0
switchbot/discovery.py

@@ -131,6 +131,12 @@ class GetSwitchbotDevices:
         """Return all WoKeypad/Keypad devices with services data."""
         return await self._get_devices_by_model("y")
 
+    async def get_humidifiers(self) -> dict[str, SwitchBotAdvertisement]:
+        """Return all humidifier devices with services data."""
+        humidifiers = await self._get_devices_by_model("e")
+        evaporative_humidifiers = await self._get_devices_by_model("#")
+        return {**humidifiers, **evaporative_humidifiers}
+
     async def get_device_data(
         self, address: str
     ) -> dict[str, SwitchBotAdvertisement] | None:

+ 202 - 0
tests/test_evaporative_humidifier.py

@@ -0,0 +1,202 @@
+import datetime
+from unittest.mock import AsyncMock
+
+import pytest
+from bleak.backends.device import BLEDevice
+
+from switchbot import SwitchBotAdvertisement, SwitchbotModel
+from switchbot.adv_parsers.humidifier import process_evaporative_humidifier
+from switchbot.const.evaporative_humidifier import HumidifierMode, HumidifierWaterLevel
+from switchbot.devices import evaporative_humidifier
+
+from .test_adv_parser import generate_ble_device
+
+
+def create_device_for_command_testing(init_data: dict | None = None):
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    evaporative_humidifier_device = (
+        evaporative_humidifier.SwitchbotEvaporativeHumidifier(
+            ble_device, "ff", "ffffffffffffffffffffffffffffffff"
+        )
+    )
+    evaporative_humidifier_device.update_from_advertisement(
+        make_advertisement_data(ble_device, init_data)
+    )
+    return evaporative_humidifier_device
+
+
+def make_advertisement_data(ble_device: BLEDevice, init_data: dict | None = None):
+    if init_data is None:
+        init_data = {}
+    """Set advertisement data with defaults."""
+    return SwitchBotAdvertisement(
+        address="aa:bb:cc:dd:ee:ff",
+        data={
+            "rawAdvData": b"#\x00\x00\x15\x1c\x00",
+            "data": {
+                "isOn": False,
+                "mode": None,
+                "target_humidity": None,
+                "child_lock": False,
+                "over_humidify_protection": True,
+                "tank_removed": False,
+                "tilted_alert": False,
+                "filter_missing": False,
+                "humidity": 51,
+                "temperature": 16.8,
+                "filter_run_time": datetime.timedelta(days=3, seconds=57600),
+                "filter_alert": False,
+                "water_level": HumidifierWaterLevel.LOW,
+            }
+            | init_data,
+            "isEncrypted": False,
+            "model": "#",
+            "modelFriendlyName": "Evaporative Humidifier",
+            "modelName": SwitchbotModel.EVAPORATIVE_HUMIDIFIER,
+        },
+        device=ble_device,
+        rssi=-80,
+        active=True,
+    )
+
+
+@pytest.mark.asyncio
+async def test_process_advertisement():
+    data = process_evaporative_humidifier(
+        b"#\x00\x00\x15\x1c\x00",
+        b"\xd4\x8cIU\x95\xb2\x08\x06\x88\xb3\x90\x81\x00X\x00X2",
+    )
+
+    assert data == {
+        "isOn": False,
+        "mode": None,
+        "target_humidity": None,
+        "child_lock": False,
+        "over_humidify_protection": None,
+        "tank_removed": False,
+        "tilted_alert": False,
+        "filter_missing": False,
+        "humidity": 51,
+        "temperature": 16.8,
+        "filter_run_time": datetime.timedelta(days=3, seconds=57600),
+        "filter_alert": False,
+        "water_level": HumidifierWaterLevel.LOW,
+    }
+
+
+@pytest.mark.asyncio
+async def test_process_advertisement_empty():
+    data = process_evaporative_humidifier(None, None)
+
+    assert data == {
+        "isOn": None,
+        "mode": None,
+        "target_humidity": None,
+        "child_lock": None,
+        "over_humidify_protection": None,
+        "tank_removed": None,
+        "tilted_alert": None,
+        "filter_missing": None,
+        "humidity": None,
+        "temperature": None,
+        "filter_run_time": None,
+        "filter_alert": None,
+        "water_level": None,
+    }
+
+
+@pytest.mark.asyncio
+async def test_turn_on():
+    device = create_device_for_command_testing({"isOn": False})
+    device._send_command = AsyncMock(return_value=b"\x01")
+
+    assert device.is_on() is False
+    await device.turn_on()
+    assert device.is_on() is True
+
+
+@pytest.mark.asyncio
+async def test_turn_off():
+    device = create_device_for_command_testing({"isOn": True})
+    device._send_command = AsyncMock(return_value=b"\x01")
+
+    assert device.is_on() is True
+    await device.turn_off()
+    assert device.is_on() is False
+
+
+@pytest.mark.asyncio
+async def test_set_mode():
+    device = create_device_for_command_testing(
+        {"isOn": True, "mode": HumidifierMode.LOW}
+    )
+    device._send_command = AsyncMock(return_value=b"\x01")
+
+    assert device.get_mode() is HumidifierMode.LOW
+    await device.set_mode(HumidifierMode.AUTO)
+    assert device.get_mode() is HumidifierMode.AUTO
+
+    await device.set_mode(HumidifierMode.TARGET_HUMIDITY, 60)
+    assert device.get_mode() is HumidifierMode.TARGET_HUMIDITY
+    assert device.get_target_humidity() == 60
+
+    await device.set_mode(HumidifierMode.DRYING_FILTER)
+    assert device.get_mode() is HumidifierMode.DRYING_FILTER
+
+    with pytest.raises(ValueError):
+        await device.set_mode(0)
+
+    with pytest.raises(TypeError):
+        await device.set_mode(HumidifierMode.TARGET_HUMIDITY)
+
+
+@pytest.mark.asyncio
+async def test_set_child_lock():
+    device = create_device_for_command_testing({"child_lock": False})
+    device._send_command = AsyncMock(return_value=b"\x01")
+
+    assert device.is_child_lock_enabled() is False
+    await device.set_child_lock(True)
+    assert device.is_child_lock_enabled() is True
+
+
+@pytest.mark.asyncio
+async def test_start_drying_filter():
+    device = create_device_for_command_testing(
+        {"isOn": True, "mode": HumidifierMode.AUTO}
+    )
+    device._send_command = AsyncMock(return_value=b"\x01")
+
+    assert device.get_mode() is HumidifierMode.AUTO
+    await device.start_drying_filter()
+    assert device.get_mode() is HumidifierMode.DRYING_FILTER
+
+
+@pytest.mark.asyncio
+async def test_stop_drying_filter():
+    device = create_device_for_command_testing(
+        {"isOn": True, "mode": HumidifierMode.DRYING_FILTER}
+    )
+    device._send_command = AsyncMock(return_value=b"\x00")
+
+    assert device.is_on() is True
+    assert device.get_mode() is HumidifierMode.DRYING_FILTER
+    await device.stop_drying_filter()
+    assert device.is_on() is False
+    assert device.get_mode() is None
+
+
+@pytest.mark.asyncio
+async def test_attributes():
+    device = create_device_for_command_testing()
+    device._send_command = AsyncMock(return_value=b"\x01")
+
+    assert device.is_over_humidify_protection_enabled() is True
+    assert device.is_tank_removed() is False
+    assert device.is_filter_missing() is False
+    assert device.is_filter_alert_on() is False
+    assert device.is_tilted_alert_on() is False
+    assert device.get_water_level() is HumidifierWaterLevel.LOW
+    assert device.get_filter_run_time() == datetime.timedelta(days=3, seconds=57600)
+    assert device.get_humidity() == 51
+    assert device.get_temperature() == 16.8