Browse Source

Add support for circulator fan (#317)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Retha Runolfsson 2 weeks ago
parent
commit
2c5ee33d74

+ 4 - 0
switchbot/__init__.py

@@ -10,6 +10,7 @@ from bleak_retry_connector import (
 
 from .adv_parser import SwitchbotSupportedType, parse_advertisement_data
 from .const import (
+    FanMode,
     LockStatus,
     SwitchbotAccountConnectionError,
     SwitchbotApiError,
@@ -24,6 +25,7 @@ from .devices.ceiling_light import SwitchbotCeilingLight
 from .devices.curtain import SwitchbotCurtain
 from .devices.device import ColorMode, SwitchbotDevice, SwitchbotEncryptedDevice
 from .devices.evaporative_humidifier import SwitchbotEvaporativeHumidifier
+from .devices.fan import SwitchbotFan
 from .devices.humidifier import SwitchbotHumidifier
 from .devices.light_strip import SwitchbotLightStrip
 from .devices.lock import SwitchbotLock
@@ -35,6 +37,7 @@ from .models import SwitchBotAdvertisement
 
 __all__ = [
     "ColorMode",
+    "FanMode",
     "GetSwitchbotDevices",
     "LockStatus",
     "SwitchBotAdvertisement",
@@ -51,6 +54,7 @@ __all__ = [
     "SwitchbotDevice",
     "SwitchbotEncryptedDevice",
     "SwitchbotEvaporativeHumidifier",
+    "SwitchbotFan",
     "SwitchbotHumidifier",
     "SwitchbotLightStrip",
     "SwitchbotLock",

+ 7 - 0
switchbot/adv_parser.py

@@ -16,6 +16,7 @@ from .adv_parsers.bulb import process_color_bulb
 from .adv_parsers.ceiling_light import process_woceiling
 from .adv_parsers.contact import process_wocontact
 from .adv_parsers.curtain import process_wocurtain
+from .adv_parsers.fan import process_fan
 from .adv_parsers.hub2 import process_wohub2
 from .adv_parsers.hubmini_matter import process_hubmini_matter
 from .adv_parsers.humidifier import process_evaporative_humidifier, process_wohumidifier
@@ -230,6 +231,12 @@ SUPPORTED_TYPES: dict[str, SwitchbotSupportedType] = {
         "func": process_hubmini_matter,
         "manufacturer_id": 2409,
     },
+    "~": {
+        "modelName": SwitchbotModel.CIRCULATOR_FAN,
+        "modelFriendlyName": "Circulator Fan",
+        "func": process_fan,
+        "manufacturer_id": 2409,
+    },
 }
 
 _SWITCHBOT_MODEL_TO_CHAR = {

+ 33 - 0
switchbot/adv_parsers/fan.py

@@ -0,0 +1,33 @@
+"""Fan adv parser."""
+
+from __future__ import annotations
+
+from ..const.fan import FanMode
+
+
+def process_fan(data: bytes | None, mfr_data: bytes | None) -> dict[str, bool | int]:
+    """Process fan services data."""
+    if mfr_data is None:
+        return {}
+
+    device_data = mfr_data[6:]
+
+    _seq_num = device_data[0]
+    _isOn = bool(device_data[1] & 0b10000000)
+    _mode = (device_data[1] & 0b01110000) >> 4
+    _mode = FanMode(_mode).name if 1 <= _mode <= 4 else None
+    _nightLight = (device_data[1] & 0b00001100) >> 2
+    _oscillate_left_and_right = bool(device_data[1] & 0b00000010)
+    _oscillate_up_and_down = bool(device_data[1] & 0b00000001)
+    _battery = device_data[2] & 0b01111111
+    _speed = device_data[3] & 0b01111111
+
+    return {
+        "sequence_number": _seq_num,
+        "isOn": _isOn,
+        "mode": _mode,
+        "nightLight": _nightLight,
+        "oscillating": _oscillate_left_and_right | _oscillate_up_and_down,
+        "battery": _battery,
+        "speed": _speed,
+    }

+ 2 - 0
switchbot/const/__init__.py

@@ -3,6 +3,7 @@
 from __future__ import annotations
 
 from ..enum import StrEnum
+from .fan import FanMode as FanMode
 
 # Preserve old LockStatus export for backwards compatibility
 from .lock import LockStatus as LockStatus
@@ -65,3 +66,4 @@ class SwitchbotModel(StrEnum):
     EVAPORATIVE_HUMIDIFIER = "Evaporative Humidifier"
     ROLLER_SHADE = "Roller Shade"
     HUBMINI_MATTER = "HubMini Matter"
+    CIRCULATOR_FAN = "Circulator Fan"

+ 14 - 0
switchbot/const/fan.py

@@ -0,0 +1,14 @@
+from __future__ import annotations
+
+from enum import Enum
+
+
+class FanMode(Enum):
+    NORMAL = 1
+    NATURAL = 2
+    SLEEP = 3
+    BABY = 4
+
+    @classmethod
+    def get_modes(cls) -> list[str]:
+        return [mode.name for mode in cls]

+ 116 - 0
switchbot/devices/fan.py

@@ -0,0 +1,116 @@
+"""Library to handle connection with Switchbot."""
+
+from __future__ import annotations
+
+import logging
+from typing import Any
+
+from ..const.fan import FanMode
+from .device import (
+    DEVICE_GET_BASIC_SETTINGS_KEY,
+    SwitchbotSequenceDevice,
+    update_after_operation,
+)
+
+_LOGGER = logging.getLogger(__name__)
+
+
+COMMAND_HEAD = "570f41"
+COMMAND_TURN_ON = f"{COMMAND_HEAD}0101"
+COMMAND_TURN_OFF = f"{COMMAND_HEAD}0102"
+COMMAND_START_OSCILLATION = f"{COMMAND_HEAD}020101ff"
+COMMAND_STOP_OSCILLATION = f"{COMMAND_HEAD}020102ff"
+COMMAND_SET_MODE = {
+    FanMode.NORMAL.name: f"{COMMAND_HEAD}030101ff",
+    FanMode.NATURAL.name: f"{COMMAND_HEAD}030102ff",
+    FanMode.SLEEP.name: f"{COMMAND_HEAD}030103",
+    FanMode.BABY.name: f"{COMMAND_HEAD}030104",
+}
+COMMAND_SET_PERCENTAGE = f"{COMMAND_HEAD}0302"  #  +speed
+COMMAND_GET_BASIC_INFO = "570f428102"
+
+
+class SwitchbotFan(SwitchbotSequenceDevice):
+    """Representation of a Switchbot Circulator Fan."""
+
+    def __init__(self, device, password=None, interface=0, **kwargs):
+        super().__init__(device, password, interface, **kwargs)
+
+    async def get_basic_info(self) -> dict[str, Any] | None:
+        """Get device basic settings."""
+        if not (_data := await self._get_basic_info(COMMAND_GET_BASIC_INFO)):
+            return None
+        if not (_data1 := await self._get_basic_info(DEVICE_GET_BASIC_SETTINGS_KEY)):
+            return None
+
+        _LOGGER.debug("data: %s", _data)
+        battery = _data[2] & 0b01111111
+        isOn = bool(_data[3] & 0b10000000)
+        oscillating = bool(_data[3] & 0b01100000)
+        _mode = _data[8] & 0b00000111
+        mode = FanMode(_mode).name if 1 <= _mode <= 4 else None
+        speed = _data[9]
+        firmware = _data1[2] / 10.0
+
+        return {
+            "battery": battery,
+            "isOn": isOn,
+            "oscillating": oscillating,
+            "mode": mode,
+            "speed": speed,
+            "firmware": firmware,
+        }
+
+    async def _get_basic_info(self, cmd: str) -> bytes | None:
+        """Return basic info of device."""
+        _data = await self._send_command(key=cmd, retry=self._retry_count)
+
+        if _data in (b"\x07", b"\x00"):
+            _LOGGER.error("Unsuccessful, please try again")
+            return None
+
+        return _data
+
+    @update_after_operation
+    async def set_preset_mode(self, preset_mode: str) -> bool:
+        """Send command to set fan preset_mode."""
+        return await self._send_command(COMMAND_SET_MODE[preset_mode])
+
+    @update_after_operation
+    async def set_percentage(self, percentage: int) -> bool:
+        """Send command to set fan percentage."""
+        return await self._send_command(f"{COMMAND_SET_PERCENTAGE}{percentage:02X}")
+
+    @update_after_operation
+    async def set_oscillation(self, oscillating: bool) -> bool:
+        """Send command to set fan oscillation"""
+        if oscillating:
+            return await self._send_command(COMMAND_START_OSCILLATION)
+        else:
+            return await self._send_command(COMMAND_STOP_OSCILLATION)
+
+    @update_after_operation
+    async def turn_on(self) -> bool:
+        """Turn on the fan."""
+        return await self._send_command(COMMAND_TURN_ON)
+
+    @update_after_operation
+    async def turn_off(self) -> bool:
+        """Turn off the fan."""
+        return await self._send_command(COMMAND_TURN_OFF)
+
+    def get_current_percentage(self) -> Any:
+        """Return cached percentage."""
+        return self._get_adv_value("speed")
+
+    def is_on(self) -> bool | None:
+        """Return fan state from cache."""
+        return self._get_adv_value("isOn")
+
+    def get_oscillating_state(self) -> Any:
+        """Return cached oscillating."""
+        return self._get_adv_value("oscillating")
+
+    def get_current_mode(self) -> Any:
+        """Return cached mode."""
+        return self._get_adv_value("mode")

+ 94 - 0
tests/test_adv_parser.py

@@ -2103,3 +2103,97 @@ def test_roller_shade_passive() -> None:
         rssi=-97,
         active=False,
     )
+
+
+def test_circulator_fan_active() -> None:
+    """Test parsing circulator fan with active data."""
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    adv_data = generate_advertisement_data(
+        manufacturer_data={2409: b"\xb0\xe9\xfeXY\xa8~LR9"},
+        service_data={"0000fd3d-0000-1000-8000-00805f9b34fb": b"~\x00R"},
+        rssi=-97,
+    )
+    result = parse_advertisement_data(
+        ble_device, adv_data, SwitchbotModel.CIRCULATOR_FAN
+    )
+    assert result == SwitchBotAdvertisement(
+        address="aa:bb:cc:dd:ee:ff",
+        data={
+            "rawAdvData": b"~\x00R",
+            "data": {
+                "sequence_number": 126,
+                "isOn": False,
+                "mode": "BABY",
+                "nightLight": 3,
+                "oscillating": False,
+                "battery": 82,
+                "speed": 57,
+            },
+            "isEncrypted": False,
+            "model": "~",
+            "modelFriendlyName": "Circulator Fan",
+            "modelName": SwitchbotModel.CIRCULATOR_FAN,
+        },
+        device=ble_device,
+        rssi=-97,
+        active=True,
+    )
+
+
+def test_circulator_fan_passive() -> None:
+    """Test parsing circulator fan with passive data."""
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    adv_data = generate_advertisement_data(
+        manufacturer_data={2409: b"\xb0\xe9\xfeXY\xa8~LR9"},
+        rssi=-97,
+    )
+    result = parse_advertisement_data(
+        ble_device, adv_data, SwitchbotModel.CIRCULATOR_FAN
+    )
+    assert result == SwitchBotAdvertisement(
+        address="aa:bb:cc:dd:ee:ff",
+        data={
+            "rawAdvData": None,
+            "data": {
+                "sequence_number": 126,
+                "isOn": False,
+                "mode": "BABY",
+                "nightLight": 3,
+                "oscillating": False,
+                "battery": 82,
+                "speed": 57,
+            },
+            "isEncrypted": False,
+            "model": "~",
+            "modelFriendlyName": "Circulator Fan",
+            "modelName": SwitchbotModel.CIRCULATOR_FAN,
+        },
+        device=ble_device,
+        rssi=-97,
+        active=False,
+    )
+
+
+def test_circulator_fan_with_empty_data() -> None:
+    """Test parsing circulator fan with empty data."""
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    adv_data = generate_advertisement_data(
+        manufacturer_data={2409: None},
+        service_data={"0000fd3d-0000-1000-8000-00805f9b34fb": b"~\x00R"},
+        rssi=-97,
+    )
+    result = parse_advertisement_data(
+        ble_device, adv_data, SwitchbotModel.CIRCULATOR_FAN
+    )
+    assert result == SwitchBotAdvertisement(
+        address="aa:bb:cc:dd:ee:ff",
+        data={
+            "rawAdvData": b"~\x00R",
+            "data": {},
+            "isEncrypted": False,
+            "model": "~",
+        },
+        device=ble_device,
+        rssi=-97,
+        active=True,
+    )

+ 172 - 0
tests/test_fan.py

@@ -0,0 +1,172 @@
+from unittest.mock import AsyncMock
+
+import pytest
+from bleak.backends.device import BLEDevice
+
+from switchbot import SwitchBotAdvertisement, SwitchbotModel
+from switchbot.const.fan import FanMode
+from switchbot.devices import fan
+
+from .test_adv_parser import generate_ble_device
+
+
+def create_device_for_command_testing(init_data: dict | None = None):
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    fan_device = fan.SwitchbotFan(ble_device)
+    fan_device.update_from_advertisement(make_advertisement_data(ble_device, init_data))
+    fan_device._send_command = AsyncMock()
+    fan_device.update = AsyncMock()
+    return fan_device
+
+
+def make_advertisement_data(ble_device: BLEDevice, init_data: dict | None = None):
+    """Set advertisement data with defaults."""
+    if init_data is None:
+        init_data = {}
+
+    return SwitchBotAdvertisement(
+        address="aa:bb:cc:dd:ee:ff",
+        data={
+            "rawAdvData": b"~\x00R",
+            "data": {
+                "isOn": True,
+                "mode": "NORMAL",
+                "nightLight": 3,
+                "oscillating": False,
+                "battery": 60,
+                "speed": 50,
+            }
+            | init_data,
+            "isEncrypted": False,
+            "model": ",",
+            "modelFriendlyName": "Circulator Fan",
+            "modelName": SwitchbotModel.CIRCULATOR_FAN,
+        },
+        device=ble_device,
+        rssi=-80,
+        active=True,
+    )
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "response, expected",
+    [
+        (b"\x00", None),
+        (b"\x07", None),
+        (b"\x01\x02\x03", b"\x01\x02\x03"),
+    ],
+)
+async def test__get_basic_info(response, expected):
+    fan_device = create_device_for_command_testing()
+    fan_device._send_command = AsyncMock(return_value=response)
+    result = await fan_device._get_basic_info(cmd="TEST_CMD")
+    assert result == expected
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "basic_info,firmware_info", [(True, False), (False, True), (False, False)]
+)
+async def test_get_basic_info_returns_none(basic_info, firmware_info):
+    fan_device = create_device_for_command_testing()
+
+    async def mock_get_basic_info(arg):
+        if arg == fan.COMMAND_GET_BASIC_INFO:
+            return basic_info
+        elif arg == fan.DEVICE_GET_BASIC_SETTINGS_KEY:
+            return firmware_info
+
+    fan_device._get_basic_info = AsyncMock(side_effect=mock_get_basic_info)
+
+    assert await fan_device.get_basic_info() is None
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "basic_info,firmware_info,result",
+    [
+        (
+            bytearray(b"\x01\x02W\x82g\xf5\xde4\x01=dPP\x03\x14P\x00\x00\x00\x00"),
+            bytearray(b"\x01W\x0b\x17\x01"),
+            [87, True, False, "NORMAL", 61, 1.1],
+        ),
+        (
+            bytearray(b"\x01\x02U\xc2g\xf5\xde4\x04+dPP\x03\x14P\x00\x00\x00\x00"),
+            bytearray(b"\x01U\x0b\x17\x01"),
+            [85, True, True, "BABY", 43, 1.1],
+        ),
+    ],
+)
+async def test_get_basic_info(basic_info, firmware_info, result):
+    fan_device = create_device_for_command_testing()
+
+    async def mock_get_basic_info(arg):
+        if arg == fan.COMMAND_GET_BASIC_INFO:
+            return basic_info
+        elif arg == fan.DEVICE_GET_BASIC_SETTINGS_KEY:
+            return firmware_info
+
+    fan_device._get_basic_info = AsyncMock(side_effect=mock_get_basic_info)
+
+    info = await fan_device.get_basic_info()
+    assert info["battery"] == result[0]
+    assert info["isOn"] == result[1]
+    assert info["oscillating"] == result[2]
+    assert info["mode"] == result[3]
+    assert info["speed"] == result[4]
+    assert info["firmware"] == result[5]
+
+
+@pytest.mark.asyncio
+async def test_set_preset_mode():
+    fan_device = create_device_for_command_testing({"mode": "BABY"})
+    await fan_device.set_preset_mode("BABY")
+    assert fan_device.get_current_mode() == "BABY"
+
+
+@pytest.mark.asyncio
+async def test_set_set_percentage_with_speed_is_0():
+    fan_device = create_device_for_command_testing({"speed": 0, "isOn": False})
+    await fan_device.turn_off()
+    assert fan_device.get_current_percentage() == 0
+    assert fan_device.is_on() is False
+
+
+@pytest.mark.asyncio
+async def test_set_set_percentage():
+    fan_device = create_device_for_command_testing({"speed": 80})
+    await fan_device.set_percentage(80)
+    assert fan_device.get_current_percentage() == 80
+
+
+@pytest.mark.asyncio
+async def test_set_not_oscillation():
+    fan_device = create_device_for_command_testing({"oscillating": False})
+    await fan_device.set_oscillation(False)
+    assert fan_device.get_oscillating_state() is False
+
+
+@pytest.mark.asyncio
+async def test_set_oscillation():
+    fan_device = create_device_for_command_testing({"oscillating": True})
+    await fan_device.set_oscillation(True)
+    assert fan_device.get_oscillating_state() is True
+
+
+@pytest.mark.asyncio
+async def test_turn_on():
+    fan_device = create_device_for_command_testing({"isOn": True})
+    await fan_device.turn_on()
+    assert fan_device.is_on() is True
+
+
+@pytest.mark.asyncio
+async def test_turn_off():
+    fan_device = create_device_for_command_testing({"isOn": False})
+    await fan_device.turn_off()
+    assert fan_device.is_on() is False
+
+
+def test_get_modes():
+    assert FanMode.get_modes() == ["NORMAL", "NATURAL", "SLEEP", "BABY"]