ソースを参照

Add support for relay switch series (#347)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: J. Nick Koston <nick@koston.org>
Retha Runolfsson 5 日 前
コミット
8e38b6ad0d

+ 2 - 1
switchbot/__init__.py

@@ -35,7 +35,7 @@ from .devices.humidifier import SwitchbotHumidifier
 from .devices.light_strip import SwitchbotLightStrip
 from .devices.lock import SwitchbotLock
 from .devices.plug import SwitchbotPlugMini
-from .devices.relay_switch import SwitchbotRelaySwitch
+from .devices.relay_switch import SwitchbotRelaySwitch, SwitchbotRelaySwitch2PM
 from .devices.roller_shade import SwitchbotRollerShade
 from .devices.vacuum import SwitchbotVacuum
 from .discovery import GetSwitchbotDevices
@@ -74,6 +74,7 @@ __all__ = [
     "SwitchbotPlugMini",
     "SwitchbotPlugMini",
     "SwitchbotRelaySwitch",
+    "SwitchbotRelaySwitch2PM",
     "SwitchbotRollerShade",
     "SwitchbotSupportedType",
     "SwitchbotSupportedType",

+ 17 - 4
switchbot/adv_parser.py

@@ -30,8 +30,9 @@ from .adv_parsers.meter import process_wosensorth, process_wosensorth_c
 from .adv_parsers.motion import process_wopresence
 from .adv_parsers.plug import process_woplugmini
 from .adv_parsers.relay_switch import (
-    process_worelay_switch_1,
-    process_worelay_switch_1pm,
+    process_garage_door_opener,
+    process_relay_switch_2pm,
+    process_relay_switch_common_data,
 )
 from .adv_parsers.remote import process_woremote
 from .adv_parsers.roller_shade import process_worollershade
@@ -207,13 +208,13 @@ SUPPORTED_TYPES: dict[str | bytes, SwitchbotSupportedType] = {
     "<": {
         "modelName": SwitchbotModel.RELAY_SWITCH_1PM,
         "modelFriendlyName": "Relay Switch 1PM",
-        "func": process_worelay_switch_1pm,
+        "func": process_relay_switch_common_data,
         "manufacturer_id": 2409,
     },
     ";": {
         "modelName": SwitchbotModel.RELAY_SWITCH_1,
         "modelFriendlyName": "Relay Switch 1",
-        "func": process_worelay_switch_1,
+        "func": process_relay_switch_common_data,
         "manufacturer_id": 2409,
     },
     "b": {
@@ -312,6 +313,18 @@ SUPPORTED_TYPES: dict[str | bytes, SwitchbotSupportedType] = {
         "func": process_lock2,
         "manufacturer_id": 2409,
     },
+    ">": {
+        "modelName": SwitchbotModel.GARAGE_DOOR_OPENER,
+        "modelFriendlyName": "Garage Door Opener",
+        "func": process_garage_door_opener,
+        "manufacturer_id": 2409,
+    },
+    "=": {
+        "modelName": SwitchbotModel.RELAY_SWITCH_2PM,
+        "modelFriendlyName": "Relay Switch 2PM",
+        "func": process_relay_switch_2pm,
+        "manufacturer_id": 2409,
+    },
 }
 
 _SWITCHBOT_MODEL_TO_CHAR = {

+ 3 - 1
switchbot/adv_parsers/plug.py

@@ -2,6 +2,8 @@
 
 from __future__ import annotations
 
+from ..helpers import parse_power_data
+
 
 def process_woplugmini(
     data: bytes | None, mfr_data: bytes | None
@@ -13,5 +15,5 @@ def process_woplugmini(
         "switchMode": True,
         "isOn": mfr_data[7] == 0x80,
         "wifi_rssi": -mfr_data[9],
-        "power": (((mfr_data[10] << 8) + mfr_data[11]) & 0x7FFF) / 10,  # W
+        "power": parse_power_data(mfr_data, 10, 10.0, 0x7FFF),  # W
     }

+ 28 - 12
switchbot/adv_parsers/relay_switch.py

@@ -2,31 +2,47 @@
 
 from __future__ import annotations
 
+from typing import Any
 
-def process_worelay_switch_1pm(
+
+def process_relay_switch_common_data(
     data: bytes | None, mfr_data: bytes | None
-) -> dict[str, bool | int]:
-    """Process WoStrip services data."""
+) -> dict[str, Any]:
+    """Process relay switch 1 and 1PM common data."""
     if mfr_data is None:
         return {}
     return {
         "switchMode": True,  # for compatibility, useless
         "sequence_number": mfr_data[6],
         "isOn": bool(mfr_data[7] & 0b10000000),
-        "power": ((mfr_data[10] << 8) + mfr_data[11]) / 10,
-        "voltage": 0,
-        "current": 0,
     }
 
 
-def process_worelay_switch_1(
+def process_garage_door_opener(
+    data: bytes | None, mfr_data: bytes | None
+) -> dict[str, Any]:
+    """Process garage door opener services data."""
+    if mfr_data is None:
+        return {}
+    common_data = process_relay_switch_common_data(data, mfr_data)
+    common_data["door_open"] = not bool(mfr_data[7] & 0b00100000)
+    return common_data
+
+
+def process_relay_switch_2pm(
     data: bytes | None, mfr_data: bytes | None
-) -> dict[str, bool | int]:
-    """Process WoStrip services data."""
+) -> dict[int, dict[str, Any]]:
+    """Process Relay Switch 2PM services data."""
     if mfr_data is None:
         return {}
+
     return {
-        "switchMode": True,  # for compatibility, useless
-        "sequence_number": mfr_data[6],
-        "isOn": bool(mfr_data[7] & 0b10000000),
+        1: {
+            **process_relay_switch_common_data(data, mfr_data),
+        },
+        2: {
+            "switchMode": True,  # for compatibility, useless
+            "sequence_number": mfr_data[6],
+            "isOn": bool(mfr_data[7] & 0b01000000),
+        },
     }

+ 2 - 0
switchbot/const/__init__.py

@@ -83,6 +83,8 @@ class SwitchbotModel(StrEnum):
     HUB3 = "Hub3"
     LOCK_ULTRA = "Lock Ultra"
     LOCK_LITE = "Lock Lite"
+    GARAGE_DOOR_OPENER = "Garage Door Opener"
+    RELAY_SWITCH_2PM = "Relay Switch 2PM"
 
 
 __all__ = [

+ 6 - 2
switchbot/devices/device.py

@@ -116,7 +116,9 @@ def _merge_data(old_data: dict[str, Any], new_data: dict[str, Any]) -> dict[str,
     """Merge data but only add None keys if they are missing."""
     merged = old_data.copy()
     for key, value in new_data.items():
-        if value is not None or key not in old_data:
+        if isinstance(value, dict) and isinstance(old_data.get(key), dict):
+            merged[key] = _merge_data(old_data[key], value)
+        elif value is not None or key not in old_data:
             merged[key] = value
     return merged
 
@@ -538,7 +540,7 @@ class SwitchbotBaseDevice:
         self._override_adv_data.update(state)
         self._update_parsed_data(state)
 
-    def _get_adv_value(self, key: str) -> Any:
+    def _get_adv_value(self, key: str, channel: int | None = None) -> Any:
         """Return value from advertisement data."""
         if self._override_adv_data and key in self._override_adv_data:
             _LOGGER.debug(
@@ -550,6 +552,8 @@ class SwitchbotBaseDevice:
             return self._override_adv_data[key]
         if not self._sb_adv_data:
             return None
+        if channel is not None:
+            return self._sb_adv_data.data["data"].get(channel, {}).get(key)
         return self._sb_adv_data.data["data"].get(key)
 
     def get_battery_percent(self) -> Any:

+ 231 - 67
switchbot/devices/relay_switch.py

@@ -5,21 +5,59 @@ from typing import Any
 from bleak.backends.device import BLEDevice
 
 from ..const import SwitchbotModel
+from ..helpers import parse_power_data, parse_uint24_be
 from ..models import SwitchBotAdvertisement
-from .device import SwitchbotEncryptedDevice
+from .device import (
+    SwitchbotEncryptedDevice,
+    SwitchbotSequenceDevice,
+    update_after_operation,
+)
 
 _LOGGER = logging.getLogger(__name__)
 
+# Bit masks for status parsing
+SWITCH1_ON_MASK = 0b10000000
+SWITCH2_ON_MASK = 0b01000000
+DOOR_OPEN_MASK = 0b00100000
+
 COMMAND_HEADER = "57"
 COMMAND_TURN_OFF = f"{COMMAND_HEADER}0f70010000"
 COMMAND_TURN_ON = f"{COMMAND_HEADER}0f70010100"
 COMMAND_TOGGLE = f"{COMMAND_HEADER}0f70010200"
 COMMAND_GET_VOLTAGE_AND_CURRENT = f"{COMMAND_HEADER}0f7106000000"
-COMMAND_GET_SWITCH_STATE = f"{COMMAND_HEADER}0f7101000000"
-PASSIVE_POLL_INTERVAL = 10 * 60
+
+COMMAND_GET_BASIC_INFO = f"{COMMAND_HEADER}0f7181"
+COMMAND_GET_CHANNEL1_INFO = f"{COMMAND_HEADER}0f710600{{}}{{}}"
+COMMAND_GET_CHANNEL2_INFO = f"{COMMAND_HEADER}0f710601{{}}{{}}"
+
+
+MULTI_CHANNEL_COMMANDS_TURN_ON = {
+    SwitchbotModel.RELAY_SWITCH_2PM: {
+        1: "570f70010d00",
+        2: "570f70010700",
+    }
+}
+MULTI_CHANNEL_COMMANDS_TURN_OFF = {
+    SwitchbotModel.RELAY_SWITCH_2PM: {
+        1: "570f70010c00",
+        2: "570f70010300",
+    }
+}
+MULTI_CHANNEL_COMMANDS_TOGGLE = {
+    SwitchbotModel.RELAY_SWITCH_2PM: {
+        1: "570f70010e00",
+        2: "570f70010b00",
+    }
+}
+MULTI_CHANNEL_COMMANDS_GET_VOLTAGE_AND_CURRENT = {
+    SwitchbotModel.RELAY_SWITCH_2PM: {
+        1: COMMAND_GET_CHANNEL1_INFO,
+        2: COMMAND_GET_CHANNEL2_INFO,
+    }
+}
 
 
-class SwitchbotRelaySwitch(SwitchbotEncryptedDevice):
+class SwitchbotRelaySwitch(SwitchbotSequenceDevice, SwitchbotEncryptedDevice):
     """Representation of a Switchbot relay switch 1pm."""
 
     def __init__(
@@ -31,7 +69,6 @@ class SwitchbotRelaySwitch(SwitchbotEncryptedDevice):
         model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_1PM,
         **kwargs: Any,
     ) -> None:
-        self._force_next_update = False
         super().__init__(device, key_id, encryption_key, model, interface, **kwargs)
 
     @classmethod
@@ -47,85 +84,127 @@ class SwitchbotRelaySwitch(SwitchbotEncryptedDevice):
             device, key_id, encryption_key, model, **kwargs
         )
 
+    def _reset_power_data(self, data: dict[str, Any]) -> None:
+        """Reset power-related data to 0."""
+        for key in ["power", "current", "voltage"]:
+            data[key] = 0
+
+    def _parse_common_data(self, raw_data: bytes) -> dict[str, Any]:
+        """Parse common data from raw bytes."""
+        return {
+            "sequence_number": raw_data[1],
+            "isOn": bool(raw_data[2] & SWITCH1_ON_MASK),
+            "firmware": raw_data[16] / 10.0,
+            "channel2_isOn": bool(raw_data[2] & SWITCH2_ON_MASK),
+        }
+
+    def _parse_user_data(self, raw_data: bytes) -> dict[str, Any]:
+        """Parse user-specific data from raw bytes."""
+        _energy = parse_uint24_be(raw_data, 1) / 60000
+        _energy_usage_yesterday = parse_uint24_be(raw_data, 4) / 60000
+        _use_time = parse_power_data(raw_data, 7, 60.0)
+        _voltage = parse_power_data(raw_data, 9, 10.0)
+        _current = parse_power_data(raw_data, 11, 1000.0)
+        _power = parse_power_data(raw_data, 13, 10.0)
+
+        return {
+            "energy": 0.01 if 0 < _energy <= 0.01 else round(_energy, 2),
+            "energy usage yesterday": 0.01
+            if 0 < _energy_usage_yesterday <= 0.01
+            else round(_energy_usage_yesterday, 2),
+            "use_time": round(_use_time, 1),
+            "voltage": 0.1 if 0 < _voltage <= 0.1 else round(_voltage),
+            "current": 0.1 if 0 < _current <= 0.1 else round(_current, 1),
+            "power": 0.1 if 0 < _power <= 0.1 else round(_power, 1),
+        }
+
     def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
         """Update device data from advertisement."""
-        # Obtain voltage and current through command.
         adv_data = advertisement.data["data"]
-        if previous_voltage := self._get_adv_value("voltage"):
-            adv_data["voltage"] = previous_voltage
-        if previous_current := self._get_adv_value("current"):
-            adv_data["current"] = previous_current
-        current_state = self._get_adv_value("sequence_number")
+        channel = self._channel if hasattr(self, "_channel") else None
+
+        if self._model in (
+            SwitchbotModel.RELAY_SWITCH_1PM,
+            SwitchbotModel.RELAY_SWITCH_2PM,
+        ):
+            if channel is None:
+                adv_data["voltage"] = self._get_adv_value("voltage") or 0
+                adv_data["current"] = self._get_adv_value("current") or 0
+                adv_data["power"] = self._get_adv_value("power") or 0
+                adv_data["energy"] = self._get_adv_value("energy") or 0
+            else:
+                for i in range(1, channel + 1):
+                    adv_data[i] = adv_data.get(i, {})
+                    adv_data[i]["voltage"] = self._get_adv_value("voltage", i) or 0
+                    adv_data[i]["current"] = self._get_adv_value("current", i) or 0
+                    adv_data[i]["power"] = self._get_adv_value("power", i) or 0
+                    adv_data[i]["energy"] = self._get_adv_value("energy", i) or 0
         super().update_from_advertisement(advertisement)
-        new_state = self._get_adv_value("sequence_number")
+
+    def get_current_time_and_start_time(self) -> int:
+        """Get current time in seconds since epoch."""
+        current_time = int(time.time())
+        current_time_hex = f"{current_time:08x}"
+        current_day_start_time = int(current_time / 86400) * 86400
+        current_day_start_time_hex = f"{current_day_start_time:08x}"
+
+        return current_time_hex, current_day_start_time_hex
+
+    async def get_basic_info(self) -> dict[str, Any] | None:
+        """Get device basic settings."""
+        current_time_hex, current_day_start_time_hex = (
+            self.get_current_time_and_start_time()
+        )
+
+        if not (_data := await self._get_basic_info(COMMAND_GET_BASIC_INFO)):
+            return None
+        if not (
+            _channel1_data := await self._get_basic_info(
+                COMMAND_GET_CHANNEL1_INFO.format(
+                    current_time_hex, current_day_start_time_hex
+                )
+            )
+        ):
+            return None
+
         _LOGGER.debug(
-            "%s: update advertisement: %s (seq before: %s) (seq after: %s)",
-            self.name,
-            advertisement,
-            current_state,
-            new_state,
+            "on-off hex: %s, channel1_hex_data: %s", _data.hex(), _channel1_data.hex()
         )
-        if current_state != new_state:
-            self._force_next_update = True
-
-    async def update(self, interface: int | None = None) -> None:
-        """Update state of device."""
-        if info := await self.get_voltage_and_current():
-            self._last_full_update = time.monotonic()
-            self._update_parsed_data(info)
-            self._fire_callbacks()
-
-    async def get_voltage_and_current(self) -> dict[str, Any] | None:
-        """Get voltage and current because advtisement don't have these"""
-        result = await self._send_command(COMMAND_GET_VOLTAGE_AND_CURRENT)
-        ok = self._check_command_result(result, 0, {1})
-        if ok:
-            return {
-                "voltage": ((result[9] << 8) + result[10]) / 10,
-                "current": (result[11] << 8) + result[12],
-            }
-        return None
 
-    async def get_basic_info(self) -> dict[str, Any] | None:
-        """Get the current state of the switch."""
-        result = await self._send_command(COMMAND_GET_SWITCH_STATE)
-        if self._check_command_result(result, 0, {1}):
-            return {
-                "is_on": result[1] & 0x01 != 0,
-            }
-        return None
-
-    def poll_needed(self, seconds_since_last_poll: float | None) -> bool:
-        """Return if device needs polling."""
-        if self._force_next_update:
-            self._force_next_update = False
-            return True
-        if (
-            seconds_since_last_poll is not None
-            and seconds_since_last_poll < PASSIVE_POLL_INTERVAL
+        common_data = self._parse_common_data(_data)
+        user_data = self._parse_user_data(_channel1_data)
+
+        if self._model in (
+            SwitchbotModel.RELAY_SWITCH_1,
+            SwitchbotModel.GARAGE_DOOR_OPENER,
         ):
-            return False
-        time_since_last_full_update = time.monotonic() - self._last_full_update
-        return not time_since_last_full_update < PASSIVE_POLL_INTERVAL
+            for key in ["voltage", "current", "power", "energy"]:
+                user_data.pop(key, None)
+
+        if not common_data["isOn"]:
+            self._reset_power_data(user_data)
+
+        garage_door_opener_data = {"door_open": not bool(_data[2] & DOOR_OPEN_MASK)}
+
+        _LOGGER.debug("common_data: %s, user_data: %s", common_data, user_data)
+
+        if self._model == SwitchbotModel.GARAGE_DOOR_OPENER:
+            return common_data | garage_door_opener_data
+        return common_data | user_data
 
+    @update_after_operation
     async def turn_on(self) -> bool:
         """Turn device on."""
         result = await self._send_command(COMMAND_TURN_ON)
-        ok = self._check_command_result(result, 0, {1})
-        if ok:
-            self._override_state({"isOn": True})
-            self._fire_callbacks()
-        return ok
+        return self._check_command_result(result, 0, {1})
 
+    @update_after_operation
     async def turn_off(self) -> bool:
         """Turn device off."""
         result = await self._send_command(COMMAND_TURN_OFF)
-        ok = self._check_command_result(result, 0, {1})
-        if ok:
-            self._override_state({"isOn": False})
-            self._fire_callbacks()
-        return ok
+        return self._check_command_result(result, 0, {1})
 
+    @update_after_operation
     async def async_toggle(self, **kwargs) -> bool:
         """Toggle device."""
         result = await self._send_command(COMMAND_TOGGLE)
@@ -134,3 +213,88 @@ class SwitchbotRelaySwitch(SwitchbotEncryptedDevice):
     def is_on(self) -> bool | None:
         """Return switch state from cache."""
         return self._get_adv_value("isOn")
+
+
+class SwitchbotRelaySwitch2PM(SwitchbotRelaySwitch):
+    """Representation of a Switchbot relay switch 2pm."""
+
+    def __init__(
+        self,
+        device: BLEDevice,
+        key_id: str,
+        encryption_key: str,
+        interface: int = 0,
+        model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_2PM,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(device, key_id, encryption_key, interface, model, **kwargs)
+        self._channel = 2
+
+    @property
+    def channel(self) -> int:
+        return self._channel
+
+    def get_parsed_data(self, channel: int | None = None) -> dict[str, Any]:
+        """Return parsed device data, optionally for a specific channel."""
+        data = self.data.get("data") or {}
+        return data.get(channel, {})
+
+    async def get_basic_info(self):
+        current_time_hex, current_day_start_time_hex = (
+            self.get_current_time_and_start_time()
+        )
+        if not (common_data := await super().get_basic_info()):
+            return None
+        if not (
+            _channel2_data := await self._get_basic_info(
+                COMMAND_GET_CHANNEL2_INFO.format(
+                    current_time_hex, current_day_start_time_hex
+                )
+            )
+        ):
+            return None
+
+        _LOGGER.debug("channel2_hex_data: %s", _channel2_data.hex())
+
+        channel2_data = self._parse_user_data(_channel2_data)
+        channel2_data["isOn"] = common_data["channel2_isOn"]
+
+        if not channel2_data["isOn"]:
+            self._reset_power_data(channel2_data)
+
+        _LOGGER.debug(
+            "channel1_data: %s, channel2_data: %s", common_data, channel2_data
+        )
+        return {1: common_data, 2: channel2_data}
+
+    @update_after_operation
+    async def turn_on(self, channel: int) -> bool:
+        """Turn device on."""
+        result = await self._send_command(
+            MULTI_CHANNEL_COMMANDS_TURN_ON[self._model][channel]
+        )
+        return self._check_command_result(result, 0, {1})
+
+    @update_after_operation
+    async def turn_off(self, channel: int) -> bool:
+        """Turn device off."""
+        result = await self._send_command(
+            MULTI_CHANNEL_COMMANDS_TURN_OFF[self._model][channel]
+        )
+        return self._check_command_result(result, 0, {1})
+
+    @update_after_operation
+    async def async_toggle(self, channel: int) -> bool:
+        """Toggle device."""
+        result = await self._send_command(
+            MULTI_CHANNEL_COMMANDS_TOGGLE[self._model][channel]
+        )
+        return self._check_command_result(result, 0, {1})
+
+    def is_on(self, channel: int) -> bool | None:
+        """Return switch state from cache."""
+        return self._get_adv_value("isOn", channel)
+
+    def switch_mode(self, channel: int) -> bool | None:
+        """Return true or false from cache."""
+        return self._get_adv_value("switchMode", channel)

+ 53 - 0
switchbot/helpers.py

@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import asyncio
+import struct
 from collections.abc import Coroutine
 from typing import Any, TypeVar
 
@@ -8,6 +9,10 @@ _R = TypeVar("_R")
 
 _BACKGROUND_TASKS: set[asyncio.Task[Any]] = set()
 
+# Pre-compiled struct unpack methods for better performance
+_UNPACK_UINT16_BE = struct.Struct(">H").unpack_from  # Big-endian unsigned 16-bit
+_UNPACK_UINT24_BE = struct.Struct(">I").unpack  # For 3-byte values (read as 4 bytes)
+
 
 def create_background_task(target: Coroutine[Any, Any, _R]) -> asyncio.Task[_R]:
     """Create a background task."""
@@ -17,6 +22,54 @@ def create_background_task(target: Coroutine[Any, Any, _R]) -> asyncio.Task[_R]:
     return task
 
 
+def parse_power_data(
+    data: bytes, offset: int, scale: float = 1.0, mask: int | None = None
+) -> float:
+    """
+    Parse 2-byte power-related data from bytes.
+
+    Args:
+        data: Raw bytes data
+        offset: Starting offset for the 2-byte value
+        scale: Scale factor to divide the raw value by (default: 1.0)
+        mask: Optional bitmask to apply (default: None)
+
+    Returns:
+        Parsed float value
+
+    """
+    if offset + 2 > len(data):
+        raise ValueError(
+            f"Insufficient data: need at least {offset + 2} bytes, got {len(data)}"
+        )
+
+    value = _UNPACK_UINT16_BE(data, offset)[0]
+    if mask is not None:
+        value &= mask
+    return value / scale
+
+
+def parse_uint24_be(data: bytes, offset: int) -> int:
+    """
+    Parse 3-byte big-endian unsigned integer.
+
+    Args:
+        data: Raw bytes data
+        offset: Starting offset for the 3-byte value
+
+    Returns:
+        Parsed integer value
+
+    """
+    if offset + 3 > len(data):
+        raise ValueError(
+            f"Insufficient data: need at least {offset + 3} bytes, got {len(data)}"
+        )
+
+    # Read 3 bytes and pad with 0 at the beginning for 4-byte struct
+    return _UNPACK_UINT24_BE(b"\x00" + data[offset : offset + 3])[0]
+
+
 def celsius_to_fahrenheit(celsius: float) -> float:
     """Convert temperature from Celsius to Fahrenheit."""
     return (celsius * 9 / 5) + 32

+ 258 - 65
tests/test_adv_parser.py

@@ -1438,71 +1438,6 @@ def test_parse_advertisement_data_keypad():
     )
 
 
-def test_parse_advertisement_data_relay_switch_1pm():
-    """Test parse_advertisement_data for the keypad."""
-    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
-    adv_data = generate_advertisement_data(
-        manufacturer_data={2409: b"$X|\x0866G\x81\x00\x00\x001\x00\x00\x00\x00"},
-        service_data={"0000fd3d-0000-1000-8000-00805f9b34fb": b"<\x00\x00\x00"},
-        rssi=-67,
-    )
-    result = parse_advertisement_data(
-        ble_device, adv_data, SwitchbotModel.RELAY_SWITCH_1PM
-    )
-    assert result == SwitchBotAdvertisement(
-        address="aa:bb:cc:dd:ee:ff",
-        data={
-            "data": {
-                "switchMode": True,
-                "sequence_number": 71,
-                "isOn": True,
-                "power": 4.9,
-                "voltage": 0,
-                "current": 0,
-            },
-            "isEncrypted": False,
-            "model": "<",
-            "modelFriendlyName": "Relay Switch 1PM",
-            "modelName": SwitchbotModel.RELAY_SWITCH_1PM,
-            "rawAdvData": b"<\x00\x00\x00",
-        },
-        device=ble_device,
-        rssi=-67,
-        active=True,
-    )
-
-
-def test_parse_advertisement_data_relay_switch_1():
-    """Test parse_advertisement_data for the keypad."""
-    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
-    adv_data = generate_advertisement_data(
-        manufacturer_data={2409: b"$X|\x0866G\x81\x00\x00\x001\x00\x00\x00\x00"},
-        service_data={"0000fd3d-0000-1000-8000-00805f9b34fb": b";\x00\x00\x00"},
-        rssi=-67,
-    )
-    result = parse_advertisement_data(
-        ble_device, adv_data, SwitchbotModel.RELAY_SWITCH_1
-    )
-    assert result == SwitchBotAdvertisement(
-        address="aa:bb:cc:dd:ee:ff",
-        data={
-            "data": {
-                "switchMode": True,
-                "sequence_number": 71,
-                "isOn": True,
-            },
-            "isEncrypted": False,
-            "model": ";",
-            "modelFriendlyName": "Relay Switch 1",
-            "modelName": SwitchbotModel.RELAY_SWITCH_1,
-            "rawAdvData": b";\x00\x00\x00",
-        },
-        device=ble_device,
-        rssi=-67,
-        active=True,
-    )
-
-
 def test_leak_active():
     """Test parse_advertisement_data for the leak detector."""
     ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
@@ -3294,3 +3229,261 @@ def test_humidifer_with_empty_data() -> None:
         rssi=-97,
         active=True,
     )
+
+
+@pytest.mark.parametrize(
+    "test_case",
+    [
+        AdvTestCase(
+            b"\xc0N0\xdd\xb9\xf2\x8a\xc1\x00\x00\x00\x00\x00F\x00\x00",
+            b"=\x00\x00\x00",
+            {
+                1: {
+                    "isOn": True,
+                    "sequence_number": 138,
+                    "switchMode": True,
+                },
+                2: {
+                    "isOn": True,
+                    "sequence_number": 138,
+                    "switchMode": True,
+                },
+            },
+            "=",
+            "Relay Switch 2PM",
+            SwitchbotModel.RELAY_SWITCH_2PM,
+        ),
+        AdvTestCase(
+            b"$X|\x0866G\x81\x00\x00\x001\x00\x00\x00\x00",
+            b";\x00\x00\x00",
+            {
+                "isOn": True,
+                "sequence_number": 71,
+                "switchMode": True,
+            },
+            ";",
+            "Relay Switch 1",
+            SwitchbotModel.RELAY_SWITCH_1,
+        ),
+        AdvTestCase(
+            b"$X|\x0866G\x81\x00\x00\x001\x00\x00\x00\x00",
+            b"<\x00\x00\x00",
+            {
+                "isOn": True,
+                "sequence_number": 71,
+                "switchMode": True,
+            },
+            "<",
+            "Relay Switch 1PM",
+            SwitchbotModel.RELAY_SWITCH_1PM,
+        ),
+        AdvTestCase(
+            b"$X|\x05BN\x0f\x00\x00\x03\x00\x00\x00\x00\x00\x00",
+            b">\x00\x00\x00",
+            {
+                "door_open": True,
+                "isOn": False,
+                "sequence_number": 15,
+                "switchMode": True,
+            },
+            ">",
+            "Garage Door Opener",
+            SwitchbotModel.GARAGE_DOOR_OPENER,
+        ),
+    ],
+)
+def test_adv_active(test_case: AdvTestCase) -> None:
+    """Test with active data."""
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    adv_data = generate_advertisement_data(
+        manufacturer_data={2409: test_case.manufacturer_data},
+        service_data={"0000fd3d-0000-1000-8000-00805f9b34fb": test_case.service_data},
+        rssi=-97,
+    )
+    result = parse_advertisement_data(ble_device, adv_data)
+    assert result == SwitchBotAdvertisement(
+        address="aa:bb:cc:dd:ee:ff",
+        data={
+            "rawAdvData": test_case.service_data,
+            "data": test_case.data,
+            "isEncrypted": False,
+            "model": test_case.model,
+            "modelFriendlyName": test_case.modelFriendlyName,
+            "modelName": test_case.modelName,
+        },
+        device=ble_device,
+        rssi=-97,
+        active=True,
+    )
+
+
+@pytest.mark.parametrize(
+    "test_case",
+    [
+        AdvTestCase(
+            b"\xc0N0\xdd\xb9\xf2\x8a\xc1\x00\x00\x00\x00\x00F\x00\x00",
+            b"=\x00\x00\x00",
+            {
+                1: {
+                    "isOn": True,
+                    "sequence_number": 138,
+                    "switchMode": True,
+                },
+                2: {
+                    "isOn": True,
+                    "sequence_number": 138,
+                    "switchMode": True,
+                },
+            },
+            "=",
+            "Relay Switch 2PM",
+            SwitchbotModel.RELAY_SWITCH_2PM,
+        ),
+        AdvTestCase(
+            b"$X|\x0866G\x81\x00\x00\x001\x00\x00\x00\x00",
+            b";\x00\x00\x00",
+            {
+                "isOn": True,
+                "sequence_number": 71,
+                "switchMode": True,
+            },
+            ";",
+            "Relay Switch 1",
+            SwitchbotModel.RELAY_SWITCH_1,
+        ),
+        AdvTestCase(
+            b"$X|\x0866G\x81\x00\x00\x001\x00\x00\x00\x00",
+            b"<\x00\x00\x00",
+            {
+                "isOn": True,
+                "sequence_number": 71,
+                "switchMode": True,
+            },
+            "<",
+            "Relay Switch 1PM",
+            SwitchbotModel.RELAY_SWITCH_1PM,
+        ),
+        AdvTestCase(
+            b"$X|\x05BN\x0f\x00\x00\x03\x00\x00\x00\x00\x00\x00",
+            b">\x00\x00\x00",
+            {
+                "door_open": True,
+                "isOn": False,
+                "sequence_number": 15,
+                "switchMode": True,
+            },
+            ">",
+            "Garage Door Opener",
+            SwitchbotModel.GARAGE_DOOR_OPENER,
+        ),
+    ],
+)
+def test_adv_passive(test_case: AdvTestCase) -> None:
+    """Test with passive data."""
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    adv_data = generate_advertisement_data(
+        manufacturer_data={2409: test_case.manufacturer_data},
+        rssi=-97,
+    )
+    result = parse_advertisement_data(ble_device, adv_data, test_case.modelName)
+    assert result == SwitchBotAdvertisement(
+        address="aa:bb:cc:dd:ee:ff",
+        data={
+            "rawAdvData": None,
+            "data": test_case.data,
+            "isEncrypted": False,
+            "model": test_case.model,
+            "modelFriendlyName": test_case.modelFriendlyName,
+            "modelName": test_case.modelName,
+        },
+        device=ble_device,
+        rssi=-97,
+        active=False,
+    )
+
+
+@pytest.mark.parametrize(
+    "test_case",
+    [
+        AdvTestCase(
+            None,
+            b"=\x00\x00\x00",
+            {
+                1: {
+                    "isOn": True,
+                    "power": 0.0,
+                    "sequence_number": 138,
+                    "switchMode": True,
+                },
+                2: {
+                    "isOn": True,
+                    "power": 7.0,
+                    "sequence_number": 138,
+                    "switchMode": True,
+                },
+            },
+            "=",
+            "Relay Switch 2PM",
+            SwitchbotModel.RELAY_SWITCH_2PM,
+        ),
+        AdvTestCase(
+            None,
+            b";\x00\x00\x00",
+            {
+                "isOn": True,
+                "sequence_number": 71,
+                "switchMode": True,
+            },
+            ";",
+            "Relay Switch 1",
+            SwitchbotModel.RELAY_SWITCH_1,
+        ),
+        AdvTestCase(
+            None,
+            b"<\x00\x00\x00",
+            {
+                "isOn": True,
+                "power": 4.9,
+                "sequence_number": 71,
+                "switchMode": True,
+            },
+            "<",
+            "Relay Switch 1PM",
+            SwitchbotModel.RELAY_SWITCH_1PM,
+        ),
+        AdvTestCase(
+            None,
+            b">\x00\x00\x00",
+            {
+                "door_open": True,
+                "isOn": False,
+                "sequence_number": 15,
+                "switchMode": True,
+            },
+            ">",
+            "Garage Door Opener",
+            SwitchbotModel.GARAGE_DOOR_OPENER,
+        ),
+    ],
+)
+def test_adv_with_empty_data(test_case: AdvTestCase) -> None:
+    """Test with empty data."""
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    adv_data = generate_advertisement_data(
+        manufacturer_data={2409: None},
+        service_data={"0000fd3d-0000-1000-8000-00805f9b34fb": test_case.service_data},
+        rssi=-97,
+    )
+    result = parse_advertisement_data(ble_device, adv_data)
+    assert result == SwitchBotAdvertisement(
+        address="aa:bb:cc:dd:ee:ff",
+        data={
+            "rawAdvData": test_case.service_data,
+            "data": {},
+            "isEncrypted": False,
+            "model": test_case.model,
+        },
+        device=ble_device,
+        rssi=-97,
+        active=True,
+    )

+ 72 - 0
tests/test_helpers.py

@@ -0,0 +1,72 @@
+"""Tests for helper functions."""
+
+import pytest
+
+from switchbot.helpers import parse_power_data
+
+
+def test_parse_power_data_basic():
+    """Test basic power data parsing."""
+    # Test data: bytes with value 0x1234 (4660 decimal) at offset 0
+    data = b"\x12\x34\x56\x78"
+
+    # Test without scale (should return raw value)
+    assert parse_power_data(data, 0) == 4660
+
+    # Test with scale of 10
+    assert parse_power_data(data, 0, 10.0) == 466.0
+
+    # Test with scale of 100
+    assert parse_power_data(data, 0, 100.0) == 46.6
+
+
+def test_parse_power_data_with_offset():
+    """Test power data parsing with different offsets."""
+    data = b"\x00\x00\x12\x34\x56\x78"
+
+    # Value at offset 2 should be 0x1234
+    assert parse_power_data(data, 2, 10.0) == 466.0
+
+    # Value at offset 4 should be 0x5678
+    assert parse_power_data(data, 4, 10.0) == 2213.6
+
+
+def test_parse_power_data_with_mask():
+    """Test power data parsing with bitmask."""
+    # Test data: 0xFFFF
+    data = b"\xff\xff"
+
+    # Without mask
+    assert parse_power_data(data, 0, 10.0) == 6553.5
+
+    # With mask 0x7FFF (clear highest bit)
+    assert parse_power_data(data, 0, 10.0, 0x7FFF) == 3276.7
+
+
+def test_parse_power_data_insufficient_data():
+    """Test error handling for insufficient data."""
+    data = b"\x12"  # Only 1 byte
+
+    # Should raise ValueError when trying to read 2 bytes
+    with pytest.raises(ValueError, match="Insufficient data"):
+        parse_power_data(data, 0)
+
+    # Should also fail at offset 1 with 2-byte data
+    data = b"\x12\x34"
+    with pytest.raises(ValueError, match="Insufficient data"):
+        parse_power_data(data, 1)  # Would need to read bytes 1-2
+
+
+def test_parse_power_data_real_world_examples():
+    """Test with real-world examples from relay switch."""
+    # Simulate relay switch data structure
+    raw_data = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xdc\x00\x0f\x00\xe8"
+
+    # Voltage at offset 9-10: 0x00DC = 220 / 10.0 = 22.0V
+    assert parse_power_data(raw_data, 9, 10.0) == 22.0
+
+    # Current at offset 11-12: 0x000F = 15 / 1000.0 = 0.015A
+    assert parse_power_data(raw_data, 11, 1000.0) == 0.015
+
+    # Power at offset 13-14: 0x00E8 = 232 / 10.0 = 23.2W
+    assert parse_power_data(raw_data, 13, 10.0) == 23.2

+ 417 - 37
tests/test_relay_switch.py

@@ -1,41 +1,111 @@
-from unittest.mock import AsyncMock
+from unittest.mock import AsyncMock, MagicMock, patch
 
 import pytest
 from bleak.backends.device import BLEDevice
 
-from switchbot import SwitchBotAdvertisement, SwitchbotModel
+from switchbot import SwitchBotAdvertisement, SwitchbotEncryptedDevice, SwitchbotModel
 from switchbot.devices import relay_switch
+from switchbot.devices.device import _merge_data as merge_data
 
 from .test_adv_parser import generate_ble_device
 
+common_params = [
+    (b";\x00\x00\x00", SwitchbotModel.RELAY_SWITCH_1),
+    (b"<\x00\x00\x00", SwitchbotModel.RELAY_SWITCH_1PM),
+    (b">\x00\x00\x00", SwitchbotModel.GARAGE_DOOR_OPENER),
+]
 
-def create_device_for_command_testing(calibration=True, reverse_mode=False):
+
+@pytest.fixture
+def common_parametrize_2pm():
+    """Provide common test data."""
+    return {
+        "rawAdvData": b"\x00\x00\x00\x00\x00\x00",
+        "model": SwitchbotModel.RELAY_SWITCH_2PM,
+    }
+
+
+def create_device_for_command_testing(
+    rawAdvData: bytes, model: str, init_data: dict | None = None
+):
+    """Create a device for command testing."""
     ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
-    relay_switch_device = relay_switch.SwitchbotRelaySwitch(
-        ble_device, "ff", "ffffffffffffffffffffffffffffffff"
+    device_class = (
+        relay_switch.SwitchbotRelaySwitch2PM
+        if model == SwitchbotModel.RELAY_SWITCH_2PM
+        else relay_switch.SwitchbotRelaySwitch
     )
-    relay_switch_device.update_from_advertisement(make_advertisement_data(ble_device))
-    return relay_switch_device
+    device = device_class(
+        ble_device, "ff", "ffffffffffffffffffffffffffffffff", model=model
+    )
+    device.update_from_advertisement(
+        make_advertisement_data(ble_device, rawAdvData, model, init_data)
+    )
+    device._send_command = AsyncMock()
+    device._check_command_result = MagicMock()
+    device.update = AsyncMock()
+    return device
 
 
-def make_advertisement_data(ble_device: BLEDevice):
+def make_advertisement_data(
+    ble_device: BLEDevice, rawAdvData: bytes, model: str, init_data: dict | None = None
+):
     """Set advertisement data with defaults."""
+    if init_data is None:
+        init_data = {}
+    if model == SwitchbotModel.RELAY_SWITCH_2PM:
+        return SwitchBotAdvertisement(
+            address="aa:bb:cc:dd:ee:ff",
+            data={
+                "rawAdvData": rawAdvData,
+                "data": {
+                    1: {
+                        "switchMode": True,
+                        "sequence_number": 99,
+                        "isOn": True,
+                    },
+                    2: {
+                        "switchMode": True,
+                        "sequence_number": 99,
+                        "isOn": False,
+                    },
+                }
+                | init_data,
+                "isEncrypted": False,
+            },
+            device=ble_device,
+            rssi=-80,
+            active=True,
+        )
+    if model == SwitchbotModel.GARAGE_DOOR_OPENER:
+        return SwitchBotAdvertisement(
+            address="aa:bb:cc:dd:ee:ff",
+            data={
+                "rawAdvData": rawAdvData,
+                "data": {
+                    "switchMode": True,
+                    "sequence_number": 96,
+                    "isOn": True,
+                    "door_open": False,
+                }
+                | init_data,
+                "isEncrypted": False,
+            },
+            device=ble_device,
+            rssi=-80,
+            active=True,
+        )
     return SwitchBotAdvertisement(
         address="aa:bb:cc:dd:ee:ff",
         data={
-            "rawAdvData": b"$X|\x0866G\x81\x00\x00\x001\x00\x00\x00\x00",
+            "rawAdvData": rawAdvData,
             "data": {
                 "switchMode": True,
-                "sequence_number": 71,
+                "sequence_number": 96,
                 "isOn": True,
-                "power": 4.9,
-                "voltage": 0,
-                "current": 0,
-            },
+            }
+            | init_data,
             "isEncrypted": False,
-            "model": "<",
-            "modelFriendlyName": "Relay Switch 1PM",
-            "modelName": SwitchbotModel.RELAY_SWITCH_1PM,
         },
         device=ble_device,
         rssi=-80,
@@ -44,30 +114,340 @@ def make_advertisement_data(ble_device: BLEDevice):
 
 
 @pytest.mark.asyncio
-async def test_turn_on():
-    relay_switch_device = create_device_for_command_testing()
-    relay_switch_device._send_command = AsyncMock(return_value=b"\x01")
-    await relay_switch_device.turn_on()
-    assert relay_switch_device.is_on() is True
+@pytest.mark.parametrize(
+    "init_data",
+    [
+        {1: {"isOn": True}, 2: {"isOn": True}},
+    ],
+)
+async def test_turn_on_2PM(common_parametrize_2pm, init_data):
+    """Test turn on command."""
+    device = create_device_for_command_testing(
+        common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"], init_data
+    )
+    await device.turn_on(1)
+    device._send_command.assert_called_with(
+        relay_switch.MULTI_CHANNEL_COMMANDS_TURN_ON[common_parametrize_2pm["model"]][1]
+    )
+    assert device.is_on(1) is True
+
+    await device.turn_on(2)
+    device._send_command.assert_called_with(
+        relay_switch.MULTI_CHANNEL_COMMANDS_TURN_ON[common_parametrize_2pm["model"]][2]
+    )
+    assert device.is_on(2) is True
 
 
 @pytest.mark.asyncio
-async def test_turn_off():
-    relay_switch_device = create_device_for_command_testing()
-    relay_switch_device._send_command = AsyncMock(return_value=b"\x01")
-    await relay_switch_device.turn_off()
-    assert relay_switch_device.is_on() is False
+@pytest.mark.parametrize(
+    "init_data",
+    [
+        {1: {"isOn": False}, 2: {"isOn": False}},
+    ],
+)
+async def test_turn_off_2PM(common_parametrize_2pm, init_data):
+    """Test turn off command."""
+    device = create_device_for_command_testing(
+        common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"], init_data
+    )
+    await device.turn_off(1)
+    device._send_command.assert_called_with(
+        relay_switch.MULTI_CHANNEL_COMMANDS_TURN_OFF[common_parametrize_2pm["model"]][1]
+    )
+    assert device.is_on(1) is False
+
+    await device.turn_off(2)
+    device._send_command.assert_called_with(
+        relay_switch.MULTI_CHANNEL_COMMANDS_TURN_OFF[common_parametrize_2pm["model"]][2]
+    )
+    assert device.is_on(2) is False
+
+
+@pytest.mark.asyncio
+async def test_turn_toggle_2PM(common_parametrize_2pm):
+    """Test toggle command."""
+    device = create_device_for_command_testing(
+        common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"]
+    )
+    await device.async_toggle(1)
+    device._send_command.assert_called_with(
+        relay_switch.MULTI_CHANNEL_COMMANDS_TOGGLE[common_parametrize_2pm["model"]][1]
+    )
+    assert device.is_on(1) is True
+
+    await device.async_toggle(2)
+    device._send_command.assert_called_with(
+        relay_switch.MULTI_CHANNEL_COMMANDS_TOGGLE[common_parametrize_2pm["model"]][2]
+    )
+    assert device.is_on(2) is False
 
 
 @pytest.mark.asyncio
-async def test_get_basic_info():
-    relay_switch_device = create_device_for_command_testing()
-    relay_switch_device._send_command = AsyncMock(return_value=b"\x01\x01")
-    info = await relay_switch_device.get_basic_info()
-    assert info["is_on"] is True
-    relay_switch_device._send_command = AsyncMock(return_value=b"\x01\x00")
-    info = await relay_switch_device.get_basic_info()
-    assert info["is_on"] is False
-    relay_switch_device._send_command = AsyncMock(return_value=b"\x00\x00")
-    info = await relay_switch_device.get_basic_info()
+async def test_get_switch_mode_2PM(common_parametrize_2pm):
+    """Test get switch mode."""
+    device = create_device_for_command_testing(
+        common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"]
+    )
+    assert device.switch_mode(1) is True
+    assert device.switch_mode(2) is True
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    ("info_data", "result"),
+    [
+        (
+            {
+                "basic_info": b"\x01\x98A\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10",
+                "channel1_info": b"\x01\x00\x00\x00\x00\x00\x00\x02\x99\x00\xe9\x00\x03\x00\x00",
+                "channel2_info": b"\x01\x00\x055\x00'<\x02\x9f\x00\xe9\x01,\x00F",
+            },
+            [False, 0, 0, 0, 0, True, 0.02, 23, 0.3, 7.0],
+        ),
+        (
+            {
+                "basic_info": b"\x01\x9e\x81\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10",
+                "channel1_info": b"\x01\x00\x00\x00\x00\x00\x00\x02\x99\x00\xe9\x00\x03\x00\x00",
+                "channel2_info": b"\x01\x00\x05\xbc\x00'<\x02\xb1\x00\xea\x01-\x00F",
+            },
+            [True, 0, 23, 0.1, 0.0, False, 0.02, 0, 0, 0],
+        ),
+    ],
+)
+async def test_get_basic_info_2PM(common_parametrize_2pm, info_data, result):
+    """Test get_basic_info for 2PM devices."""
+    device = create_device_for_command_testing(
+        common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"]
+    )
+
+    assert device.channel == 2
+
+    device.get_current_time_and_start_time = MagicMock(
+        return_value=("683074d6", "682fba80")
+    )
+
+    async def mock_get_basic_info(arg):
+        if arg == relay_switch.COMMAND_GET_BASIC_INFO:
+            return info_data["basic_info"]
+        if arg == relay_switch.COMMAND_GET_CHANNEL1_INFO.format("683074d6", "682fba80"):
+            return info_data["channel1_info"]
+        if arg == relay_switch.COMMAND_GET_CHANNEL2_INFO.format("683074d6", "682fba80"):
+            return info_data["channel2_info"]
+        return None
+
+    device._get_basic_info = AsyncMock(side_effect=mock_get_basic_info)
+
+    info = await device.get_basic_info()
+
+    assert info is not None
+    assert 1 in info
+    assert 2 in info
+
+    assert info[1]["isOn"] == result[0]
+    assert info[1]["energy"] == result[1]
+    assert info[1]["voltage"] == result[2]
+    assert info[1]["current"] == result[3]
+    assert info[1]["power"] == result[4]
+
+    assert info[2]["isOn"] == result[5]
+    assert info[2]["energy"] == result[6]
+    assert info[2]["voltage"] == result[7]
+    assert info[2]["current"] == result[8]
+    assert info[2]["power"] == result[9]
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "info_data",
+    [
+        {
+            "basic_info": None,
+            "channel1_info": b"\x01\x00\x00\x00\x00\x00\x00\x02\x99\x00\xe9\x00\x03\x00\x00",
+            "channel2_info": b"\x01\x00\x055\x00'<\x02\x9f\x00\xe9\x01,\x00F",
+        },
+        {
+            "basic_info": b"\x01\x98A\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10",
+            "channel1_info": None,
+            "channel2_info": b"\x01\x00\x055\x00'<\x02\x9f\x00\xe9\x01,\x00F",
+        },
+        {
+            "basic_info": b"\x01\x98A\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10",
+            "channel1_info": b"\x01\x00\x00\x00\x00\x00\x00\x02\x99\x00\xe9\x00\x03\x00\x00",
+            "channel2_info": None,
+        },
+    ],
+)
+async def test_basic_info_exceptions_2PM(common_parametrize_2pm, info_data):
+    """Test get_basic_info exceptions."""
+    device = create_device_for_command_testing(
+        common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"]
+    )
+
+    device.get_current_time_and_start_time = MagicMock(
+        return_value=("683074d6", "682fba80")
+    )
+
+    async def mock_get_basic_info(arg):
+        if arg == relay_switch.COMMAND_GET_BASIC_INFO:
+            return info_data["basic_info"]
+        if arg == relay_switch.COMMAND_GET_CHANNEL1_INFO.format("683074d6", "682fba80"):
+            return info_data["channel1_info"]
+        if arg == relay_switch.COMMAND_GET_CHANNEL2_INFO.format("683074d6", "682fba80"):
+            return info_data["channel2_info"]
+        return None
+
+    device._get_basic_info = AsyncMock(side_effect=mock_get_basic_info)
+
+    info = await device.get_basic_info()
+
     assert info is None
+
+
+@pytest.mark.asyncio
+async def test_get_parsed_data_2PM(common_parametrize_2pm):
+    """Test get_parsed_data for 2PM devices."""
+    device = create_device_for_command_testing(
+        common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"]
+    )
+
+    info = device.get_parsed_data(1)
+    assert info["isOn"] is True
+
+    info = device.get_parsed_data(2)
+    assert info["isOn"] is False
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    ("rawAdvData", "model"),
+    common_params,
+)
+async def test_turn_on(rawAdvData, model):
+    """Test turn on command."""
+    device = create_device_for_command_testing(rawAdvData, model)
+    await device.turn_on()
+    device._send_command.assert_awaited_once_with(relay_switch.COMMAND_TURN_ON)
+    assert device.is_on() is True
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    ("rawAdvData", "model"),
+    common_params,
+)
+async def test_turn_off(rawAdvData, model):
+    """Test turn off command."""
+    device = create_device_for_command_testing(rawAdvData, model, {"isOn": False})
+    await device.turn_off()
+    device._send_command.assert_awaited_once_with(relay_switch.COMMAND_TURN_OFF)
+    assert device.is_on() is False
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    ("rawAdvData", "model"),
+    common_params,
+)
+async def test_toggle(rawAdvData, model):
+    """Test toggle command."""
+    device = create_device_for_command_testing(rawAdvData, model)
+    await device.async_toggle()
+    device._send_command.assert_awaited_once_with(relay_switch.COMMAND_TOGGLE)
+    assert device.is_on() is True
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    ("rawAdvData", "model", "info_data"),
+    [
+        (
+            b">\x00\x00\x00",
+            SwitchbotModel.GARAGE_DOOR_OPENER,
+            {
+                "basic_info": b"\x01>\x80\x0c\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x10",
+                "channel1_info": b"\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
+            },
+        )
+    ],
+)
+async def test_get_basic_info_garage_door_opener(rawAdvData, model, info_data):
+    """Test get_basic_info for garage door opener."""
+    device = create_device_for_command_testing(rawAdvData, model)
+    device.get_current_time_and_start_time = MagicMock(
+        return_value=("683074d6", "682fba80")
+    )
+
+    async def mock_get_basic_info(arg):
+        if arg == relay_switch.COMMAND_GET_BASIC_INFO:
+            return info_data["basic_info"]
+        if arg == relay_switch.COMMAND_GET_CHANNEL1_INFO.format("683074d6", "682fba80"):
+            return info_data["channel1_info"]
+        return None
+
+    device._get_basic_info = AsyncMock(side_effect=mock_get_basic_info)
+    info = await device.get_basic_info()
+    assert info is not None
+    assert info["isOn"] is True
+    assert info["door_open"] is True
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "model",
+    [
+        SwitchbotModel.RELAY_SWITCH_1,
+        SwitchbotModel.RELAY_SWITCH_1PM,
+        SwitchbotModel.GARAGE_DOOR_OPENER,
+        SwitchbotModel.RELAY_SWITCH_2PM,
+    ],
+)
+@patch.object(SwitchbotEncryptedDevice, "verify_encryption_key", new_callable=AsyncMock)
+async def test_verify_encryption_key(mock_parent_verify, model):
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    key_id = "ff"
+    encryption_key = "ffffffffffffffffffffffffffffffff"
+
+    mock_parent_verify.return_value = True
+
+    result = await relay_switch.SwitchbotRelaySwitch.verify_encryption_key(
+        device=ble_device,
+        key_id=key_id,
+        encryption_key=encryption_key,
+        model=model,
+    )
+
+    mock_parent_verify.assert_awaited_once_with(
+        ble_device,
+        key_id,
+        encryption_key,
+        model,
+    )
+
+    assert result is True
+
+
+@pytest.mark.parametrize(
+    ("old_data", "new_data", "expected_result"),
+    [
+        (
+            {"isOn": True, "sequence_number": 1},
+            {"isOn": False},
+            {"isOn": False, "sequence_number": 1},
+        ),
+        (
+            {
+                1: {"current": 0, "voltage": 220, "power": 0},
+                2: {"current": 1, "voltage": 0, "power": 10},
+            },
+            {1: {"current": 1, "power": 10}, 2: {"current": 0, "voltage": 220}},
+            {
+                1: {"current": 1, "voltage": 220, "power": 10},
+                2: {"current": 0, "voltage": 220, "power": 10},
+            },
+        ),
+    ],
+)
+def test_merge_data(old_data, new_data, expected_result):
+    """Test merging of data dictionaries."""
+    result = merge_data(old_data, new_data)
+    assert result == expected_result