Browse Source

Add support for is_opening and is_closing for curtains (#226)

Co-authored-by: J. Nick Koston <nick@koston.org>
dcmeglio 3 months ago
parent
commit
fced7c7e30

+ 0 - 1
switchbot/devices/bulb.py

@@ -1,7 +1,6 @@
 from __future__ import annotations
 
 import logging
-from typing import Any
 
 from .base_light import SwitchbotSequenceBaseLight
 from .device import REQ_HEADER, ColorMode

+ 0 - 1
switchbot/devices/ceiling_light.py

@@ -1,7 +1,6 @@
 from __future__ import annotations
 
 import logging
-from typing import Any
 
 from .base_light import SwitchbotBaseLight
 from .device import REQ_HEADER, ColorMode

+ 51 - 2
switchbot/devices/curtain.py

@@ -4,6 +4,8 @@ from __future__ import annotations
 import logging
 from typing import Any
 
+from switchbot.models import SwitchBotAdvertisement
+
 from .device import REQ_HEADER, SwitchbotDevice, update_after_operation
 
 # Curtain keys
@@ -54,6 +56,18 @@ class SwitchbotCurtain(SwitchbotDevice):
         self._settings: dict[str, Any] = {}
         self.ext_info_sum: dict[str, Any] = {}
         self.ext_info_adv: dict[str, Any] = {}
+        self._is_opening: bool = False
+        self._is_closing: bool = False
+
+    def _set_parsed_data(
+        self, advertisement: SwitchBotAdvertisement, data: dict[str, Any]
+    ) -> None:
+        """Set data."""
+        in_motion = data["inMotion"]
+        previous_position = self._get_adv_value("position")
+        new_position = data["position"]
+        self._update_motion_direction(in_motion, previous_position, new_position)
+        super()._set_parsed_data(advertisement, data)
 
     async def _send_multiple_commands(self, keys: list[str]) -> bool:
         """Send multiple commands to device.
@@ -70,6 +84,8 @@ class SwitchbotCurtain(SwitchbotDevice):
     @update_after_operation
     async def open(self, speed: int = 255) -> bool:
         """Send open command. Speed 255 - normal, 1 - slow"""
+        self._is_opening = True
+        self._is_closing = False
         return await self._send_multiple_commands(
             [OPEN_KEYS[0], f"{OPEN_KEYS[1]}{speed:02X}00"]
         )
@@ -77,6 +93,8 @@ class SwitchbotCurtain(SwitchbotDevice):
     @update_after_operation
     async def close(self, speed: int = 255) -> bool:
         """Send close command. Speed 255 - normal, 1 - slow"""
+        self._is_closing = True
+        self._is_opening = False
         return await self._send_multiple_commands(
             [CLOSE_KEYS[0], f"{CLOSE_KEYS[1]}{speed:02X}64"]
         )
@@ -84,12 +102,14 @@ class SwitchbotCurtain(SwitchbotDevice):
     @update_after_operation
     async def stop(self) -> bool:
         """Send stop command to device."""
+        self._is_opening = self._is_closing = False
         return await self._send_multiple_commands(STOP_KEYS)
 
     @update_after_operation
     async def set_position(self, position: int, speed: int = 255) -> bool:
         """Send position command (0-100) to device. Speed 255 - normal, 1 - slow"""
         position = (100 - position) if self._reverse else position
+        self._update_motion_direction(True, self._get_adv_value("position"), position)
         return await self._send_multiple_commands(
             [
                 f"{POSITION_KEYS[0]}{position:02X}",
@@ -108,6 +128,13 @@ class SwitchbotCurtain(SwitchbotDevice):
             return None
 
         _position = max(min(_data[6], 100), 0)
+        _direction_adjusted_position = (100 - _position) if self._reverse else _position
+        _previous_position = self._get_adv_value("position")
+        _in_motion = bool(_data[5] & 0b01000011)
+        self._update_motion_direction(
+            _in_motion, _previous_position, _direction_adjusted_position
+        )
+
         return {
             "battery": _data[1],
             "firmware": _data[2] / 10.0,
@@ -121,11 +148,25 @@ class SwitchbotCurtain(SwitchbotDevice):
             "solarPanel": bool(_data[5] & 0b00001000),
             "calibration": bool(_data[5] & 0b00000100),
             "calibrated": bool(_data[5] & 0b00000100),
-            "inMotion": bool(_data[5] & 0b01000011),
-            "position": (100 - _position) if self._reverse else _position,
+            "inMotion": _in_motion,
+            "position": _direction_adjusted_position,
             "timers": _data[7],
         }
 
+    def _update_motion_direction(
+        self, in_motion: bool, previous_position: int | None, new_position: int
+    ) -> None:
+        """Update opening/closing status based on movement."""
+        if previous_position is None:
+            return
+        if in_motion is False:
+            self._is_closing = self._is_opening = False
+            return
+
+        if new_position != previous_position:
+            self._is_opening = new_position > previous_position
+            self._is_closing = new_position < previous_position
+
     async def get_extended_info_summary(self) -> dict[str, Any] | None:
         """Get basic info for all devices in chain."""
         _data = await self._send_command(key=CURTAIN_EXT_SUM_KEY)
@@ -210,3 +251,11 @@ class SwitchbotCurtain(SwitchbotDevice):
         """Return True curtain is calibrated."""
         # To get actual light level call update() first.
         return self._get_adv_value("calibration")
+
+    def is_opening(self) -> bool:
+        """Return True if the curtain is opening."""
+        return self._is_opening
+
+    def is_closing(self) -> bool:
+        """Return True if the curtain is closing."""
+        return self._is_closing

+ 0 - 1
switchbot/devices/light_strip.py

@@ -1,7 +1,6 @@
 from __future__ import annotations
 
 import logging
-from typing import Any
 
 from .base_light import SwitchbotSequenceBaseLight
 from .device import REQ_HEADER, ColorMode

+ 7 - 2
tests/test_adv_parser.py

@@ -358,7 +358,9 @@ def test_parse_advertisement_data_curtain3():
     """Test parse_advertisement_data for curtain 3."""
     ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
     adv_data = generate_advertisement_data(
-        manufacturer_data={2409: b"\xaa\xbb\xcc\xdd\xee\xff\xf7\x07\x00\x11\x04\x00\x49"},
+        manufacturer_data={
+            2409: b"\xaa\xbb\xcc\xdd\xee\xff\xf7\x07\x00\x11\x04\x00\x49"
+        },
         service_data={"0000fd3d-0000-1000-8000-00805f9b34fb": b"{\xc0\x49\x00\x11\x04"},
         rssi=-80,
     )
@@ -391,7 +393,9 @@ def test_parse_advertisement_data_curtain3_passive():
     """Test parse_advertisement_data for curtain passive."""
     ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
     adv_data = generate_advertisement_data(
-        manufacturer_data={2409: b"\xaa\xbb\xcc\xdd\xee\xff\xf7\x07\x00\x11\x04\x00\x49"},
+        manufacturer_data={
+            2409: b"\xaa\xbb\xcc\xdd\xee\xff\xf7\x07\x00\x11\x04\x00\x49"
+        },
         service_data={},
         rssi=-80,
     )
@@ -1364,6 +1368,7 @@ def test_parsing_lock_passive():
         active=False,
     )
 
+
 def test_parsing_lock_active_old_firmware():
     """Test parsing lock with active data. Old firmware."""
     ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")

+ 242 - 0
tests/test_curtain.py

@@ -0,0 +1,242 @@
+from typing import Any
+from unittest.mock import Mock
+
+import pytest
+from bleak.backends.device import BLEDevice
+
+from switchbot import SwitchBotAdvertisement, SwitchbotModel
+from switchbot.devices import curtain
+
+from .test_adv_parser import generate_ble_device
+
+
+def set_advertisement_data(ble_device: BLEDevice, in_motion: bool, position: int):
+    """Set advertisement data with defaults."""
+
+    return SwitchBotAdvertisement(
+        address="aa:bb:cc:dd:ee:ff",
+        data={
+            "rawAdvData": b"c\xc0X\x00\x11\x04",
+            "data": {
+                "calibration": True,
+                "battery": 88,
+                "inMotion": in_motion,
+                "position": position,
+                "lightLevel": 1,
+                "deviceChain": 1,
+            },
+            "isEncrypted": False,
+            "model": "c",
+            "modelFriendlyName": "Curtain",
+            "modelName": SwitchbotModel.CURTAIN,
+        },
+        device=ble_device,
+        rssi=-80,
+        active=True,
+    )
+
+
+@pytest.mark.parametrize("reverse_mode", [(True), (False)])
+def test_device_passive_not_in_motion(reverse_mode):
+    """Test passive not in motion advertisement."""
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode)
+    curtain_device.update_from_advertisement(
+        set_advertisement_data(ble_device, False, 0)
+    )
+
+    assert curtain_device.is_opening() is False
+    assert curtain_device.is_closing() is False
+
+
+@pytest.mark.parametrize("reverse_mode", [(True), (False)])
+def test_device_passive_opening(reverse_mode):
+    """Test passive opening advertisement."""
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode)
+    curtain_device.update_from_advertisement(
+        set_advertisement_data(ble_device, True, 0)
+    )
+    curtain_device.update_from_advertisement(
+        set_advertisement_data(ble_device, True, 10)
+    )
+
+    assert curtain_device.is_opening() is True
+    assert curtain_device.is_closing() is False
+
+
+@pytest.mark.parametrize("reverse_mode", [(True), (False)])
+def test_device_passive_closing(reverse_mode):
+    """Test passive closing advertisement."""
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode)
+    curtain_device.update_from_advertisement(
+        set_advertisement_data(ble_device, True, 100)
+    )
+    curtain_device.update_from_advertisement(
+        set_advertisement_data(ble_device, True, 90)
+    )
+
+    assert curtain_device.is_opening() is False
+    assert curtain_device.is_closing() is True
+
+
+@pytest.mark.parametrize("reverse_mode", [(True), (False)])
+def test_device_passive_opening_then_stop(reverse_mode):
+    """Test passive stopped after opening advertisement."""
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode)
+    curtain_device.update_from_advertisement(
+        set_advertisement_data(ble_device, True, 0)
+    )
+    curtain_device.update_from_advertisement(
+        set_advertisement_data(ble_device, True, 10)
+    )
+    curtain_device.update_from_advertisement(
+        set_advertisement_data(ble_device, False, 10)
+    )
+
+    assert curtain_device.is_opening() is False
+    assert curtain_device.is_closing() is False
+
+
+@pytest.mark.parametrize("reverse_mode", [(True), (False)])
+def test_device_passive_closing_then_stop(reverse_mode):
+    """Test passive stopped after closing advertisement."""
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode)
+    curtain_device.update_from_advertisement(
+        set_advertisement_data(ble_device, True, 100)
+    )
+    curtain_device.update_from_advertisement(
+        set_advertisement_data(ble_device, True, 90)
+    )
+    curtain_device.update_from_advertisement(
+        set_advertisement_data(ble_device, False, 90)
+    )
+
+    assert curtain_device.is_opening() is False
+    assert curtain_device.is_closing() is False
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize("reverse_mode", [(True), (False)])
+async def test_device_active_not_in_motion(reverse_mode):
+    """Test active not in motion."""
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode)
+    curtain_device.update_from_advertisement(
+        set_advertisement_data(ble_device, False, 0)
+    )
+
+    basic_info = bytes([0, 0, 0, 0, 0, 0, 100, 0])
+
+    async def custom_implementation():
+        return basic_info
+
+    curtain_device._get_basic_info = Mock(side_effect=custom_implementation)
+
+    await curtain_device.get_basic_info()
+
+    assert curtain_device.is_opening() is False
+    assert curtain_device.is_closing() is False
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize("reverse_mode", [(True), (False)])
+async def test_device_active_opening(reverse_mode):
+    """Test active opening."""
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode)
+    curtain_device.update_from_advertisement(
+        set_advertisement_data(ble_device, True, 0)
+    )
+
+    basic_info = bytes([0, 0, 0, 0, 0, 67, 10, 0])
+
+    async def custom_implementation():
+        return basic_info
+
+    curtain_device._get_basic_info = Mock(side_effect=custom_implementation)
+
+    await curtain_device.get_basic_info()
+
+    assert curtain_device.is_opening() is True
+    assert curtain_device.is_closing() is False
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize("reverse_mode", [(True), (False)])
+async def test_device_active_closing(reverse_mode):
+    """Test active closing."""
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode)
+    curtain_device.update_from_advertisement(
+        set_advertisement_data(ble_device, True, 100)
+    )
+
+    basic_info = bytes([0, 0, 0, 0, 0, 67, 90, 0])
+
+    async def custom_implementation():
+        return basic_info
+
+    curtain_device._get_basic_info = Mock(side_effect=custom_implementation)
+
+    await curtain_device.get_basic_info()
+
+    assert curtain_device.is_opening() is False
+    assert curtain_device.is_closing() is True
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize("reverse_mode", [(True), (False)])
+async def test_device_active_opening_then_stop(reverse_mode):
+    """Test active stopped after opening."""
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode)
+    curtain_device.update_from_advertisement(
+        set_advertisement_data(ble_device, True, 0)
+    )
+
+    basic_info = bytes([0, 0, 0, 0, 0, 67, 10, 0])
+
+    async def custom_implementation():
+        return basic_info
+
+    curtain_device._get_basic_info = Mock(side_effect=custom_implementation)
+
+    await curtain_device.get_basic_info()
+
+    basic_info = bytes([0, 0, 0, 0, 0, 0, 10, 0])
+
+    await curtain_device.get_basic_info()
+
+    assert curtain_device.is_opening() is False
+    assert curtain_device.is_closing() is False
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize("reverse_mode", [(True), (False)])
+async def test_device_active_closing_then_stop(reverse_mode):
+    """Test active stopped after closing."""
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode)
+    curtain_device.update_from_advertisement(
+        set_advertisement_data(ble_device, True, 100)
+    )
+
+    basic_info = bytes([0, 0, 0, 0, 0, 67, 90, 0])
+
+    async def custom_implementation():
+        return basic_info
+
+    curtain_device._get_basic_info = Mock(side_effect=custom_implementation)
+
+    await curtain_device.get_basic_info()
+
+    basic_info = bytes([0, 0, 0, 0, 0, 0, 90, 0])
+
+    await curtain_device.get_basic_info()
+
+    assert curtain_device.is_opening() is False
+    assert curtain_device.is_closing() is False