1
0
Эх сурвалжийг харах

Add support for art frame (#428)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: J. Nick Koston <nick@koston.org>
Co-authored-by: J. Nick Koston <nick@home-assistant.io>
Retha Runolfsson 4 өдөр өмнө
parent
commit
8fc2ac8aed

+ 2 - 0
switchbot/__init__.py

@@ -29,6 +29,7 @@ from .const import (
     SwitchbotModel,
 )
 from .devices.air_purifier import SwitchbotAirPurifier
+from .devices.art_frame import SwitchbotArtFrame
 from .devices.base_light import SwitchbotBaseLight
 from .devices.blind_tilt import SwitchbotBlindTilt
 from .devices.bot import Switchbot
@@ -83,6 +84,7 @@ __all__ = [
     "SwitchbotAccountConnectionError",
     "SwitchbotAirPurifier",
     "SwitchbotApiError",
+    "SwitchbotArtFrame",
     "SwitchbotAuthenticationError",
     "SwitchbotBaseLight",
     "SwitchbotBlindTilt",

+ 13 - 0
switchbot/adv_parser.py

@@ -12,6 +12,7 @@ from bleak.backends.device import BLEDevice
 from bleak.backends.scanner import AdvertisementData
 
 from .adv_parsers.air_purifier import process_air_purifier
+from .adv_parsers.art_frame import process_art_frame
 from .adv_parsers.blind_tilt import process_woblindtilt
 from .adv_parsers.bot import process_wohand
 from .adv_parsers.bulb import process_color_bulb
@@ -717,6 +718,18 @@ SUPPORTED_TYPES: dict[str | bytes, SwitchbotSupportedType] = {
         "func": process_presence_sensor,
         "manufacturer_id": 2409,
     },
+    b"\x00\x11>\x10": {
+        "modelName": SwitchbotModel.ART_FRAME,
+        "modelFriendlyName": "Art Frame",
+        "func": process_art_frame,
+        "manufacturer_id": 2409,
+    },
+    b"\x01\x11>\x10": {
+        "modelName": SwitchbotModel.ART_FRAME,
+        "modelFriendlyName": "Art Frame",
+        "func": process_art_frame,
+        "manufacturer_id": 2409,
+    },
 }
 
 _SWITCHBOT_MODEL_TO_CHAR: defaultdict[SwitchbotModel, list[str | bytes]] = defaultdict(

+ 35 - 0
switchbot/adv_parsers/art_frame.py

@@ -0,0 +1,35 @@
+"""Art Frame advertisement data parser."""
+
+import logging
+
+_LOGGER = logging.getLogger(__name__)
+
+
+def process_art_frame(
+    data: bytes | None, mfr_data: bytes | None
+) -> dict[str, bool | int | str]:
+    """Process Art Frame data."""
+    if mfr_data is None:
+        return {}
+
+    _seq_num = mfr_data[6]
+    battery_charging = bool(mfr_data[7] & 0x80)
+    battery = mfr_data[7] & 0x7F
+    image_index = mfr_data[8]
+    display_size = (mfr_data[9] >> 4) & 0x0F
+    display_mode = (mfr_data[9] >> 3) & 0x01
+    last_network_status = (mfr_data[9] >> 2) & 0x01
+
+    result = {
+        "sequence_number": _seq_num,
+        "battery_charging": battery_charging,
+        "battery": battery,
+        "image_index": image_index,
+        "display_size": display_size,
+        "display_mode": display_mode,
+        "last_network_status": last_network_status,
+    }
+
+    _LOGGER.debug("Art Frame mfr data: %s, result: %s", mfr_data.hex(), result)
+
+    return result

+ 1 - 0
switchbot/const/__init__.py

@@ -102,6 +102,7 @@ class SwitchbotModel(StrEnum):
     SMART_THERMOSTAT_RADIATOR = "Smart Thermostat Radiator"
     S20_VACUUM = "S20 Vacuum"
     PRESENCE_SENSOR = "Presence Sensor"
+    ART_FRAME = "Art Frame"
 
 
 __all__ = [

+ 140 - 0
switchbot/devices/art_frame.py

@@ -0,0 +1,140 @@
+"""Device handler for the Art Frame."""
+
+import logging
+from typing import Any
+
+from bleak.backends.device import BLEDevice
+
+from ..const import SwitchbotModel
+from .device import (
+    SwitchbotEncryptedDevice,
+    SwitchbotSequenceDevice,
+    update_after_operation,
+)
+
+_LOGGER = logging.getLogger(__name__)
+
+COMMAND_SET_IMAGE = "570F7A02{}"
+
+
+class SwitchbotArtFrame(SwitchbotSequenceDevice, SwitchbotEncryptedDevice):
+    """Representation of a Switchbot Art Frame."""
+
+    def __init__(
+        self,
+        device: BLEDevice,
+        key_id: str,
+        encryption_key: str,
+        interface: int = 0,
+        model: SwitchbotModel = SwitchbotModel.ART_FRAME,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(device, key_id, encryption_key, model, interface, **kwargs)
+        self.response_flag = True
+
+    @classmethod
+    async def verify_encryption_key(
+        cls,
+        device: BLEDevice,
+        key_id: str,
+        encryption_key: str,
+        model: SwitchbotModel = SwitchbotModel.ART_FRAME,
+        **kwargs: Any,
+    ) -> bool:
+        return await super().verify_encryption_key(
+            device, key_id, encryption_key, model, **kwargs
+        )
+
+    async def get_basic_info(self) -> dict[str, Any] | None:
+        """Get device basic settings."""
+        if not (_data := await self._get_basic_info()):
+            return None
+        _LOGGER.debug("basic info data: %s", _data.hex())
+
+        battery_charging = bool(_data[1] & 0x80)
+        battery = _data[1] & 0x7F
+        firmware = _data[2] / 10.0
+        hardware = _data[3]
+        display_size = (_data[4] >> 4) & 0x0F
+        display_mode = (_data[4] >> 3) & 0x01
+        last_network_status = (_data[4] >> 2) & 0x01
+        current_image_index = _data[5]
+        total_num_of_images = _data[6]
+        all_images_index = [_data[x] for x in range(7, 7 + total_num_of_images)]
+
+        basic_info = {
+            "battery_charging": battery_charging,
+            "battery": battery,
+            "firmware": firmware,
+            "hardware": hardware,
+            "display_size": display_size,
+            "display_mode": display_mode,
+            "last_network_status": last_network_status,
+            "current_image_index": current_image_index,
+            "total_num_of_images": total_num_of_images,
+            "all_images_index": all_images_index,
+        }
+        _LOGGER.debug("Art Frame %s basic info: %s", self._device.address, basic_info)
+        return basic_info
+
+    def _select_image_index(self, offset: int) -> int:
+        """Select the image index based on the current index and offset."""
+        current_index = self.get_current_image_index()
+        all_images_index = self.get_all_images_index()
+
+        if not all_images_index or len(all_images_index) <= 1:
+            raise RuntimeError("No images available to select from.")
+
+        new_position = (all_images_index.index(current_index) + offset) % len(
+            all_images_index
+        )
+        return all_images_index[new_position]
+
+    async def _get_current_image_index(self) -> None:
+        """Validate the current image index."""
+        if not await self.get_basic_info():
+            raise RuntimeError("Failed to retrieve basic info for current image index.")
+
+    @update_after_operation
+    async def next_image(self) -> bool:
+        """Display the next image."""
+        await self._get_current_image_index()
+        idx = self._select_image_index(1)
+        result = await self._send_command(COMMAND_SET_IMAGE.format(f"{idx:02X}"))
+        return self._check_command_result(result, 0, {1})
+
+    @update_after_operation
+    async def prev_image(self) -> bool:
+        """Display the previous image."""
+        await self._get_current_image_index()
+        idx = self._select_image_index(-1)
+        result = await self._send_command(COMMAND_SET_IMAGE.format(f"{idx:02X}"))
+        return self._check_command_result(result, 0, {1})
+
+    @update_after_operation
+    async def set_image(self, index: int) -> bool:
+        """Set the image by index."""
+        await self._get_current_image_index()
+        total_images = self.get_total_images()
+
+        if index < 0 or index >= total_images:
+            raise ValueError(
+                f"Image index {index} is out of range. Total images: {total_images}."
+            )
+
+        all_images_index = self.get_all_images_index()
+        img_index = all_images_index[index]
+        result = await self._send_command(COMMAND_SET_IMAGE.format(f"{img_index:02X}"))
+        return self._check_command_result(result, 0, {1})
+
+    def get_all_images_index(self) -> list[int] | None:
+        """Return cached list of all image indexes."""
+        return self._get_adv_value("all_images_index")
+
+    def get_current_image_index(self) -> int | None:
+        """Return cached current image index."""
+        return self._get_adv_value("current_image_index")
+
+    def get_total_images(self) -> int | None:
+        """Return cached total number of images."""
+        return self._get_adv_value("total_num_of_images")

+ 1 - 0
switchbot/devices/device.py

@@ -104,6 +104,7 @@ API_MODEL_TO_ENUM: dict[str, SwitchbotModel] = {
     "W1104000": SwitchbotModel.PLUG_MINI_EU,
     "W1128000": SwitchbotModel.SMART_THERMOSTAT_RADIATOR,
     "W1111000": SwitchbotModel.CLIMATE_PANEL,
+    "W1130000": SwitchbotModel.ART_FRAME,
 }
 
 REQ_HEADER = "570f"

+ 18 - 0
tests/__init__.py

@@ -102,3 +102,21 @@ SMART_THERMOSTAT_RADIATOR_INFO = AdvTestCase(
     "Smart Thermostat Radiator",
     SwitchbotModel.SMART_THERMOSTAT_RADIATOR,
 )
+
+
+ART_FRAME_INFO = AdvTestCase(
+    b"\xb0\xe9\xfe\xe2\xfa8\x157\x03\x08",
+    b"\x00\x007\x01\x11>\x10",
+    {
+        "battery": 55,
+        "battery_charging": False,
+        "display_mode": 1,
+        "display_size": 0,
+        "image_index": 3,
+        "last_network_status": 0,
+        "sequence_number": 21,
+    },
+    b"\x01\x11>\x10",
+    "Art Frame",
+    SwitchbotModel.ART_FRAME,
+)

+ 40 - 0
tests/test_adv_parser.py

@@ -3532,6 +3532,22 @@ def test_humidifer_with_empty_data() -> None:
             "Meter Pro",
             SwitchbotModel.METER_PRO,
         ),
+        AdvTestCase(
+            b"\xb0\xe9\xfe\xe2\xfa8\x157\x03\x08",
+            b"\x00\x007\x01\x11>\x10",
+            {
+                "battery": 55,
+                "battery_charging": False,
+                "display_mode": 1,
+                "display_size": 0,
+                "image_index": 3,
+                "last_network_status": 0,
+                "sequence_number": 21,
+            },
+            b"\x01\x11>\x10",
+            "Art Frame",
+            SwitchbotModel.ART_FRAME,
+        ),
     ],
 )
 def test_adv_active(test_case: AdvTestCase) -> None:
@@ -3815,6 +3831,22 @@ def test_adv_active(test_case: AdvTestCase) -> None:
             "Presence Sensor",
             SwitchbotModel.PRESENCE_SENSOR,
         ),
+        AdvTestCase(
+            b"\xb0\xe9\xfe\xe2\xfa8\x157\x03\x08",
+            None,
+            {
+                "battery": 55,
+                "battery_charging": False,
+                "display_mode": 1,
+                "display_size": 0,
+                "image_index": 3,
+                "last_network_status": 0,
+                "sequence_number": 21,
+            },
+            b"\x00\x11>\x10",
+            "Art Frame",
+            SwitchbotModel.ART_FRAME,
+        ),
     ],
 )
 def test_adv_passive(test_case: AdvTestCase) -> None:
@@ -4012,6 +4044,14 @@ def test_adv_passive(test_case: AdvTestCase) -> None:
             "Presence Sensor",
             SwitchbotModel.PRESENCE_SENSOR,
         ),
+        AdvTestCase(
+            None,
+            b"\x00\x007\x01\x11>\x10",
+            {},
+            b"\x01\x11>\x10",
+            "Art Frame",
+            SwitchbotModel.ART_FRAME,
+        ),
     ],
 )
 def test_adv_with_empty_data(test_case: AdvTestCase) -> None:

+ 221 - 0
tests/test_art_frame.py

@@ -0,0 +1,221 @@
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+from bleak.backends.device import BLEDevice
+
+from switchbot import SwitchBotAdvertisement
+from switchbot.devices.art_frame import COMMAND_SET_IMAGE, SwitchbotArtFrame
+from switchbot.devices.device import SwitchbotEncryptedDevice
+
+from . import ART_FRAME_INFO
+from .test_adv_parser import AdvTestCase, generate_ble_device
+
+
+def create_device_for_command_testing(
+    adv_info: AdvTestCase,
+    init_data: dict | None = None,
+) -> SwitchbotArtFrame:
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    device = SwitchbotArtFrame(
+        ble_device, "ff", "ffffffffffffffffffffffffffffffff", model=adv_info.modelName
+    )
+    device.update_from_advertisement(
+        make_advertisement_data(ble_device, adv_info, init_data)
+    )
+    device._send_command = AsyncMock()
+    device._check_command_result = MagicMock()
+    device.update = AsyncMock()
+    return device
+
+
+def make_advertisement_data(
+    ble_device: BLEDevice, adv_info: AdvTestCase, init_data: dict | None = None
+) -> SwitchBotAdvertisement:
+    """Set advertisement data with defaults."""
+    if init_data is None:
+        init_data = {}
+
+    return SwitchBotAdvertisement(
+        address="aa:bb:cc:dd:ee:ff",
+        data={
+            "rawAdvData": adv_info.service_data,
+            "data": adv_info.data | init_data,
+            "isEncrypted": False,
+            "model": adv_info.model,
+            "modelFriendlyName": adv_info.modelFriendlyName,
+            "modelName": adv_info.modelName,
+        }
+        | init_data,
+        device=ble_device,
+        rssi=-80,
+        active=True,
+    )
+
+
+@pytest.mark.asyncio
+async def test_get_basic_info_none() -> None:
+    device = create_device_for_command_testing(ART_FRAME_INFO)
+    device._get_basic_info = AsyncMock(return_value=None)
+
+    assert await device.get_basic_info() is None
+
+    with pytest.raises(
+        RuntimeError, match=r"Failed to retrieve basic info for current image index."
+    ):
+        await device._get_current_image_index()
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    ("basic_info", "result"),
+    [
+        (
+            b"\x016\x07\x01\x00\x00\x04\x00\xde\x18\xa5\x00\x00\x00\x00\x00\x00",
+            [
+                False,
+                54,
+                0.7,
+                1,
+                0,
+                0,
+                0,
+                0,
+                4,
+                [0, 222, 24, 165],
+            ],
+        ),
+    ],
+)
+async def test_get_basic_info_parsing(
+    basic_info: str, result: list[bool | int | float | list[int]]
+) -> None:
+    device = create_device_for_command_testing(ART_FRAME_INFO)
+    device._get_basic_info = AsyncMock(return_value=basic_info)
+
+    info = await device.get_basic_info()
+    assert info["battery_charging"] == result[0]
+    assert info["battery"] == result[1]
+    assert info["firmware"] == result[2]
+    assert info["hardware"] == result[3]
+    assert info["display_size"] == result[4]
+    assert info["display_mode"] == result[5]
+    assert info["last_network_status"] == result[6]
+    assert info["current_image_index"] == result[7]
+    assert info["total_num_of_images"] == result[8]
+    assert info["all_images_index"] == result[9]
+
+    device._update_parsed_data(info)
+    assert device.get_all_images_index() == result[9]
+    assert device.get_total_images() == result[8]
+    assert device.get_current_image_index() == result[7]
+
+
+@pytest.mark.asyncio
+async def test_select_image_with_single_image() -> None:
+    device = create_device_for_command_testing(ART_FRAME_INFO)
+
+    with (
+        patch.object(device, "get_all_images_index", return_value=[1]),
+        pytest.raises(RuntimeError, match=r"No images available to select from."),
+    ):
+        device._select_image_index(1)
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    ("current_index", "all_images_index", "expected_cmd"),
+    [
+        (100, [1, 100, 150], "150"),
+        (150, [1, 100, 150], "1"),
+        (1, [1, 100, 150], "100"),
+    ],
+)
+async def test_next_image(
+    current_index: int, all_images_index: list[int], expected_cmd: str
+) -> None:
+    device = create_device_for_command_testing(ART_FRAME_INFO)
+
+    with (
+        patch.object(device, "get_current_image_index", return_value=current_index),
+        patch.object(device, "get_all_images_index", return_value=all_images_index),
+    ):
+        await device.next_image()
+        device._send_command.assert_awaited_with(
+            COMMAND_SET_IMAGE.format(f"{int(expected_cmd):02X}")
+        )
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    ("current_index", "all_images_index", "expected_cmd"),
+    [
+        (100, [1, 100, 150], "1"),
+        (150, [1, 100, 150], "100"),
+        (1, [1, 100, 150], "150"),
+    ],
+)
+async def test_prev_image(
+    current_index: int, all_images_index: list[int], expected_cmd: str
+) -> None:
+    device = create_device_for_command_testing(ART_FRAME_INFO)
+
+    with (
+        patch.object(device, "get_current_image_index", return_value=current_index),
+        patch.object(device, "get_all_images_index", return_value=all_images_index),
+    ):
+        await device.prev_image()
+        device._send_command.assert_awaited_with(
+            COMMAND_SET_IMAGE.format(f"{int(expected_cmd):02X}")
+        )
+
+
+@pytest.mark.asyncio
+async def test_set_image_with_invalid_index() -> None:
+    device = create_device_for_command_testing(ART_FRAME_INFO)
+
+    with (
+        patch.object(device, "get_total_images", return_value=3),
+        patch.object(device, "get_all_images_index", return_value=[1, 2, 3]),
+        pytest.raises(
+            ValueError, match=r"Image index 5 is out of range. Total images: 3."
+        ),
+    ):
+        await device.set_image(5)
+
+
+@pytest.mark.asyncio
+async def test_set_image_with_valid_index() -> None:
+    device = create_device_for_command_testing(ART_FRAME_INFO)
+
+    with (
+        patch.object(device, "get_total_images", return_value=3),
+        patch.object(device, "get_all_images_index", return_value=[10, 20, 30]),
+    ):
+        await device.set_image(1)
+        device._send_command.assert_awaited_with(COMMAND_SET_IMAGE.format("14"))
+
+
+@pytest.mark.asyncio
+@patch.object(SwitchbotEncryptedDevice, "verify_encryption_key", new_callable=AsyncMock)
+async def test_verify_encryption_key(mock_parent_verify: AsyncMock) -> None:
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    key_id = "ff"
+    encryption_key = "ffffffffffffffffffffffffffffffff"
+
+    mock_parent_verify.return_value = True
+
+    result = await SwitchbotArtFrame.verify_encryption_key(
+        device=ble_device,
+        key_id=key_id,
+        encryption_key=encryption_key,
+        model=ART_FRAME_INFO.modelName,
+    )
+
+    mock_parent_verify.assert_awaited_once_with(
+        ble_device,
+        key_id,
+        encryption_key,
+        ART_FRAME_INFO.modelName,
+    )
+
+    assert result is True