Forráskód Böngészése

fix switch on off status update

zerzhang 1 hónapja
szülő
commit
b0ee23d05a

+ 1 - 2
switchbot/adv_parser.py

@@ -31,7 +31,6 @@ from .adv_parsers.motion import process_wopresence
 from .adv_parsers.plug import process_woplugmini
 from .adv_parsers.relay_switch import (
     process_garage_door_opener,
-    process_relay_switch_1pm,
     process_relay_switch_2pm,
     process_relay_switch_common_data,
 )
@@ -209,7 +208,7 @@ SUPPORTED_TYPES: dict[str | bytes, SwitchbotSupportedType] = {
     "<": {
         "modelName": SwitchbotModel.RELAY_SWITCH_1PM,
         "modelFriendlyName": "Relay Switch 1PM",
-        "func": process_relay_switch_1pm,
+        "func": process_relay_switch_common_data,
         "manufacturer_id": 2409,
     },
     ";": {

+ 1 - 20
switchbot/adv_parsers/relay_switch.py

@@ -2,17 +2,11 @@
 
 from __future__ import annotations
 
-import struct
 from typing import Any
 
 
-def parse_power_data(mfr_data: bytes, start: int, end: int) -> int:
-    """Helper to parse power data from manufacturer data."""
-    return struct.unpack(">H", mfr_data[start:end])[0] / 10.0
-
-
 def process_relay_switch_common_data(data: bytes | None, mfr_data: bytes | None) -> dict[str, Any]:
-    """Process relay switch common data."""
+    """Process relay switch 1 and 1PM common data."""
     if mfr_data is None:
         return {}
     return {
@@ -21,16 +15,6 @@ def process_relay_switch_common_data(data: bytes | None, mfr_data: bytes | None)
         "isOn": bool(mfr_data[7] & 0b10000000),
     }
 
-
-def process_relay_switch_1pm(data: bytes | None, mfr_data: bytes | None) -> dict[str, Any]:
-    """Process Relay Switch 1PM services data."""
-    if mfr_data is None:
-        return {}
-    common_data = process_relay_switch_common_data(data, mfr_data)
-    common_data["power"] = parse_power_data(mfr_data, 10, 12)
-    return common_data
-
-
 def process_garage_door_opener(data: bytes | None, mfr_data: bytes | None) -> dict[str, Any]:
     """Process garage door opener services data."""
     if mfr_data is None:
@@ -39,7 +23,6 @@ def process_garage_door_opener(data: bytes | None, mfr_data: bytes | None) -> di
     common_data["door_open"] = not bool(mfr_data[7] & 0b00100000)
     return common_data
 
-
 def process_relay_switch_2pm(data: bytes | None, mfr_data: bytes | None) -> dict[int, dict[str, Any]]:
     """Process Relay Switch 2PM services data."""
     if mfr_data is None:
@@ -48,13 +31,11 @@ def process_relay_switch_2pm(data: bytes | None, mfr_data: bytes | None) -> dict
     return {
         1: {
             **process_relay_switch_common_data(data, mfr_data),
-            "power": parse_power_data(mfr_data, 10, 12),
         },
         2: {
             "switchMode": True,  # for compatibility, useless
             "sequence_number": mfr_data[6],
             "isOn": bool(mfr_data[7] & 0b01000000),
-            "power": parse_power_data(mfr_data, 12, 14),
         },
     }
 

+ 1 - 1
switchbot/const/__init__.py

@@ -62,7 +62,7 @@ class SwitchbotModel(StrEnum):
     LEAK = "Leak Detector"
     KEYPAD = "WoKeypad"
     RELAY_SWITCH_1PM = "Relay Switch 1PM"
-    RELAY_SWITCH_1 = "Relay Switch 1"
+    RELAY_SWITCH_1 = "Relay Switch 1 Plus"
     REMOTE = "WoRemote"
     EVAPORATIVE_HUMIDIFIER = "Evaporative Humidifier"
     ROLLER_SHADE = "Roller Shade"

+ 57 - 40
switchbot/devices/relay_switch.py

@@ -78,19 +78,50 @@ class SwitchbotRelaySwitch(SwitchbotSequenceDevice, SwitchbotEncryptedDevice):
             device, key_id, encryption_key, model, **kwargs
         )
 
+    def _reset_power_data(self, data: dict[str, Any]) -> None:
+        """Reset power-related data to 0."""
+        for key in ["power", "current", "voltage"]:
+            data[key] = 0
+
+    def _parse_common_data(self, raw_data: bytes) -> dict[str, Any]:
+        """Parse common data from raw bytes."""
+        return {
+            "sequence_number": raw_data[1],
+            "isOn": bool(raw_data[2] & 0b10000000),
+            "firmware": raw_data[16] / 10.0,
+            "channel2_isOn": bool(raw_data[2] & 0b01000000),
+        }
+
+    def _parse_user_data(self, raw_data: bytes) -> dict[str, Any]:
+        """Parse user-specific data from raw bytes."""
+        return {
+            "Electricity": round(int.from_bytes(raw_data[1:4], "big") / 60000, 2),
+            "Electricity Usage Yesterday": round(int.from_bytes(raw_data[4:7], "big") / 60000, 2),
+            "use_time": round(int.from_bytes(raw_data[7:9], "big") / 60, 2),
+            "voltage": int.from_bytes(raw_data[9:11], "big") / 10.0,
+            "current": int.from_bytes(raw_data[11:13], "big"),
+            "power": int.from_bytes(raw_data[13:15], "big") / 10.0,
+        }
+
     def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
         """Update device data from advertisement."""
         adv_data = advertisement.data["data"]
         channel = self._channel if hasattr(self, "_channel") else None
 
-        if channel is None:
-            adv_data["voltage"] = self._get_adv_value("voltage") or 0
-            adv_data["current"] = self._get_adv_value("current") or 0
-        else:
-            for i in range(1, channel + 1):
-                adv_data[i] = adv_data.get(i, {})
-                adv_data[i]["voltage"] = self._get_adv_value("voltage", i) or 0
-                adv_data[i]["current"] = self._get_adv_value("current", i) or 0
+        if self._model in (
+            SwitchbotModel.RELAY_SWITCH_1PM,
+            SwitchbotModel.RELAY_SWITCH_2PM,
+        ):
+            if channel is None:
+                adv_data["voltage"] = self._get_adv_value("voltage") or 0
+                adv_data["current"] = self._get_adv_value("current") or 0
+                adv_data["power"] = self._get_adv_value("power") or 0
+            else:
+                for i in range(1, channel + 1):
+                    adv_data[i] = adv_data.get(i, {})
+                    adv_data[i]["voltage"] = self._get_adv_value("voltage", i) or 0
+                    adv_data[i]["current"] = self._get_adv_value("current", i) or 0
+                    adv_data[i]["power"] = self._get_adv_value("power", i) or 0
         super().update_from_advertisement(advertisement)
 
 
@@ -112,29 +143,22 @@ class SwitchbotRelaySwitch(SwitchbotSequenceDevice, SwitchbotEncryptedDevice):
         if not (_channel1_data := await self._get_basic_info(COMMAND_GET_CHANNEL1_INFO.format(current_time_hex, current_day_start_time_hex))):
             return None
 
-        common_data = {
-            "isOn": bool(_data[2] & 0b10000000),
-            "firmware": _data[16] / 10.0,
-            "use_time": int.from_bytes(_channel1_data[7:9], "big"),
+        _LOGGER.debug("on-off hex: %s, channel1_hex_data: %s", _data.hex(), _channel1_data.hex())
 
-        }
+        common_data = self._parse_common_data(_data)
+        user_data = self._parse_user_data(_channel1_data)
 
-        user_data = {
-            "Electricity Usage Today": int.from_bytes(_channel1_data[1:4], "big"),
-            "Electricity Usage Yesterday": int.from_bytes(_channel1_data[4:7], "big"),
-            "voltage": int.from_bytes(_channel1_data[9:11], "big") / 10.0,
-            "current": int.from_bytes(_channel1_data[11:13], "big"),
-            "power": int.from_bytes(_channel1_data[13:15], "big") / 10.0,
-        }
+        if self._model in (SwitchbotModel.RELAY_SWITCH_1, SwitchbotModel.GARAGE_DOOR_OPENER):
+            for key in ["voltage", "current", "power"]:
+                user_data.pop(key, None)
 
-        garage_door_opener_data = {
-            "door_open": not bool(_data[7] & 0b00100000),
-        }
+        if not common_data["isOn"]:
+            self._reset_power_data(user_data)
 
-        _LOGGER.debug("common_data: %s, garage_door_opener_data: %s", common_data, garage_door_opener_data)
+        garage_door_opener_data = {"door_open": not bool(_data[7] & 0b00100000)}
+
+        _LOGGER.debug("common_data: %s, user_data: %s", common_data, user_data)
 
-        if self._model == SwitchbotModel.RELAY_SWITCH_1:
-            return common_data
         if self._model == SwitchbotModel.GARAGE_DOOR_OPENER:
             return common_data | garage_door_opener_data
         return common_data | user_data
@@ -195,23 +219,16 @@ class SwitchbotRelaySwitch2PM(SwitchbotRelaySwitch):
             ):
             return None
 
+        _LOGGER.debug("channel2_hex_data: %s", _channel2_data.hex())
 
-        result = {
-            1: common_data,
-            2: {
-                "isOn": bool(_channel2_data[2] & 0b01000000),
-                "Electricity Usage Today": int.from_bytes(_channel2_data[1:4], "big"),
-                "Electricity Usage Yesterday": int.from_bytes(_channel2_data[4:7], "big"),
-                "use_time": int.from_bytes(_channel2_data[7:9], "big"),
-                "voltage": int.from_bytes(_channel2_data[9:11], "big") / 10.0,
-                "current": int.from_bytes(_channel2_data[11:13], "big"),
-                "power": int.from_bytes(_channel2_data[13:15], "big") / 10.0,
-            }
-        }
+        channel2_data = self._parse_user_data(_channel2_data)
+        channel2_data["isOn"] = common_data["channel2_isOn"]
 
-        _LOGGER.debug("Multi channel basic info: %s", result)
+        if not channel2_data["isOn"]:
+            self._reset_power_data(channel2_data)
 
-        return result
+        _LOGGER.debug("channel1_data: %s, channel2_data: %s", common_data, channel2_data)
+        return {1: common_data, 2: channel2_data}
 
     @update_after_operation
     async def turn_on(self, channel: int) -> bool:

+ 0 - 6
tests/test_adv_parser.py

@@ -3033,13 +3033,11 @@ def test_blind_tilt_with_empty_data() -> None:
             {
                 1: {
                     'isOn': True,
-                    'power': 0.0,
                     'sequence_number': 138,
                     'switchMode': True,
                 },
                 2: {
                     'isOn': True,
-                    'power': 7.0,
                     'sequence_number': 138,
                     'switchMode': True,
                 },
@@ -3065,7 +3063,6 @@ def test_blind_tilt_with_empty_data() -> None:
             b"<\x00\x00\x00",
             {
                 'isOn': True,
-                'power': 4.9,
                 'sequence_number': 71,
                 'switchMode': True,
             },
@@ -3123,13 +3120,11 @@ def test_adv_active(test_case: AdvTestCase) -> None:
             {
                 1: {
                     'isOn': True,
-                    'power': 0.0,
                     'sequence_number': 138,
                     'switchMode': True,
                 },
                 2: {
                     'isOn': True,
-                    'power': 7.0,
                     'sequence_number': 138,
                     'switchMode': True,
                 },
@@ -3155,7 +3150,6 @@ def test_adv_active(test_case: AdvTestCase) -> None:
             b"<\x00\x00\x00",
             {
                 'isOn': True,
-                'power': 4.9,
                 'sequence_number': 71,
                 'switchMode': True,
             },

+ 154 - 47
tests/test_relay_switch.py

@@ -1,4 +1,4 @@
-from unittest.mock import AsyncMock
+from unittest.mock import AsyncMock, MagicMock
 
 import pytest
 from bleak.backends.device import BLEDevice
@@ -9,65 +9,172 @@ from switchbot.devices import relay_switch
 from .test_adv_parser import generate_ble_device
 
 
-def create_device_for_command_testing(calibration=True, reverse_mode=False):
+def create_device_for_command_testing(
+    rawAdvData: bytes, model: str, init_data: dict | None = None):
+    """Create a device for command testing."""
     ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
-    relay_switch_device = relay_switch.SwitchbotRelaySwitch(
+    device_class = (
+        relay_switch.SwitchbotRelaySwitch2PM
+        if model == SwitchbotModel.RELAY_SWITCH_2PM
+        else relay_switch.SwitchbotRelaySwitch
+    )
+    device = device_class(
         ble_device, "ff", "ffffffffffffffffffffffffffffffff"
     )
-    relay_switch_device.update_from_advertisement(make_advertisement_data(ble_device))
-    return relay_switch_device
-
+    device.update_from_advertisement(
+        make_advertisement_data(ble_device, rawAdvData, model, init_data)
+    )
+    device._send_command = AsyncMock()
+    device._check_command_result = MagicMock()
+    device.update = AsyncMock()
+    return device
 
-def make_advertisement_data(ble_device: BLEDevice):
+def make_advertisement_data(
+    ble_device: BLEDevice, rawAdvData: bytes, model: str, init_data: dict | None = None
+):
     """Set advertisement data with defaults."""
-    return SwitchBotAdvertisement(
-        address="aa:bb:cc:dd:ee:ff",
-        data={
-            "rawAdvData": b"$X|\x0866G\x81\x00\x00\x001\x00\x00\x00\x00",
-            "data": {
-                "switchMode": True,
-                "sequence_number": 71,
-                "isOn": True,
-                "power": 4.9,
-                "voltage": 0,
-                "current": 0,
+    if init_data is None:
+        init_data = {}
+    if model == SwitchbotModel.RELAY_SWITCH_2PM:
+        return SwitchBotAdvertisement(
+            address="aa:bb:cc:dd:ee:ff",
+            data={
+                "rawAdvData": rawAdvData,
+                "data": {
+                    1: {
+                        "switchMode": True,
+                        "sequence_number": 99,
+                        "isOn": True,
+                    },
+                    2: {
+                        "switchMode": True,
+                        "sequence_number": 99,
+                        "isOn": False,
+                    },
+                }
+                | init_data,
+                "isEncrypted": False,
+                "model": model,
+                "modelFriendlyName": "Relay Switch 2PM",
+                "modelName": SwitchbotModel.RELAY_SWITCH_2PM,
             },
-            "isEncrypted": False,
-            "model": "<",
-            "modelFriendlyName": "Relay Switch 1PM",
-            "modelName": SwitchbotModel.RELAY_SWITCH_1PM,
-        },
-        device=ble_device,
-        rssi=-80,
-        active=True,
+            device=ble_device,
+            rssi=-80,
+            active=True,
+        )
+    return None
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    ("rawAdvData", "model", "init_data"),
+    [
+        (
+            b"\x00\x00\x00\x00\x00\x00",
+            SwitchbotModel.RELAY_SWITCH_2PM,
+            {1: {"isOn": True}, 2: {"isOn": True}},
+        ),
+    ],
+)
+async def test_turn_on_2PM(rawAdvData, model, init_data):
+    """Test turn on command."""
+    device = create_device_for_command_testing(rawAdvData, model, init_data)
+    await device.turn_on(1)
+    device._send_command.assert_called_with(
+        relay_switch.MULTI_CHANNEL_COMMANDS_TURN_ON[model][1]
     )
+    assert device.is_on(1) is True
 
+    await device.turn_on(2)
+    device._send_command.assert_called_with(
+        relay_switch.MULTI_CHANNEL_COMMANDS_TURN_ON[model][2]
+    )
+    assert device.is_on(2) is True
 
 @pytest.mark.asyncio
-async def test_turn_on():
-    relay_switch_device = create_device_for_command_testing()
-    relay_switch_device._send_command = AsyncMock(return_value=b"\x01")
-    await relay_switch_device.turn_on()
-    assert relay_switch_device.is_on() is True
+@pytest.mark.parametrize(
+    ("rawAdvData", "model", "init_data"),
+    [
+        (
+            b"\x00\x00\x00\x00\x00\x00",
+            SwitchbotModel.RELAY_SWITCH_2PM,
+            {1: {"isOn": False}, 2: {"isOn": False}},
+        ),
+    ],
+)
+async def test_turn_off_2PM(rawAdvData, model, init_data):
+    """Test turn off command."""
+    device = create_device_for_command_testing(rawAdvData, model, init_data)
+    await device.turn_off(1)
+    device._send_command.assert_called_with(
+        relay_switch.MULTI_CHANNEL_COMMANDS_TURN_OFF[model][1]
+    )
+    assert device.is_on(1) is False
 
+    await device.turn_off(2)
+    device._send_command.assert_called_with(
+        relay_switch.MULTI_CHANNEL_COMMANDS_TURN_OFF[model][2]
+    )
+    assert device.is_on(2) is False
 
 @pytest.mark.asyncio
-async def test_turn_off():
-    relay_switch_device = create_device_for_command_testing()
-    relay_switch_device._send_command = AsyncMock(return_value=b"\x01")
-    await relay_switch_device.turn_off()
-    assert relay_switch_device.is_on() is False
+@pytest.mark.parametrize(
+    ("rawAdvData", "model"),
+    [
+        (
+            b"\x00\x00\x00\x00\x00\x00",
+            SwitchbotModel.RELAY_SWITCH_2PM,
+        ),
+    ],
+)
+async def test_turn_toggle_2PM(rawAdvData, model):
+    """Test toggle command."""
+    device = create_device_for_command_testing(rawAdvData, model)
+    await device.async_toggle(1)
+    device._send_command.assert_called_with(
+        relay_switch.MULTI_CHANNEL_COMMANDS_TOGGLE[model][1]
+    )
+    assert device.is_on(1) is True
+
+    await device.async_toggle(2)
+    device._send_command.assert_called_with(
+        relay_switch.MULTI_CHANNEL_COMMANDS_TOGGLE[model][2]
+    )
+    assert device.is_on(2) is False
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    ("rawAdvData", "model"),
+    [
+        (
+            b"\x00\x00\x00\x00\x00\x00",
+            SwitchbotModel.RELAY_SWITCH_2PM,
+        ),
+    ],
+)
+async def test_get_switch_mode_2PM(rawAdvData, model):
+    """Test get switch mode."""
+    device = create_device_for_command_testing(rawAdvData, model)
+    assert device.switch_mode(1) is True
+    assert device.switch_mode(2) is True
 
 
 @pytest.mark.asyncio
-async def test_get_basic_info():
-    relay_switch_device = create_device_for_command_testing()
-    relay_switch_device._send_command = AsyncMock(return_value=b"\x01\x01")
-    info = await relay_switch_device.get_basic_info()
-    assert info["is_on"] is True
-    relay_switch_device._send_command = AsyncMock(return_value=b"\x01\x00")
-    info = await relay_switch_device.get_basic_info()
-    assert info["is_on"] is False
-    relay_switch_device._send_command = AsyncMock(return_value=b"\x00\x00")
-    info = await relay_switch_device.get_basic_info()
-    assert info is None
+@pytest.mark.parametrize(
+    ("rawAdvData", "model"),
+    [
+        (
+            b"\x00\x00\x00\x00\x00\x00",
+            SwitchbotModel.RELAY_SWITCH_2PM,
+        ),
+    ],
+)
+@pytest.mark.parametrize(
+    ("channel1_info", "channel2_info"), [(True, False), (False, True), (False, False)]
+)
+async def test_get_basic_info_returns_none(rawAdvData, model, channel1_info, channel2_info):
+    device = create_device_for_command_testing(rawAdvData, model)
+
+    device.get_current_time_and_start_time = MagicMock(return_value=("683074d6", "682fba80"))
+
+