J. Nick Koston 3 тижнів тому
батько
коміт
b01eee53e5

+ 3 - 1
switchbot/adv_parsers/plug.py

@@ -2,6 +2,8 @@
 
 from __future__ import annotations
 
+from ..helpers import parse_power_data
+
 
 def process_woplugmini(
     data: bytes | None, mfr_data: bytes | None
@@ -13,5 +15,5 @@ def process_woplugmini(
         "switchMode": True,
         "isOn": mfr_data[7] == 0x80,
         "wifi_rssi": -mfr_data[9],
-        "power": (((mfr_data[10] << 8) + mfr_data[11]) & 0x7FFF) / 10,  # W
+        "power": parse_power_data(mfr_data, 10, 10.0, 0x7FFF),  # W
     }

+ 15 - 9
switchbot/devices/relay_switch.py

@@ -5,6 +5,7 @@ from typing import Any
 from bleak.backends.device import BLEDevice
 
 from ..const import SwitchbotModel
+from ..helpers import parse_power_data, parse_uint24_be
 from ..models import SwitchBotAdvertisement
 from .device import (
     SwitchbotEncryptedDevice,
@@ -14,6 +15,11 @@ from .device import (
 
 _LOGGER = logging.getLogger(__name__)
 
+# Bit masks for status parsing
+SWITCH1_ON_MASK = 0b10000000
+SWITCH2_ON_MASK = 0b01000000
+DOOR_OPEN_MASK = 0b00100000
+
 COMMAND_HEADER = "57"
 COMMAND_TURN_OFF = f"{COMMAND_HEADER}0f70010000"
 COMMAND_TURN_ON = f"{COMMAND_HEADER}0f70010100"
@@ -87,19 +93,19 @@ class SwitchbotRelaySwitch(SwitchbotSequenceDevice, SwitchbotEncryptedDevice):
         """Parse common data from raw bytes."""
         return {
             "sequence_number": raw_data[1],
-            "isOn": bool(raw_data[2] & 0b10000000),
+            "isOn": bool(raw_data[2] & SWITCH1_ON_MASK),
             "firmware": raw_data[16] / 10.0,
-            "channel2_isOn": bool(raw_data[2] & 0b01000000),
+            "channel2_isOn": bool(raw_data[2] & SWITCH2_ON_MASK),
         }
 
     def _parse_user_data(self, raw_data: bytes) -> dict[str, Any]:
         """Parse user-specific data from raw bytes."""
-        _energy = int.from_bytes(raw_data[1:4], "big") / 60000
-        _energy_usage_yesterday = int.from_bytes(raw_data[4:7], "big") / 60000
-        _use_time = int.from_bytes(raw_data[7:9], "big") / 60.0
-        _voltage = int.from_bytes(raw_data[9:11], "big") / 10.0
-        _current = int.from_bytes(raw_data[11:13], "big") / 1000.0
-        _power = int.from_bytes(raw_data[13:15], "big") / 10.0
+        _energy = parse_uint24_be(raw_data, 1) / 60000
+        _energy_usage_yesterday = parse_uint24_be(raw_data, 4) / 60000
+        _use_time = parse_power_data(raw_data, 7, 60.0)
+        _voltage = parse_power_data(raw_data, 9, 10.0)
+        _current = parse_power_data(raw_data, 11, 1000.0)
+        _power = parse_power_data(raw_data, 13, 10.0)
 
         return {
             "energy": 0.01 if 0 < _energy <= 0.01 else round(_energy, 2),
@@ -178,7 +184,7 @@ class SwitchbotRelaySwitch(SwitchbotSequenceDevice, SwitchbotEncryptedDevice):
         if not common_data["isOn"]:
             self._reset_power_data(user_data)
 
-        garage_door_opener_data = {"door_open": not bool(_data[2] & 0b00100000)}
+        garage_door_opener_data = {"door_open": not bool(_data[2] & DOOR_OPEN_MASK)}
 
         _LOGGER.debug("common_data: %s, user_data: %s", common_data, user_data)
 

+ 53 - 0
switchbot/helpers.py

@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import asyncio
+import struct
 from collections.abc import Coroutine
 from typing import Any, TypeVar
 
@@ -8,6 +9,10 @@ _R = TypeVar("_R")
 
 _BACKGROUND_TASKS: set[asyncio.Task[Any]] = set()
 
+# Pre-compiled struct unpack methods for better performance
+_UNPACK_UINT16_BE = struct.Struct(">H").unpack_from  # Big-endian unsigned 16-bit
+_UNPACK_UINT24_BE = struct.Struct(">I").unpack  # For 3-byte values (read as 4 bytes)
+
 
 def create_background_task(target: Coroutine[Any, Any, _R]) -> asyncio.Task[_R]:
     """Create a background task."""
@@ -15,3 +20,51 @@ def create_background_task(target: Coroutine[Any, Any, _R]) -> asyncio.Task[_R]:
     _BACKGROUND_TASKS.add(task)
     task.add_done_callback(_BACKGROUND_TASKS.remove)
     return task
+
+
+def parse_power_data(
+    data: bytes, offset: int, scale: float = 1.0, mask: int | None = None
+) -> float:
+    """
+    Parse 2-byte power-related data from bytes.
+
+    Args:
+        data: Raw bytes data
+        offset: Starting offset for the 2-byte value
+        scale: Scale factor to divide the raw value by (default: 1.0)
+        mask: Optional bitmask to apply (default: None)
+
+    Returns:
+        Parsed float value
+
+    """
+    if offset + 2 > len(data):
+        raise ValueError(
+            f"Insufficient data: need at least {offset + 2} bytes, got {len(data)}"
+        )
+
+    value = _UNPACK_UINT16_BE(data, offset)[0]
+    if mask is not None:
+        value &= mask
+    return value / scale
+
+
+def parse_uint24_be(data: bytes, offset: int) -> int:
+    """
+    Parse 3-byte big-endian unsigned integer.
+
+    Args:
+        data: Raw bytes data
+        offset: Starting offset for the 3-byte value
+
+    Returns:
+        Parsed integer value
+
+    """
+    if offset + 3 > len(data):
+        raise ValueError(
+            f"Insufficient data: need at least {offset + 3} bytes, got {len(data)}"
+        )
+
+    # Read 3 bytes and pad with 0 at the beginning for 4-byte struct
+    return _UNPACK_UINT24_BE(b"\x00" + data[offset : offset + 3])[0]

+ 72 - 0
tests/test_helpers.py

@@ -0,0 +1,72 @@
+"""Tests for helper functions."""
+
+import pytest
+
+from switchbot.helpers import parse_power_data
+
+
+def test_parse_power_data_basic():
+    """Test basic power data parsing."""
+    # Test data: bytes with value 0x1234 (4660 decimal) at offset 0
+    data = b"\x12\x34\x56\x78"
+
+    # Test without scale (should return raw value)
+    assert parse_power_data(data, 0) == 4660
+
+    # Test with scale of 10
+    assert parse_power_data(data, 0, 10.0) == 466.0
+
+    # Test with scale of 100
+    assert parse_power_data(data, 0, 100.0) == 46.6
+
+
+def test_parse_power_data_with_offset():
+    """Test power data parsing with different offsets."""
+    data = b"\x00\x00\x12\x34\x56\x78"
+
+    # Value at offset 2 should be 0x1234
+    assert parse_power_data(data, 2, 10.0) == 466.0
+
+    # Value at offset 4 should be 0x5678
+    assert parse_power_data(data, 4, 10.0) == 2213.6
+
+
+def test_parse_power_data_with_mask():
+    """Test power data parsing with bitmask."""
+    # Test data: 0xFFFF
+    data = b"\xff\xff"
+
+    # Without mask
+    assert parse_power_data(data, 0, 10.0) == 6553.5
+
+    # With mask 0x7FFF (clear highest bit)
+    assert parse_power_data(data, 0, 10.0, 0x7FFF) == 3276.7
+
+
+def test_parse_power_data_insufficient_data():
+    """Test error handling for insufficient data."""
+    data = b"\x12"  # Only 1 byte
+
+    # Should raise ValueError when trying to read 2 bytes
+    with pytest.raises(ValueError, match="Insufficient data"):
+        parse_power_data(data, 0)
+
+    # Should also fail at offset 1 with 2-byte data
+    data = b"\x12\x34"
+    with pytest.raises(ValueError, match="Insufficient data"):
+        parse_power_data(data, 1)  # Would need to read bytes 1-2
+
+
+def test_parse_power_data_real_world_examples():
+    """Test with real-world examples from relay switch."""
+    # Simulate relay switch data structure
+    raw_data = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xdc\x00\x0f\x00\xe8"
+
+    # Voltage at offset 9-10: 0x00DC = 220 / 10.0 = 22.0V
+    assert parse_power_data(raw_data, 9, 10.0) == 22.0
+
+    # Current at offset 11-12: 0x000F = 15 / 1000.0 = 0.015A
+    assert parse_power_data(raw_data, 11, 1000.0) == 0.015
+
+    # Power at offset 13-14: 0x00E8 = 232 / 10.0 = 23.2W
+    assert parse_power_data(raw_data, 13, 10.0) == 23.2