2 커밋 7894a85c85 ... 343f6ccecb

작성자 SHA1 메시지 날짜
  J. Nick Koston 343f6ccecb Remove unused code (#232) 10 달 전
  dcmeglio 857597be02 Refactored common curtain/tilt code to shared base class (#231) 10 달 전
9개의 변경된 파일809개의 추가작업 그리고 144개의 파일을 삭제
  1. 2 2
      switchbot/const.py
  2. 141 0
      switchbot/devices/base_cover.py
  3. 3 4
      switchbot/devices/base_light.py
  4. 42 13
      switchbot/devices/blind_tilt.py
  5. 24 105
      switchbot/devices/curtain.py
  6. 1 0
      tests/test_adv_parser.py
  7. 152 0
      tests/test_base_cover.py
  8. 240 0
      tests/test_blind_tilt.py
  9. 204 20
      tests/test_curtain.py

+ 2 - 2
switchbot/const.py

@@ -3,12 +3,12 @@ from __future__ import annotations
 
 from enum import Enum
 
+from .enum import StrEnum
+
 DEFAULT_RETRY_COUNT = 3
 DEFAULT_RETRY_TIMEOUT = 1
 DEFAULT_SCAN_TIMEOUT = 5
 
-from .enum import StrEnum
-
 
 class SwitchbotAuthenticationError(RuntimeError):
     """Raised when authentication fails.

+ 141 - 0
switchbot/devices/base_cover.py

@@ -0,0 +1,141 @@
+"""Library to handle connection with Switchbot."""
+from __future__ import annotations
+
+import logging
+from abc import abstractmethod
+from typing import Any
+
+from .device import REQ_HEADER, SwitchbotDevice, update_after_operation
+
+# Cover keys
+COVER_COMMAND = "4501"
+
+# For second element of open and close arrs we should add two bytes i.e. ff00
+# First byte [ff] stands for speed (00 or ff - normal, 01 - slow) *
+# * Only for curtains 3. For other models use ff
+# Second byte [00] is a command (00 - open, 64 - close)
+POSITION_KEYS = [
+    f"{REQ_HEADER}{COVER_COMMAND}0101",
+    f"{REQ_HEADER}{COVER_COMMAND}05",  # +speed
+]  # +actual_position
+STOP_KEYS = [f"{REQ_HEADER}{COVER_COMMAND}0001", f"{REQ_HEADER}{COVER_COMMAND}00ff"]
+
+COVER_EXT_SUM_KEY = f"{REQ_HEADER}460401"
+COVER_EXT_ADV_KEY = f"{REQ_HEADER}460402"
+
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class SwitchbotBaseCover(SwitchbotDevice):
+    """Representation of a Switchbot Cover devices for both curtains and tilt blinds."""
+
+    def __init__(self, reverse: bool, *args: Any, **kwargs: Any) -> None:
+        """Switchbot Cover device constructor."""
+
+        super().__init__(*args, **kwargs)
+        self._reverse = reverse
+        self._settings: dict[str, Any] = {}
+        self.ext_info_sum: dict[str, Any] = {}
+        self.ext_info_adv: dict[str, Any] = {}
+        self._is_opening: bool = False
+        self._is_closing: bool = False
+
+    async def _send_multiple_commands(self, keys: list[str]) -> bool:
+        """Send multiple commands to device.
+
+        Since we current have no way to tell which command the device
+        needs we send both.
+        """
+        final_result = False
+        for key in keys:
+            result = await self._send_command(key)
+            final_result |= self._check_command_result(result, 0, {1})
+        return final_result
+
+    @update_after_operation
+    async def stop(self) -> bool:
+        """Send stop command to device."""
+        return await self._send_multiple_commands(STOP_KEYS)
+
+    @update_after_operation
+    async def set_position(self, position: int, speed: int = 255) -> bool:
+        """Send position command (0-100) to device. Speed 255 - normal, 1 - slow"""
+        position = (100 - position) if self._reverse else position
+        return await self._send_multiple_commands(
+            [
+                f"{POSITION_KEYS[0]}{position:02X}",
+                f"{POSITION_KEYS[1]}{speed:02X}{position:02X}",
+            ]
+        )
+
+    @abstractmethod
+    def get_position(self) -> Any:
+        """Return current device position."""
+
+    @abstractmethod
+    async def get_basic_info(self) -> dict[str, Any] | None:
+        """Get device basic settings."""
+
+    @abstractmethod
+    async def get_extended_info_summary(self) -> dict[str, Any] | None:
+        """Get extended info for all devices in chain."""
+
+    async def get_extended_info_adv(self) -> dict[str, Any] | None:
+        """Get advance page info for device chain."""
+
+        _data = await self._send_command(key=COVER_EXT_ADV_KEY)
+        if not _data:
+            _LOGGER.error("%s: Unsuccessful, no result from device", self.name)
+            return None
+
+        if _data in (b"\x07", b"\x00"):
+            _LOGGER.error("%s: Unsuccessful, please try again", self.name)
+            return None
+
+        _state_of_charge = [
+            "not_charging",
+            "charging_by_adapter",
+            "charging_by_solar",
+            "fully_charged",
+            "solar_not_charging",
+            "charging_error",
+        ]
+
+        self.ext_info_adv["device0"] = {
+            "battery": _data[1],
+            "firmware": _data[2] / 10.0,
+            "stateOfCharge": _state_of_charge[_data[3]],
+        }
+
+        # If grouped curtain device present.
+        if _data[4]:
+            self.ext_info_adv["device1"] = {
+                "battery": _data[4],
+                "firmware": _data[5] / 10.0,
+                "stateOfCharge": _state_of_charge[_data[6]],
+            }
+
+        return self.ext_info_adv
+
+    def get_light_level(self) -> Any:
+        """Return cached light level."""
+        # To get actual light level call update() first.
+        return self._get_adv_value("lightLevel")
+
+    def is_reversed(self) -> bool:
+        """Return True if curtain position is opposite from SB data."""
+        return self._reverse
+
+    def is_calibrated(self) -> Any:
+        """Return True curtain is calibrated."""
+        # To get actual light level call update() first.
+        return self._get_adv_value("calibration")
+
+    def is_opening(self) -> bool:
+        """Return True if the curtain is opening."""
+        return self._is_opening
+
+    def is_closing(self) -> bool:
+        """Return True if the curtain is closing."""
+        return self._is_closing

+ 3 - 4
switchbot/devices/base_light.py

@@ -1,16 +1,15 @@
 from __future__ import annotations
 
+import asyncio
 import logging
+import time
 from abc import abstractmethod
 from typing import Any
 
+from ..models import SwitchBotAdvertisement
 from .device import ColorMode, SwitchbotDevice
 
 _LOGGER = logging.getLogger(__name__)
-import asyncio
-import time
-
-from ..models import SwitchBotAdvertisement
 
 
 class SwitchbotBaseLight(SwitchbotDevice):

+ 42 - 13
switchbot/devices/blind_tilt.py

@@ -10,27 +10,27 @@ from switchbot.devices.device import (
     update_after_operation,
 )
 
-from .curtain import CURTAIN_EXT_SUM_KEY, SwitchbotCurtain
+from ..models import SwitchBotAdvertisement
+from .base_cover import COVER_COMMAND, COVER_EXT_SUM_KEY, SwitchbotBaseCover
 
 _LOGGER = logging.getLogger(__name__)
 
 
-BLIND_COMMAND = "4501"
 OPEN_KEYS = [
-    f"{REQ_HEADER}{BLIND_COMMAND}010132",
-    f"{REQ_HEADER}{BLIND_COMMAND}05ff32",
+    f"{REQ_HEADER}{COVER_COMMAND}010132",
+    f"{REQ_HEADER}{COVER_COMMAND}05ff32",
 ]
 CLOSE_DOWN_KEYS = [
-    f"{REQ_HEADER}{BLIND_COMMAND}010100",
-    f"{REQ_HEADER}{BLIND_COMMAND}05ff00",
+    f"{REQ_HEADER}{COVER_COMMAND}010100",
+    f"{REQ_HEADER}{COVER_COMMAND}05ff00",
 ]
 CLOSE_UP_KEYS = [
-    f"{REQ_HEADER}{BLIND_COMMAND}010164",
-    f"{REQ_HEADER}{BLIND_COMMAND}05ff64",
+    f"{REQ_HEADER}{COVER_COMMAND}010164",
+    f"{REQ_HEADER}{COVER_COMMAND}05ff64",
 ]
 
 
-class SwitchbotBlindTilt(SwitchbotCurtain, SwitchbotSequenceDevice):
+class SwitchbotBlindTilt(SwitchbotBaseCover, SwitchbotSequenceDevice):
     """Representation of a Switchbot Blind Tilt."""
 
     # The position of the blind is saved returned with 0 = closed down, 50 = open and 100 = closed up.
@@ -43,23 +43,52 @@ class SwitchbotBlindTilt(SwitchbotCurtain, SwitchbotSequenceDevice):
 
     def __init__(self, *args: Any, **kwargs: Any) -> None:
         """Switchbot Blind Tilt/woBlindTilt constructor."""
-        super().__init__(*args, **kwargs)
-
         self._reverse: bool = kwargs.pop("reverse_mode", False)
+        super().__init__(self._reverse, *args, **kwargs)
+
+    def _set_parsed_data(
+        self, advertisement: SwitchBotAdvertisement, data: dict[str, Any]
+    ) -> None:
+        """Set data."""
+        in_motion = data["inMotion"]
+        previous_tilt = self._get_adv_value("tilt")
+        new_tilt = data["tilt"]
+        self._update_motion_direction(in_motion, previous_tilt, new_tilt)
+        super()._set_parsed_data(advertisement, data)
+
+    def _update_motion_direction(
+        self, in_motion: bool, previous_tilt: int | None, new_tilt: int
+    ) -> None:
+        """Update opening/closing status based on movement."""
+        if previous_tilt is None:
+            return
+        if in_motion is False:
+            self._is_closing = self._is_opening = False
+            return
+
+        if new_tilt != previous_tilt:
+            self._is_opening = new_tilt > previous_tilt
+            self._is_closing = new_tilt < previous_tilt
 
     @update_after_operation
     async def open(self) -> bool:
         """Send open command."""
+        self._is_opening = True
+        self._is_closing = False
         return await self._send_multiple_commands(OPEN_KEYS)
 
     @update_after_operation
     async def close_up(self) -> bool:
         """Send close up command."""
+        self._is_opening = False
+        self._is_closing = True
         return await self._send_multiple_commands(CLOSE_UP_KEYS)
 
     @update_after_operation
     async def close_down(self) -> bool:
         """Send close down command."""
+        self._is_opening = False
+        self._is_closing = True
         return await self._send_multiple_commands(CLOSE_DOWN_KEYS)
 
     # The aim of this is to close to the nearest endpoint.
@@ -114,8 +143,8 @@ class SwitchbotBlindTilt(SwitchbotCurtain, SwitchbotSequenceDevice):
         }
 
     async def get_extended_info_summary(self) -> dict[str, Any] | None:
-        """Get basic info for all devices in chain."""
-        _data = await self._send_command(key=CURTAIN_EXT_SUM_KEY)
+        """Get extended info for all devices in chain."""
+        _data = await self._send_command(key=COVER_EXT_SUM_KEY)
 
         if not _data:
             _LOGGER.error("%s: Unsuccessful, no result from device", self.name)

+ 24 - 105
switchbot/devices/curtain.py

@@ -4,40 +4,35 @@ from __future__ import annotations
 import logging
 from typing import Any
 
-from switchbot.models import SwitchBotAdvertisement
-
-from .device import REQ_HEADER, SwitchbotDevice, update_after_operation
-
-# Curtain keys
-CURTAIN_COMMAND = "4501"
+from ..models import SwitchBotAdvertisement
+from .base_cover import COVER_COMMAND, COVER_EXT_SUM_KEY, SwitchbotBaseCover
+from .device import REQ_HEADER, update_after_operation
 
 # For second element of open and close arrs we should add two bytes i.e. ff00
 # First byte [ff] stands for speed (00 or ff - normal, 01 - slow) *
 # * Only for curtains 3. For other models use ff
 # Second byte [00] is a command (00 - open, 64 - close)
 OPEN_KEYS = [
-    f"{REQ_HEADER}{CURTAIN_COMMAND}010100",
-    f"{REQ_HEADER}{CURTAIN_COMMAND}05",  # +speed + "00"
+    f"{REQ_HEADER}{COVER_COMMAND}010100",
+    f"{REQ_HEADER}{COVER_COMMAND}05",  # +speed + "00"
 ]
 CLOSE_KEYS = [
-    f"{REQ_HEADER}{CURTAIN_COMMAND}010164",
-    f"{REQ_HEADER}{CURTAIN_COMMAND}05",  # +speed + "64"
+    f"{REQ_HEADER}{COVER_COMMAND}010164",
+    f"{REQ_HEADER}{COVER_COMMAND}05",  # +speed + "64"
 ]
 POSITION_KEYS = [
-    f"{REQ_HEADER}{CURTAIN_COMMAND}0101",
-    f"{REQ_HEADER}{CURTAIN_COMMAND}05",  # +speed
+    f"{REQ_HEADER}{COVER_COMMAND}0101",
+    f"{REQ_HEADER}{COVER_COMMAND}05",  # +speed
 ]  # +actual_position
-STOP_KEYS = [f"{REQ_HEADER}{CURTAIN_COMMAND}0001", f"{REQ_HEADER}{CURTAIN_COMMAND}00ff"]
+STOP_KEYS = [f"{REQ_HEADER}{COVER_COMMAND}0001", f"{REQ_HEADER}{COVER_COMMAND}00ff"]
 
-CURTAIN_EXT_SUM_KEY = f"{REQ_HEADER}460401"
-CURTAIN_EXT_ADV_KEY = f"{REQ_HEADER}460402"
 CURTAIN_EXT_CHAIN_INFO_KEY = f"{REQ_HEADER}468101"
 
 
 _LOGGER = logging.getLogger(__name__)
 
 
-class SwitchbotCurtain(SwitchbotDevice):
+class SwitchbotCurtain(SwitchbotBaseCover):
     """Representation of a Switchbot Curtain."""
 
     def __init__(self, *args: Any, **kwargs: Any) -> None:
@@ -51,13 +46,11 @@ class SwitchbotCurtain(SwitchbotDevice):
         # and position = 100 equals open. The parameter is default set to True so that
         # the definition of position is the same as in Home Assistant.
 
-        super().__init__(*args, **kwargs)
         self._reverse: bool = kwargs.pop("reverse_mode", True)
+        super().__init__(self._reverse, *args, **kwargs)
         self._settings: dict[str, Any] = {}
         self.ext_info_sum: dict[str, Any] = {}
         self.ext_info_adv: dict[str, Any] = {}
-        self._is_opening: bool = False
-        self._is_closing: bool = False
 
     def _set_parsed_data(
         self, advertisement: SwitchBotAdvertisement, data: dict[str, Any]
@@ -69,18 +62,6 @@ class SwitchbotCurtain(SwitchbotDevice):
         self._update_motion_direction(in_motion, previous_position, new_position)
         super()._set_parsed_data(advertisement, data)
 
-    async def _send_multiple_commands(self, keys: list[str]) -> bool:
-        """Send multiple commands to device.
-
-        Since we current have no way to tell which command the device
-        needs we send both.
-        """
-        final_result = False
-        for key in keys:
-            result = await self._send_command(key)
-            final_result |= self._check_command_result(result, 0, {1})
-        return final_result
-
     @update_after_operation
     async def open(self, speed: int = 255) -> bool:
         """Send open command. Speed 255 - normal, 1 - slow"""
@@ -103,19 +84,16 @@ class SwitchbotCurtain(SwitchbotDevice):
     async def stop(self) -> bool:
         """Send stop command to device."""
         self._is_opening = self._is_closing = False
-        return await self._send_multiple_commands(STOP_KEYS)
+        return await super().stop()
 
     @update_after_operation
     async def set_position(self, position: int, speed: int = 255) -> bool:
         """Send position command (0-100) to device. Speed 255 - normal, 1 - slow"""
-        position = (100 - position) if self._reverse else position
-        self._update_motion_direction(True, self._get_adv_value("position"), position)
-        return await self._send_multiple_commands(
-            [
-                f"{POSITION_KEYS[0]}{position:02X}",
-                f"{POSITION_KEYS[1]}{speed:02X}{position:02X}",
-            ]
+        direction_adjusted_position = (100 - position) if self._reverse else position
+        self._update_motion_direction(
+            True, self._get_adv_value("position"), direction_adjusted_position
         )
+        return await super().set_position(position, speed)
 
     def get_position(self) -> Any:
         """Return cached position (0-100) of Curtain."""
@@ -168,8 +146,8 @@ class SwitchbotCurtain(SwitchbotDevice):
             self._is_closing = new_position < previous_position
 
     async def get_extended_info_summary(self) -> dict[str, Any] | None:
-        """Get basic info for all devices in chain."""
-        _data = await self._send_command(key=CURTAIN_EXT_SUM_KEY)
+        """Get extended info for all devices in chain."""
+        _data = await self._send_command(key=COVER_EXT_SUM_KEY)
 
         if not _data:
             _LOGGER.error("%s: Unsuccessful, no result from device", self.name)
@@ -184,78 +162,19 @@ class SwitchbotCurtain(SwitchbotDevice):
             "touchToOpen": bool(_data[1] & 0b01000000),
             "light": bool(_data[1] & 0b00100000),
             "openDirection": (
-                "left_to_right" if _data[1] & 0b00010000 == 1 else "right_to_left"
+                "left_to_right" if _data[1] & 0b00010000 else "right_to_left"
             ),
         }
 
         # if grouped curtain device present.
         if _data[2] != 0:
             self.ext_info_sum["device1"] = {
-                "openDirectionDefault": not bool(_data[1] & 0b10000000),
-                "touchToOpen": bool(_data[1] & 0b01000000),
-                "light": bool(_data[1] & 0b00100000),
+                "openDirectionDefault": not bool(_data[2] & 0b10000000),
+                "touchToOpen": bool(_data[2] & 0b01000000),
+                "light": bool(_data[2] & 0b00100000),
                 "openDirection": (
-                    "left_to_right" if _data[1] & 0b00010000 else "right_to_left"
+                    "left_to_right" if _data[2] & 0b00010000 else "right_to_left"
                 ),
             }
 
         return self.ext_info_sum
-
-    async def get_extended_info_adv(self) -> dict[str, Any] | None:
-        """Get advance page info for device chain."""
-
-        _data = await self._send_command(key=CURTAIN_EXT_ADV_KEY)
-        if not _data:
-            _LOGGER.error("%s: Unsuccessful, no result from device", self.name)
-            return None
-
-        if _data in (b"\x07", b"\x00"):
-            _LOGGER.error("%s: Unsuccessful, please try again", self.name)
-            return None
-
-        _state_of_charge = [
-            "not_charging",
-            "charging_by_adapter",
-            "charging_by_solar",
-            "fully_charged",
-            "solar_not_charging",
-            "charging_error",
-        ]
-
-        self.ext_info_adv["device0"] = {
-            "battery": _data[1],
-            "firmware": _data[2] / 10.0,
-            "stateOfCharge": _state_of_charge[_data[3]],
-        }
-
-        # If grouped curtain device present.
-        if _data[4]:
-            self.ext_info_adv["device1"] = {
-                "battery": _data[4],
-                "firmware": _data[5] / 10.0,
-                "stateOfCharge": _state_of_charge[_data[6]],
-            }
-
-        return self.ext_info_adv
-
-    def get_light_level(self) -> Any:
-        """Return cached light level."""
-        # To get actual light level call update() first.
-        return self._get_adv_value("lightLevel")
-
-    def is_reversed(self) -> bool:
-        """Return True if curtain position is opposite from SB data."""
-        return self._reverse
-
-    def is_calibrated(self) -> Any:
-        """Return True curtain is calibrated."""
-        # To get actual light level call update() first.
-        return self._get_adv_value("calibration")
-
-    def is_opening(self) -> bool:
-        """Return True if the curtain is opening."""
-        return self._is_opening
-
-    def is_closing(self) -> bool:
-        """Return True if the curtain is closing."""
-        return self._is_closing

+ 1 - 0
tests/test_adv_parser.py

@@ -963,6 +963,7 @@ def test_wosensor_active_zero_data():
         active=True,
     )
 
+
 def test_woiosensor_passive_and_active():
     """Test parsing woiosensor as passive with active data as well."""
     ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")

+ 152 - 0
tests/test_base_cover.py

@@ -0,0 +1,152 @@
+from unittest.mock import AsyncMock, Mock
+
+import pytest
+from bleak.backends.device import BLEDevice
+
+from switchbot import SwitchBotAdvertisement, SwitchbotModel
+from switchbot.devices import base_cover, blind_tilt
+
+from .test_adv_parser import generate_ble_device
+
+
+def create_device_for_command_testing(position=50, calibration=True):
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    base_cover_device = base_cover.SwitchbotBaseCover(False, ble_device)
+    base_cover_device.update_from_advertisement(
+        make_advertisement_data(ble_device, True, position, calibration)
+    )
+    base_cover_device._send_multiple_commands = AsyncMock()
+    base_cover_device.update = AsyncMock()
+    return base_cover_device
+
+
+def make_advertisement_data(
+    ble_device: BLEDevice, in_motion: bool, position: int, calibration: bool = True
+):
+    """Set advertisement data with defaults."""
+
+    return SwitchBotAdvertisement(
+        address="aa:bb:cc:dd:ee:ff",
+        data={
+            "rawAdvData": b"c\xc0X\x00\x11\x04",
+            "data": {
+                "calibration": calibration,
+                "battery": 88,
+                "inMotion": in_motion,
+                "tilt": position,
+                "lightLevel": 1,
+                "deviceChain": 1,
+            },
+            "isEncrypted": False,
+            "model": "c",
+            "modelFriendlyName": "Curtain",
+            "modelName": SwitchbotModel.CURTAIN,
+        },
+        device=ble_device,
+        rssi=-80,
+        active=True,
+    )
+
+
+@pytest.mark.asyncio
+async def test_send_multiple_commands():
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    base_cover_device = base_cover.SwitchbotBaseCover(False, ble_device)
+    base_cover_device.update_from_advertisement(
+        make_advertisement_data(ble_device, True, 50, True)
+    )
+    base_cover_device._send_command = AsyncMock()
+    base_cover_device._check_command_result = Mock(return_value=True)
+    await base_cover_device._send_multiple_commands(blind_tilt.OPEN_KEYS)
+    assert base_cover_device._send_command.await_count == 2
+
+
+@pytest.mark.asyncio
+async def test_stop():
+    base_cover_device = create_device_for_command_testing()
+    await base_cover_device.stop()
+    base_cover_device._send_multiple_commands.assert_awaited_once_with(
+        base_cover.STOP_KEYS
+    )
+
+
+@pytest.mark.asyncio
+async def test_set_position():
+    base_cover_device = create_device_for_command_testing()
+    await base_cover_device.set_position(50)
+    base_cover_device._send_multiple_commands.assert_awaited_once()
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize("data_value", [(None), (b"\x07"), (b"\x00")])
+async def test_get_extended_info_adv_returns_none_when_bad_data(data_value):
+    base_cover_device = create_device_for_command_testing()
+    base_cover_device._send_command = AsyncMock(return_value=data_value)
+    assert await base_cover_device.get_extended_info_adv() is None
+
+
+@pytest.mark.asyncio
+async def test_get_extended_info_adv_returns_single_device():
+    base_cover_device = create_device_for_command_testing()
+    base_cover_device._send_command = AsyncMock(
+        return_value=bytes([0, 50, 20, 0, 0, 0, 0])
+    )
+    ext_result = await base_cover_device.get_extended_info_adv()
+    assert ext_result["device0"]["battery"] == 50
+    assert ext_result["device0"]["firmware"] == 2
+    assert "device1" not in ext_result
+
+
+@pytest.mark.asyncio
+async def test_get_extended_info_adv_returns_both_devices():
+    base_cover_device = create_device_for_command_testing()
+    base_cover_device._send_command = AsyncMock(
+        return_value=bytes([0, 50, 20, 0, 10, 30, 0])
+    )
+    ext_result = await base_cover_device.get_extended_info_adv()
+    assert ext_result["device0"]["battery"] == 50
+    assert ext_result["device0"]["firmware"] == 2
+    assert ext_result["device1"]["battery"] == 10
+    assert ext_result["device1"]["firmware"] == 3
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "data_value,result",
+    [
+        (0, "not_charging"),
+        (1, "charging_by_adapter"),
+        (2, "charging_by_solar"),
+        (3, "fully_charged"),
+        (4, "solar_not_charging"),
+        (5, "charging_error"),
+    ],
+)
+async def test_get_extended_info_adv_returns_device0_charge_states(data_value, result):
+    base_cover_device = create_device_for_command_testing()
+    base_cover_device._send_command = AsyncMock(
+        return_value=bytes([0, 50, 20, data_value, 10, 30, 0])
+    )
+    ext_result = await base_cover_device.get_extended_info_adv()
+    assert ext_result["device0"]["stateOfCharge"] == result
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "data_value,result",
+    [
+        (0, "not_charging"),
+        (1, "charging_by_adapter"),
+        (2, "charging_by_solar"),
+        (3, "fully_charged"),
+        (4, "solar_not_charging"),
+        (5, "charging_error"),
+    ],
+)
+async def test_get_extended_info_adv_returns_device1_charge_states(data_value, result):
+    base_cover_device = create_device_for_command_testing()
+    base_cover_device._send_command = AsyncMock(
+        return_value=bytes([0, 50, 20, 0, 10, 30, data_value])
+    )
+    ext_result = await base_cover_device.get_extended_info_adv()
+    assert ext_result["device1"]["stateOfCharge"] == result

+ 240 - 0
tests/test_blind_tilt.py

@@ -0,0 +1,240 @@
+from unittest.mock import AsyncMock
+
+import pytest
+from bleak.backends.device import BLEDevice
+
+from switchbot import SwitchBotAdvertisement, SwitchbotModel
+from switchbot.devices import blind_tilt
+from switchbot.devices.base_cover import COVER_EXT_SUM_KEY
+
+from .test_adv_parser import generate_ble_device
+
+
+def create_device_for_command_testing(
+    position=50, calibration=True, reverse_mode=False
+):
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    curtain_device = blind_tilt.SwitchbotBlindTilt(
+        ble_device, reverse_mode=reverse_mode
+    )
+    curtain_device.update_from_advertisement(
+        make_advertisement_data(ble_device, True, position, calibration)
+    )
+    curtain_device._send_multiple_commands = AsyncMock()
+    curtain_device.update = AsyncMock()
+    return curtain_device
+
+
+def make_advertisement_data(
+    ble_device: BLEDevice, in_motion: bool, position: int, calibration: bool = True
+):
+    """Set advertisement data with defaults."""
+
+    return SwitchBotAdvertisement(
+        address="aa:bb:cc:dd:ee:ff",
+        data={
+            "rawAdvData": b"c\xc0X\x00\x11\x04",
+            "data": {
+                "calibration": calibration,
+                "battery": 88,
+                "inMotion": in_motion,
+                "tilt": position,
+                "lightLevel": 1,
+                "deviceChain": 1,
+            },
+            "isEncrypted": False,
+            "model": "c",
+            "modelFriendlyName": "Curtain",
+            "modelName": SwitchbotModel.CURTAIN,
+        },
+        device=ble_device,
+        rssi=-80,
+        active=True,
+    )
+
+
+@pytest.mark.asyncio
+async def test_open():
+    blind_device = create_device_for_command_testing()
+    await blind_device.open()
+    blind_device._send_multiple_commands.assert_awaited_once_with(blind_tilt.OPEN_KEYS)
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "position,keys", [(5, blind_tilt.CLOSE_DOWN_KEYS), (55, blind_tilt.CLOSE_UP_KEYS)]
+)
+async def test_close(position, keys):
+    blind_device = create_device_for_command_testing(position=position)
+    await blind_device.close()
+    blind_device._send_multiple_commands.assert_awaited_once_with(keys)
+
+
+@pytest.mark.asyncio
+async def test_get_basic_info_returns_none_when_no_data():
+    blind_device = create_device_for_command_testing()
+    blind_device._get_basic_info = AsyncMock(return_value=None)
+
+    assert await blind_device.get_basic_info() is None
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "reverse_mode,data,result",
+    [
+        (
+            False,
+            bytes([0, 1, 10, 2, 255, 255, 50, 4]),
+            [1, 1, 1, 1, 1, True, False, False, True, 50, 4],
+        ),
+        (
+            False,
+            bytes([0, 1, 10, 2, 0, 0, 50, 4]),
+            [1, 1, 0, 0, 0, False, False, False, False, 50, 4],
+        ),
+        (
+            False,
+            bytes([0, 1, 10, 2, 0, 1, 50, 4]),
+            [1, 1, 0, 0, 1, False, True, False, True, 50, 4],
+        ),
+        (
+            True,
+            bytes([0, 1, 10, 2, 255, 255, 50, 4]),
+            [1, 1, 1, 1, 1, True, False, True, False, 50, 4],
+        ),
+        (
+            True,
+            bytes([0, 1, 10, 2, 0, 0, 50, 4]),
+            [1, 1, 0, 0, 0, False, False, False, False, 50, 4],
+        ),
+        (
+            True,
+            bytes([0, 1, 10, 2, 0, 1, 50, 4]),
+            [1, 1, 0, 0, 1, False, True, False, True, 50, 4],
+        ),
+    ],
+)
+async def test_get_basic_info(reverse_mode, data, result):
+    blind_device = create_device_for_command_testing(reverse_mode=reverse_mode)
+    blind_device._get_basic_info = AsyncMock(return_value=data)
+
+    info = await blind_device.get_basic_info()
+    assert info["battery"] == result[0]
+    assert info["firmware"] == result[1]
+    assert info["light"] == result[2]
+    assert info["fault"] == result[2]
+    assert info["solarPanel"] == result[3]
+    assert info["calibration"] == result[3]
+    assert info["calibrated"] == result[3]
+    assert info["inMotion"] == result[4]
+    assert info["motionDirection"]["opening"] == result[5]
+    assert info["motionDirection"]["closing"] == result[6]
+    assert info["motionDirection"]["up"] == result[7]
+    assert info["motionDirection"]["down"] == result[8]
+    assert info["tilt"] == result[9]
+    assert info["timers"] == result[10]
+
+
+@pytest.mark.asyncio
+async def test_get_extended_info_summary_sends_command():
+    blind_device = create_device_for_command_testing()
+    blind_device._send_command = AsyncMock()
+    await blind_device.get_extended_info_summary()
+    blind_device._send_command.assert_awaited_once_with(key=COVER_EXT_SUM_KEY)
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize("data_value", [(None), (b"\x07"), (b"\x00")])
+async def test_get_extended_info_summary_returns_none_when_bad_data(data_value):
+    blind_device = create_device_for_command_testing()
+    blind_device._send_command = AsyncMock(return_value=data_value)
+    assert await blind_device.get_extended_info_summary() is None
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "data,result", [(bytes([0, 0]), False), (bytes([0, 255]), True)]
+)
+async def test_get_extended_info_summary(data, result):
+    blind_device = create_device_for_command_testing()
+    blind_device._send_command = AsyncMock(return_value=data)
+    ext_result = await blind_device.get_extended_info_summary()
+    assert ext_result["device0"]["light"] == result
+
+
+@pytest.mark.parametrize("reverse_mode", [(True), (False)])
+def test_device_passive_opening(reverse_mode):
+    """Test passive opening advertisement."""
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    curtain_device = blind_tilt.SwitchbotBlindTilt(
+        ble_device, reverse_mode=reverse_mode
+    )
+    curtain_device.update_from_advertisement(
+        make_advertisement_data(ble_device, True, 0)
+    )
+    curtain_device.update_from_advertisement(
+        make_advertisement_data(ble_device, True, 10)
+    )
+
+    assert curtain_device.is_opening() is True
+    assert curtain_device.is_closing() is False
+
+
+@pytest.mark.parametrize("reverse_mode", [(True), (False)])
+def test_device_passive_closing(reverse_mode):
+    """Test passive closing advertisement."""
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    curtain_device = blind_tilt.SwitchbotBlindTilt(
+        ble_device, reverse_mode=reverse_mode
+    )
+    curtain_device.update_from_advertisement(
+        make_advertisement_data(ble_device, True, 100)
+    )
+    curtain_device.update_from_advertisement(
+        make_advertisement_data(ble_device, True, 90)
+    )
+
+    assert curtain_device.is_opening() is False
+    assert curtain_device.is_closing() is True
+
+
+@pytest.mark.parametrize("reverse_mode", [(True), (False)])
+def test_device_passive_opening_then_stop(reverse_mode):
+    """Test passive stopped after opening advertisement."""
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    curtain_device = blind_tilt.SwitchbotBlindTilt(
+        ble_device, reverse_mode=reverse_mode
+    )
+    curtain_device.update_from_advertisement(
+        make_advertisement_data(ble_device, True, 0)
+    )
+    curtain_device.update_from_advertisement(
+        make_advertisement_data(ble_device, True, 10)
+    )
+    curtain_device.update_from_advertisement(
+        make_advertisement_data(ble_device, False, 10)
+    )
+
+    assert curtain_device.is_opening() is False
+    assert curtain_device.is_closing() is False
+
+
+@pytest.mark.parametrize("reverse_mode", [(True), (False)])
+def test_device_passive_closing_then_stop(reverse_mode):
+    """Test passive stopped after closing advertisement."""
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    curtain_device = blind_tilt.SwitchbotBlindTilt(
+        ble_device, reverse_mode=reverse_mode
+    )
+    curtain_device.update_from_advertisement(
+        make_advertisement_data(ble_device, True, 100)
+    )
+    curtain_device.update_from_advertisement(
+        make_advertisement_data(ble_device, True, 90)
+    )
+    curtain_device.update_from_advertisement(
+        make_advertisement_data(ble_device, False, 90)
+    )
+
+    assert curtain_device.is_opening() is False
+    assert curtain_device.is_closing() is False

+ 204 - 20
tests/test_curtain.py

@@ -1,16 +1,29 @@
-from typing import Any
-from unittest.mock import Mock
+from unittest.mock import AsyncMock, Mock
 
 import pytest
 from bleak.backends.device import BLEDevice
 
 from switchbot import SwitchBotAdvertisement, SwitchbotModel
 from switchbot.devices import curtain
+from switchbot.devices.base_cover import COVER_EXT_SUM_KEY
 
 from .test_adv_parser import generate_ble_device
 
 
-def set_advertisement_data(ble_device: BLEDevice, in_motion: bool, position: int):
+def create_device_for_command_testing(calibration=True, reverse_mode=False):
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode)
+    curtain_device.update_from_advertisement(
+        make_advertisement_data(ble_device, True, 50, calibration)
+    )
+    curtain_device._send_multiple_commands = AsyncMock()
+    curtain_device.update = AsyncMock()
+    return curtain_device
+
+
+def make_advertisement_data(
+    ble_device: BLEDevice, in_motion: bool, position: int, calibration: bool = True
+):
     """Set advertisement data with defaults."""
 
     return SwitchBotAdvertisement(
@@ -18,7 +31,7 @@ def set_advertisement_data(ble_device: BLEDevice, in_motion: bool, position: int
         data={
             "rawAdvData": b"c\xc0X\x00\x11\x04",
             "data": {
-                "calibration": True,
+                "calibration": calibration,
                 "battery": 88,
                 "inMotion": in_motion,
                 "position": position,
@@ -42,7 +55,7 @@ def test_device_passive_not_in_motion(reverse_mode):
     ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
     curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode)
     curtain_device.update_from_advertisement(
-        set_advertisement_data(ble_device, False, 0)
+        make_advertisement_data(ble_device, False, 0)
     )
 
     assert curtain_device.is_opening() is False
@@ -55,10 +68,10 @@ def test_device_passive_opening(reverse_mode):
     ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
     curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode)
     curtain_device.update_from_advertisement(
-        set_advertisement_data(ble_device, True, 0)
+        make_advertisement_data(ble_device, True, 0)
     )
     curtain_device.update_from_advertisement(
-        set_advertisement_data(ble_device, True, 10)
+        make_advertisement_data(ble_device, True, 10)
     )
 
     assert curtain_device.is_opening() is True
@@ -71,10 +84,10 @@ def test_device_passive_closing(reverse_mode):
     ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
     curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode)
     curtain_device.update_from_advertisement(
-        set_advertisement_data(ble_device, True, 100)
+        make_advertisement_data(ble_device, True, 100)
     )
     curtain_device.update_from_advertisement(
-        set_advertisement_data(ble_device, True, 90)
+        make_advertisement_data(ble_device, True, 90)
     )
 
     assert curtain_device.is_opening() is False
@@ -87,13 +100,13 @@ def test_device_passive_opening_then_stop(reverse_mode):
     ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
     curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode)
     curtain_device.update_from_advertisement(
-        set_advertisement_data(ble_device, True, 0)
+        make_advertisement_data(ble_device, True, 0)
     )
     curtain_device.update_from_advertisement(
-        set_advertisement_data(ble_device, True, 10)
+        make_advertisement_data(ble_device, True, 10)
     )
     curtain_device.update_from_advertisement(
-        set_advertisement_data(ble_device, False, 10)
+        make_advertisement_data(ble_device, False, 10)
     )
 
     assert curtain_device.is_opening() is False
@@ -106,13 +119,13 @@ def test_device_passive_closing_then_stop(reverse_mode):
     ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
     curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode)
     curtain_device.update_from_advertisement(
-        set_advertisement_data(ble_device, True, 100)
+        make_advertisement_data(ble_device, True, 100)
     )
     curtain_device.update_from_advertisement(
-        set_advertisement_data(ble_device, True, 90)
+        make_advertisement_data(ble_device, True, 90)
     )
     curtain_device.update_from_advertisement(
-        set_advertisement_data(ble_device, False, 90)
+        make_advertisement_data(ble_device, False, 90)
     )
 
     assert curtain_device.is_opening() is False
@@ -126,7 +139,7 @@ async def test_device_active_not_in_motion(reverse_mode):
     ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
     curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode)
     curtain_device.update_from_advertisement(
-        set_advertisement_data(ble_device, False, 0)
+        make_advertisement_data(ble_device, False, 0)
     )
 
     basic_info = bytes([0, 0, 0, 0, 0, 0, 100, 0])
@@ -149,7 +162,7 @@ async def test_device_active_opening(reverse_mode):
     ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
     curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode)
     curtain_device.update_from_advertisement(
-        set_advertisement_data(ble_device, True, 0)
+        make_advertisement_data(ble_device, True, 0)
     )
 
     basic_info = bytes([0, 0, 0, 0, 0, 67, 10, 0])
@@ -172,7 +185,7 @@ async def test_device_active_closing(reverse_mode):
     ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
     curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode)
     curtain_device.update_from_advertisement(
-        set_advertisement_data(ble_device, True, 100)
+        make_advertisement_data(ble_device, True, 100)
     )
 
     basic_info = bytes([0, 0, 0, 0, 0, 67, 90, 0])
@@ -195,7 +208,7 @@ async def test_device_active_opening_then_stop(reverse_mode):
     ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
     curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode)
     curtain_device.update_from_advertisement(
-        set_advertisement_data(ble_device, True, 0)
+        make_advertisement_data(ble_device, True, 0)
     )
 
     basic_info = bytes([0, 0, 0, 0, 0, 67, 10, 0])
@@ -222,7 +235,7 @@ async def test_device_active_closing_then_stop(reverse_mode):
     ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
     curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode)
     curtain_device.update_from_advertisement(
-        set_advertisement_data(ble_device, True, 100)
+        make_advertisement_data(ble_device, True, 100)
     )
 
     basic_info = bytes([0, 0, 0, 0, 0, 67, 90, 0])
@@ -240,3 +253,174 @@ async def test_device_active_closing_then_stop(reverse_mode):
 
     assert curtain_device.is_opening() is False
     assert curtain_device.is_closing() is False
+
+
+@pytest.mark.asyncio
+async def test_get_basic_info_returns_none_when_no_data():
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    curtain_device = curtain.SwitchbotCurtain(ble_device)
+    curtain_device.update_from_advertisement(
+        make_advertisement_data(ble_device, True, 0)
+    )
+    curtain_device._get_basic_info = AsyncMock(return_value=None)
+
+    assert await curtain_device.get_basic_info() is None
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "data,result",
+    [
+        (
+            bytes([0, 1, 10, 2, 255, 255, 50, 4]),
+            [1, 1, 2, "right_to_left", 1, 1, 50, 4],
+        ),
+        (bytes([0, 1, 10, 2, 0, 0, 50, 4]), [1, 1, 2, "left_to_right", 0, 0, 50, 4]),
+    ],
+)
+async def test_get_basic_info(data, result):
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    curtain_device = curtain.SwitchbotCurtain(ble_device)
+    curtain_device.update_from_advertisement(
+        make_advertisement_data(ble_device, True, 0)
+    )
+
+    async def custom_implementation():
+        return data
+
+    curtain_device._get_basic_info = Mock(side_effect=custom_implementation)
+
+    info = await curtain_device.get_basic_info()
+    assert info["battery"] == result[0]
+    assert info["firmware"] == result[1]
+    assert info["chainLength"] == result[2]
+    assert info["openDirection"] == result[3]
+    assert info["touchToOpen"] == result[4]
+    assert info["light"] == result[4]
+    assert info["fault"] == result[4]
+    assert info["solarPanel"] == result[5]
+    assert info["calibration"] == result[5]
+    assert info["calibrated"] == result[5]
+    assert info["inMotion"] == result[5]
+    assert info["position"] == result[6]
+    assert info["timers"] == result[7]
+
+
+@pytest.mark.asyncio
+async def test_open():
+    curtain_device = create_device_for_command_testing()
+    await curtain_device.open()
+    assert curtain_device.is_opening() is True
+    assert curtain_device.is_closing() is False
+    curtain_device._send_multiple_commands.assert_awaited_once()
+
+
+@pytest.mark.asyncio
+async def test_close():
+    curtain_device = create_device_for_command_testing()
+    await curtain_device.close()
+    assert curtain_device.is_opening() is False
+    assert curtain_device.is_closing() is True
+    curtain_device._send_multiple_commands.assert_awaited_once()
+
+
+@pytest.mark.asyncio
+async def test_stop():
+    curtain_device = create_device_for_command_testing()
+    await curtain_device.stop()
+    assert curtain_device.is_opening() is False
+    assert curtain_device.is_closing() is False
+    curtain_device._send_multiple_commands.assert_awaited_once()
+
+
+@pytest.mark.asyncio
+async def test_set_position_opening():
+    curtain_device = create_device_for_command_testing()
+    await curtain_device.set_position(100)
+    assert curtain_device.is_opening() is True
+    assert curtain_device.is_closing() is False
+    curtain_device._send_multiple_commands.assert_awaited_once()
+
+
+@pytest.mark.asyncio
+async def test_set_position_closing():
+    curtain_device = create_device_for_command_testing()
+    await curtain_device.set_position(0)
+    assert curtain_device.is_opening() is False
+    assert curtain_device.is_closing() is True
+    curtain_device._send_multiple_commands.assert_awaited_once()
+
+
+def test_get_position():
+    curtain_device = create_device_for_command_testing()
+    assert curtain_device.get_position() == 50
+
+
+@pytest.mark.asyncio
+async def test_get_extended_info_summary_sends_command():
+    curtain_device = create_device_for_command_testing()
+    curtain_device._send_command = AsyncMock()
+    await curtain_device.get_extended_info_summary()
+    curtain_device._send_command.assert_awaited_once_with(key=COVER_EXT_SUM_KEY)
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize("data_value", [(None), (b"\x07"), (b"\x00")])
+async def test_get_extended_info_summary_returns_none_when_bad_data(data_value):
+    curtain_device = create_device_for_command_testing()
+    curtain_device._send_command = AsyncMock(return_value=data_value)
+    assert await curtain_device.get_extended_info_summary() is None
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "data,result",
+    [
+        ([0, 0, 0], [True, False, False, "right_to_left"]),
+        ([255, 255, 0], [False, True, True, "left_to_right"]),
+    ],
+)
+async def test_get_extended_info_summary_returns_device0(data, result):
+    curtain_device = create_device_for_command_testing()
+    curtain_device._send_command = AsyncMock(return_value=bytes(data))
+    ext_result = await curtain_device.get_extended_info_summary()
+    assert ext_result["device0"]["openDirectionDefault"] == result[0]
+    assert ext_result["device0"]["touchToOpen"] == result[1]
+    assert ext_result["device0"]["light"] == result[2]
+    assert ext_result["device0"]["openDirection"] == result[3]
+    assert "device1" not in ext_result
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "data,result",
+    [
+        ([0, 0, 1], [True, False, False, "right_to_left"]),
+        ([255, 255, 255], [False, True, True, "left_to_right"]),
+    ],
+)
+async def test_get_extended_info_summary_returns_device1(data, result):
+    curtain_device = create_device_for_command_testing()
+    curtain_device._send_command = AsyncMock(return_value=bytes(data))
+    ext_result = await curtain_device.get_extended_info_summary()
+    assert ext_result["device1"]["openDirectionDefault"] == result[0]
+    assert ext_result["device1"]["touchToOpen"] == result[1]
+    assert ext_result["device1"]["light"] == result[2]
+    assert ext_result["device1"]["openDirection"] == result[3]
+
+
+def test_get_light_level():
+    curtain_device = create_device_for_command_testing()
+    assert curtain_device.get_light_level() == 1
+
+
+@pytest.mark.parametrize("reverse_mode", [(True), (False)])
+def test_is_reversed(reverse_mode):
+    curtain_device = create_device_for_command_testing(reverse_mode=reverse_mode)
+    assert curtain_device.is_reversed() == reverse_mode
+
+
+@pytest.mark.parametrize("calibration", [(True), (False)])
+def test_is_calibrated(calibration):
+    curtain_device = create_device_for_command_testing(calibration=calibration)
+    assert curtain_device.is_calibrated() == calibration