Quellcode durchsuchen

Reduce duplicate code (#359)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: J. Nick Koston <nick@koston.org>
Co-authored-by: J. Nick Koston <nick+github@koston.org>
Retha Runolfsson vor 3 Tagen
Ursprung
Commit
3014972af6

+ 3 - 14
switchbot/devices/air_purifier.py

@@ -21,8 +21,6 @@ _LOGGER = logging.getLogger(__name__)
 
 
 COMMAND_HEAD = "570f4c"
-COMMAND_TURN_OFF = f"{COMMAND_HEAD}010000"
-COMMAND_TURN_ON = f"{COMMAND_HEAD}010100"
 COMMAND_SET_MODE = {
     AirPurifierMode.LEVEL_1.name.lower(): f"{COMMAND_HEAD}01010100",
     AirPurifierMode.LEVEL_2.name.lower(): f"{COMMAND_HEAD}01010132",
@@ -37,6 +35,9 @@ DEVICE_GET_BASIC_SETTINGS_KEY = "570f4d81"
 class SwitchbotAirPurifier(SwitchbotSequenceDevice, SwitchbotEncryptedDevice):
     """Representation of a Switchbot Air Purifier."""
 
+    _turn_on_command = f"{COMMAND_HEAD}010100"
+    _turn_off_command = f"{COMMAND_HEAD}010000"
+
     def __init__(
         self,
         device: BLEDevice,
@@ -109,18 +110,6 @@ class SwitchbotAirPurifier(SwitchbotSequenceDevice, SwitchbotEncryptedDevice):
         result = await self._send_command(COMMAND_SET_MODE[preset_mode])
         return self._check_command_result(result, 0, {1})
 
-    @update_after_operation
-    async def turn_on(self) -> bool:
-        """Turn on the air purifier."""
-        result = await self._send_command(COMMAND_TURN_ON)
-        return self._check_command_result(result, 0, {1})
-
-    @update_after_operation
-    async def turn_off(self) -> bool:
-        """Turn off the air purifier."""
-        result = await self._send_command(COMMAND_TURN_OFF)
-        return self._check_command_result(result, 0, {1})
-
     def get_current_percentage(self) -> Any:
         """Return cached percentage."""
         return self._get_adv_value("speed")

+ 72 - 16
switchbot/devices/base_light.py

@@ -6,7 +6,7 @@ from typing import Any
 
 from ..helpers import create_background_task
 from ..models import SwitchBotAdvertisement
-from .device import SwitchbotDevice
+from .device import SwitchbotDevice, SwitchbotOperationError, update_after_operation
 
 _LOGGER = logging.getLogger(__name__)
 
@@ -14,14 +14,19 @@ _LOGGER = logging.getLogger(__name__)
 class SwitchbotBaseLight(SwitchbotDevice):
     """Representation of a Switchbot light."""
 
+    _effect_dict: dict[str, list[str]] = {}
+    _set_brightness_command: str = ""
+    _set_color_temp_command: str = ""
+    _set_rgb_command: str = ""
+
     def __init__(self, *args: Any, **kwargs: Any) -> None:
-        """Switchbot bulb constructor."""
+        """Switchbot base light constructor."""
         super().__init__(*args, **kwargs)
         self._state: dict[str, Any] = {}
 
     @property
     def on(self) -> bool | None:
-        """Return if bulb is on."""
+        """Return if light is on."""
         return self.is_on()
 
     @property
@@ -60,7 +65,7 @@ class SwitchbotBaseLight(SwitchbotDevice):
     @property
     def get_effect_list(self) -> list[str] | None:
         """Return the list of supported effects."""
-        return None
+        return list(self._effect_dict.keys()) if self._effect_dict else None
 
     def is_on(self) -> bool | None:
         """Return bulb state from cache."""
@@ -70,25 +75,49 @@ class SwitchbotBaseLight(SwitchbotDevice):
         """Return the current effect."""
         return self._get_adv_value("effect")
 
-    @abstractmethod
-    async def turn_on(self) -> bool:
-        """Turn device on."""
-
-    @abstractmethod
-    async def turn_off(self) -> bool:
-        """Turn device off."""
-
-    @abstractmethod
+    @update_after_operation
     async def set_brightness(self, brightness: int) -> bool:
         """Set brightness."""
+        assert 0 <= brightness <= 100, "Brightness must be between 0 and 100"
+        hex_brightness = f"{brightness:02X}"
+        self._check_function_support(self._set_brightness_command)
+        result = await self._send_command(
+            self._set_brightness_command.format(hex_brightness)
+        )
+        return self._check_command_result(result, 0, {1})
 
-    @abstractmethod
+    @update_after_operation
     async def set_color_temp(self, brightness: int, color_temp: int) -> bool:
         """Set color temp."""
-
-    @abstractmethod
+        assert 0 <= brightness <= 100, "Brightness must be between 0 and 100"
+        assert 2700 <= color_temp <= 6500, "Color Temp must be between 2700 and 6500"
+        hex_data = f"{brightness:02X}{color_temp:04X}"
+        self._check_function_support(self._set_color_temp_command)
+        result = await self._send_command(self._set_color_temp_command.format(hex_data))
+        return self._check_command_result(result, 0, {1})
+
+    @update_after_operation
     async def set_rgb(self, brightness: int, r: int, g: int, b: int) -> bool:
         """Set rgb."""
+        assert 0 <= brightness <= 100, "Brightness must be between 0 and 100"
+        assert 0 <= r <= 255, "r must be between 0 and 255"
+        assert 0 <= g <= 255, "g must be between 0 and 255"
+        assert 0 <= b <= 255, "b must be between 0 and 255"
+        self._check_function_support(self._set_rgb_command)
+        hex_data = f"{brightness:02X}{r:02X}{g:02X}{b:02X}"
+        result = await self._send_command(self._set_rgb_command.format(hex_data))
+        return self._check_command_result(result, 0, {1})
+
+    @update_after_operation
+    async def set_effect(self, effect: str) -> bool:
+        """Set effect."""
+        effect_template = self._effect_dict.get(effect)
+        if not effect_template:
+            raise SwitchbotOperationError(f"Effect {effect} not supported")
+        result = await self._send_multiple_commands(effect_template)
+        if result:
+            self._override_state({"effect": effect})
+        return result
 
     async def _send_multiple_commands(self, keys: list[str]) -> bool:
         """
@@ -103,6 +132,33 @@ class SwitchbotBaseLight(SwitchbotDevice):
             final_result |= self._check_command_result(result, 0, {1})
         return final_result
 
+    async def _get_multi_commands_results(
+        self, commands: list[str]
+    ) -> tuple[bytes, bytes] | None:
+        """Check results after sending multiple commands."""
+        if not (results := await self._get_basic_info_by_multi_commands(commands)):
+            return None
+
+        _version_info, _data = results[0], results[1]
+        _LOGGER.debug(
+            "version info: %s, data: %s, address: %s",
+            _version_info,
+            _data,
+            self._device.address,
+        )
+        return _version_info, _data
+
+    async def _get_basic_info_by_multi_commands(
+        self, commands: list[str]
+    ) -> list[bytes] | None:
+        """Get device basic settings by sending multiple commands."""
+        results = []
+        for command in commands:
+            if not (result := await self._get_basic_info(command)):
+                return None
+            results.append(result)
+        return results
+
 
 class SwitchbotSequenceBaseLight(SwitchbotBaseLight):
     """Representation of a Switchbot light."""

+ 17 - 94
switchbot/devices/bulb.py

@@ -1,36 +1,9 @@
 from __future__ import annotations
 
-import logging
 from typing import Any
 
 from ..const.light import BulbColorMode, ColorMode
 from .base_light import SwitchbotSequenceBaseLight
-from .device import REQ_HEADER, SwitchbotOperationError, update_after_operation
-
-BULB_COMMAND_HEADER = "4701"
-BULB_REQUEST = f"{REQ_HEADER}4801"
-
-BULB_COMMAND = f"{REQ_HEADER}{BULB_COMMAND_HEADER}"
-# Bulb keys
-BULB_ON_KEY = f"{BULB_COMMAND}01"
-BULB_OFF_KEY = f"{BULB_COMMAND}02"
-RGB_BRIGHTNESS_KEY = f"{BULB_COMMAND}12"
-CW_BRIGHTNESS_KEY = f"{BULB_COMMAND}13"
-BRIGHTNESS_KEY = f"{BULB_COMMAND}14"
-RGB_KEY = f"{BULB_COMMAND}16"
-CW_KEY = f"{BULB_COMMAND}17"
-
-DEVICE_GET_VERSION_KEY = "570003"
-DEVICE_GET_BASIC_SETTINGS_KEY = "570f4801"
-
-_LOGGER = logging.getLogger(__name__)
-
-
-EFFECT_DICT = {
-    "Colorful": "570F4701010300",
-    "Flickering": "570F4701010301",
-    "Breathing": "570F4701010302",
-}
 
 # Private mapping from device-specific color modes to original ColorMode enum
 _BULB_COLOR_MODE_MAP = {
@@ -39,11 +12,24 @@ _BULB_COLOR_MODE_MAP = {
     BulbColorMode.DYNAMIC: ColorMode.EFFECT,
     BulbColorMode.UNKNOWN: ColorMode.OFF,
 }
+COLOR_BULB_CONTROL_HEADER = "570F4701"
 
 
 class SwitchbotBulb(SwitchbotSequenceBaseLight):
     """Representation of a Switchbot bulb."""
 
+    _turn_on_command = f"{COLOR_BULB_CONTROL_HEADER}01"
+    _turn_off_command = f"{COLOR_BULB_CONTROL_HEADER}02"
+    _set_rgb_command = f"{COLOR_BULB_CONTROL_HEADER}12{{}}"
+    _set_color_temp_command = f"{COLOR_BULB_CONTROL_HEADER}13{{}}"
+    _set_brightness_command = f"{COLOR_BULB_CONTROL_HEADER}14{{}}"
+    _get_basic_info_command = ["570003", "570f4801"]
+    _effect_dict = {
+        "Colorful": ["570F4701010300"],
+        "Flickering": ["570F4701010301"],
+        "Breathing": ["570F4701010302"],
+    }
+
     @property
     def color_modes(self) -> set[ColorMode]:
         """Return the supported color modes."""
@@ -55,76 +41,13 @@ class SwitchbotBulb(SwitchbotSequenceBaseLight):
         device_mode = BulbColorMode(self._get_adv_value("color_mode") or 10)
         return _BULB_COLOR_MODE_MAP.get(device_mode, ColorMode.OFF)
 
-    @property
-    def get_effect_list(self) -> list[str]:
-        """Return the list of supported effects."""
-        return list(EFFECT_DICT.keys())
-
-    @update_after_operation
-    async def turn_on(self) -> bool:
-        """Turn device on."""
-        result = await self._send_command(BULB_ON_KEY)
-        return self._check_command_result(result, 0, {1})
-
-    @update_after_operation
-    async def turn_off(self) -> bool:
-        """Turn device off."""
-        result = await self._send_command(BULB_OFF_KEY)
-        return self._check_command_result(result, 0, {1})
-
-    @update_after_operation
-    async def set_brightness(self, brightness: int) -> bool:
-        """Set brightness."""
-        assert 0 <= brightness <= 100, "Brightness must be between 0 and 100"
-        result = await self._send_command(f"{BRIGHTNESS_KEY}{brightness:02X}")
-        return self._check_command_result(result, 0, {1})
-
-    @update_after_operation
-    async def set_color_temp(self, brightness: int, color_temp: int) -> bool:
-        """Set color temp."""
-        assert 0 <= brightness <= 100, "Brightness must be between 0 and 100"
-        assert 2700 <= color_temp <= 6500, "Color Temp must be between 2700 and 6500"
-        result = await self._send_command(
-            f"{CW_BRIGHTNESS_KEY}{brightness:02X}{color_temp:04X}"
-        )
-        return self._check_command_result(result, 0, {1})
-
-    @update_after_operation
-    async def set_rgb(self, brightness: int, r: int, g: int, b: int) -> bool:
-        """Set rgb."""
-        assert 0 <= brightness <= 100, "Brightness must be between 0 and 100"
-        assert 0 <= r <= 255, "r must be between 0 and 255"
-        assert 0 <= g <= 255, "g must be between 0 and 255"
-        assert 0 <= b <= 255, "b must be between 0 and 255"
-        result = await self._send_command(
-            f"{RGB_BRIGHTNESS_KEY}{brightness:02X}{r:02X}{g:02X}{b:02X}"
-        )
-        return self._check_command_result(result, 0, {1})
-
-    @update_after_operation
-    async def set_effect(self, effect: str) -> bool:
-        """Set effect."""
-        effect_template = EFFECT_DICT.get(effect)
-        if not effect_template:
-            raise SwitchbotOperationError(f"Effect {effect} not supported")
-        result = await self._send_command(effect_template)
-        if result:
-            self._override_state({"effect": effect})
-        return result
-
     async def get_basic_info(self) -> dict[str, Any] | None:
         """Get device basic settings."""
-        if not (_data := await self._get_basic_info(DEVICE_GET_BASIC_SETTINGS_KEY)):
+        if not (
+            res := await self._get_multi_commands_results(self._get_basic_info_command)
+        ):
             return None
-        if not (_version_info := await self._get_basic_info(DEVICE_GET_VERSION_KEY)):
-            return None
-
-        _LOGGER.debug(
-            "data: %s, version info: %s, address: %s",
-            _data,
-            _version_info,
-            self._device.address,
-        )
+        _version_info, _data = res
 
         self._state["r"] = _data[3]
         self._state["g"] = _data[4]

+ 15 - 51
switchbot/devices/ceiling_light.py

@@ -1,6 +1,5 @@
 from __future__ import annotations
 
-import logging
 from typing import Any
 
 from ..const.light import (
@@ -9,21 +8,7 @@ from ..const.light import (
     ColorMode,
 )
 from .base_light import SwitchbotSequenceBaseLight
-from .device import REQ_HEADER, update_after_operation
-
-CEILING_LIGHT_COMMAND_HEADER = "5401"
-CEILING_LIGHT_REQUEST = f"{REQ_HEADER}5501"
-
-CEILING_LIGHT_COMMAND = f"{REQ_HEADER}{CEILING_LIGHT_COMMAND_HEADER}"
-CEILING_LIGHT_ON_KEY = f"{CEILING_LIGHT_COMMAND}01FF01FFFF"
-CEILING_LIGHT_OFF_KEY = f"{CEILING_LIGHT_COMMAND}02FF01FFFF"
-CW_BRIGHTNESS_KEY = f"{CEILING_LIGHT_COMMAND}010001"
-BRIGHTNESS_KEY = f"{CEILING_LIGHT_COMMAND}01FF01"
-
-DEVICE_GET_VERSION_KEY = "5702"
-DEVICE_GET_BASIC_SETTINGS_KEY = "570f5581"
-
-_LOGGER = logging.getLogger(__name__)
+from .device import update_after_operation
 
 # Private mapping from device-specific color modes to original ColorMode enum
 _CEILING_LIGHT_COLOR_MODE_MAP = {
@@ -32,11 +17,18 @@ _CEILING_LIGHT_COLOR_MODE_MAP = {
     CeilingLightColorMode.MUSIC: ColorMode.EFFECT,
     CeilingLightColorMode.UNKNOWN: ColorMode.OFF,
 }
+CEILING_LIGHT_CONTROL_HEADER = "570F5401"
 
 
 class SwitchbotCeilingLight(SwitchbotSequenceBaseLight):
     """Representation of a Switchbot ceiling light."""
 
+    _turn_on_command = f"{CEILING_LIGHT_CONTROL_HEADER}01FF01FFFF"
+    _turn_off_command = f"{CEILING_LIGHT_CONTROL_HEADER}02FF01FFFF"
+    _set_brightness_command = f"{CEILING_LIGHT_CONTROL_HEADER}01FF01{{}}"
+    _set_color_temp_command = f"{CEILING_LIGHT_CONTROL_HEADER}01FF01{{}}"
+    _get_basic_info_command = ["5702", "570f5581"]
+
     @property
     def color_modes(self) -> set[ColorMode]:
         """Return the supported color modes."""
@@ -48,51 +40,23 @@ class SwitchbotCeilingLight(SwitchbotSequenceBaseLight):
         device_mode = CeilingLightColorMode(self._get_adv_value("color_mode") or 10)
         return _CEILING_LIGHT_COLOR_MODE_MAP.get(device_mode, ColorMode.OFF)
 
-    @update_after_operation
-    async def turn_on(self) -> bool:
-        """Turn device on."""
-        result = await self._send_command(CEILING_LIGHT_ON_KEY)
-        return self._check_command_result(result, 0, {1})
-
-    @update_after_operation
-    async def turn_off(self) -> bool:
-        """Turn device off."""
-        result = await self._send_command(CEILING_LIGHT_OFF_KEY)
-        return self._check_command_result(result, 0, {1})
-
     @update_after_operation
     async def set_brightness(self, brightness: int) -> bool:
         """Set brightness."""
         assert 0 <= brightness <= 100, "Brightness must be between 0 and 100"
+        hex_brightness = f"{brightness:02X}"
         color_temp = self._state.get("cw", DEFAULT_COLOR_TEMP)
-        result = await self._send_command(
-            f"{BRIGHTNESS_KEY}{brightness:02X}{color_temp:04X}"
-        )
-        return self._check_command_result(result, 0, {1})
-
-    @update_after_operation
-    async def set_color_temp(self, brightness: int, color_temp: int) -> bool:
-        """Set color temp."""
-        assert 0 <= brightness <= 100, "Brightness must be between 0 and 100"
-        assert 2700 <= color_temp <= 6500, "Color Temp must be between 2700 and 6500"
-        result = await self._send_command(
-            f"{CW_BRIGHTNESS_KEY}{brightness:02X}{color_temp:04X}"
-        )
+        hex_data = f"{hex_brightness}{color_temp:04X}"
+        result = await self._send_command(self._set_brightness_command.format(hex_data))
         return self._check_command_result(result, 0, {1})
 
     async def get_basic_info(self) -> dict[str, Any] | None:
         """Get device basic settings."""
-        if not (_data := await self._get_basic_info(DEVICE_GET_BASIC_SETTINGS_KEY)):
+        if not (
+            res := await self._get_multi_commands_results(self._get_basic_info_command)
+        ):
             return None
-        if not (_version_info := await self._get_basic_info(DEVICE_GET_VERSION_KEY)):
-            return None
-
-        _LOGGER.debug(
-            "data: %s, version info: %s, address: %s",
-            _data,
-            _version_info,
-            self._device.address,
-        )
+        _version_info, _data = res
 
         self._state["cw"] = int.from_bytes(_data[3:5], "big")
 

+ 25 - 1
switchbot/devices/device.py

@@ -125,6 +125,9 @@ def _handle_timeout(fut: asyncio.Future[None]) -> None:
 class SwitchbotBaseDevice:
     """Base Representation of a Switchbot Device."""
 
+    _turn_on_command: str | None = None
+    _turn_off_command: str | None = None
+
     def __init__(
         self,
         device: BLEDevice,
@@ -694,6 +697,27 @@ class SwitchbotBaseDevice:
         time_since_last_full_update = time.monotonic() - self._last_full_update
         return not time_since_last_full_update < PASSIVE_POLL_INTERVAL
 
+    def _check_function_support(self, cmd: str | None = None) -> None:
+        """Check if the command is supported by the device model."""
+        if not cmd:
+            raise SwitchbotOperationError(
+                f"Current device {self._device.address} does not support this functionality"
+            )
+
+    @update_after_operation
+    async def turn_on(self) -> bool:
+        """Turn device on."""
+        self._check_function_support(self._turn_on_command)
+        result = await self._send_command(self._turn_on_command)
+        return self._check_command_result(result, 0, {1})
+
+    @update_after_operation
+    async def turn_off(self) -> bool:
+        """Turn device off."""
+        self._check_function_support(self._turn_off_command)
+        result = await self._send_command(self._turn_off_command)
+        return self._check_command_result(result, 0, {1})
+
 
 class SwitchbotDevice(SwitchbotBaseDevice):
     """
@@ -735,8 +759,8 @@ class SwitchbotEncryptedDevice(SwitchbotDevice):
         self._encryption_key = bytearray.fromhex(encryption_key)
         self._iv: bytes | None = None
         self._cipher: bytes | None = None
-        self._model = model
         super().__init__(device, None, interface, **kwargs)
+        self._model = model
 
     # Old non-async method preserved for backwards compatibility
     @classmethod

+ 3 - 13
switchbot/devices/evaporative_humidifier.py

@@ -23,7 +23,6 @@ _LOGGER = logging.getLogger(__name__)
 COMMAND_HEADER = "57"
 COMMAND_GET_CK_IV = f"{COMMAND_HEADER}0f2103"
 COMMAND_TURN_ON = f"{COMMAND_HEADER}0f430101"
-COMMAND_TURN_OFF = f"{COMMAND_HEADER}0f430100"
 COMMAND_CHILD_LOCK_ON = f"{COMMAND_HEADER}0f430501"
 COMMAND_CHILD_LOCK_OFF = f"{COMMAND_HEADER}0f430500"
 COMMAND_AUTO_DRY_ON = f"{COMMAND_HEADER}0f430a01"
@@ -48,6 +47,9 @@ DEVICE_GET_BASIC_SETTINGS_KEY = "570f4481"
 class SwitchbotEvaporativeHumidifier(SwitchbotSequenceDevice, SwitchbotEncryptedDevice):
     """Representation of a Switchbot Evaporative Humidifier"""
 
+    _turn_on_command = COMMAND_TURN_ON
+    _turn_off_command = f"{COMMAND_HEADER}0f430100"
+
     def __init__(
         self,
         device: BLEDevice,
@@ -113,18 +115,6 @@ class SwitchbotEvaporativeHumidifier(SwitchbotSequenceDevice, SwitchbotEncrypted
             "target_humidity": target_humidity,
         }
 
-    @update_after_operation
-    async def turn_on(self) -> bool:
-        """Turn device on."""
-        result = await self._send_command(COMMAND_TURN_ON)
-        return self._check_command_result(result, 0, {1})
-
-    @update_after_operation
-    async def turn_off(self) -> bool:
-        """Turn device off."""
-        result = await self._send_command(COMMAND_TURN_OFF)
-        return self._check_command_result(result, 0, {1})
-
     @update_after_operation
     async def set_target_humidity(self, target_humidity: int) -> bool:
         """Set target humidity."""

+ 2 - 14
switchbot/devices/fan.py

@@ -16,8 +16,6 @@ _LOGGER = logging.getLogger(__name__)
 
 
 COMMAND_HEAD = "570f41"
-COMMAND_TURN_ON = f"{COMMAND_HEAD}0101"
-COMMAND_TURN_OFF = f"{COMMAND_HEAD}0102"
 COMMAND_START_OSCILLATION = f"{COMMAND_HEAD}020101ff"
 COMMAND_STOP_OSCILLATION = f"{COMMAND_HEAD}020102ff"
 COMMAND_SET_MODE = {
@@ -33,8 +31,8 @@ COMMAND_GET_BASIC_INFO = "570f428102"
 class SwitchbotFan(SwitchbotSequenceDevice):
     """Representation of a Switchbot Circulator Fan."""
 
-    def __init__(self, device, password=None, interface=0, **kwargs):
-        super().__init__(device, password, interface, **kwargs)
+    _turn_on_command = f"{COMMAND_HEAD}0101"
+    _turn_off_command = f"{COMMAND_HEAD}0102"
 
     async def get_basic_info(self) -> dict[str, Any] | None:
         """Get device basic settings."""
@@ -88,16 +86,6 @@ class SwitchbotFan(SwitchbotSequenceDevice):
             return await self._send_command(COMMAND_START_OSCILLATION)
         return await self._send_command(COMMAND_STOP_OSCILLATION)
 
-    @update_after_operation
-    async def turn_on(self) -> bool:
-        """Turn on the fan."""
-        return await self._send_command(COMMAND_TURN_ON)
-
-    @update_after_operation
-    async def turn_off(self) -> bool:
-        """Turn off the fan."""
-        return await self._send_command(COMMAND_TURN_OFF)
-
     def get_current_percentage(self) -> Any:
         """Return cached percentage."""
         return self._get_adv_value("speed")

+ 25 - 105
switchbot/devices/light_strip.py

@@ -1,6 +1,5 @@
 from __future__ import annotations
 
-import logging
 from typing import Any
 
 from bleak.backends.device import BLEDevice
@@ -8,29 +7,19 @@ from bleak.backends.device import BLEDevice
 from ..const import SwitchbotModel
 from ..const.light import ColorMode, StripLightColorMode
 from .base_light import SwitchbotSequenceBaseLight
-from .device import (
-    REQ_HEADER,
-    SwitchbotEncryptedDevice,
-    SwitchbotOperationError,
-    update_after_operation,
-)
+from .device import SwitchbotEncryptedDevice
 
-STRIP_COMMMAND_HEADER = "4901"
-STRIP_REQUEST = f"{REQ_HEADER}4A01"
-
-STRIP_COMMAND = f"{REQ_HEADER}{STRIP_COMMMAND_HEADER}"
-# Strip keys
-STRIP_ON_KEY = f"{STRIP_COMMAND}01"
-STRIP_OFF_KEY = f"{STRIP_COMMAND}02"
-COLOR_TEMP_KEY = f"{STRIP_COMMAND}11"
-RGB_BRIGHTNESS_KEY = f"{STRIP_COMMAND}12"
-BRIGHTNESS_KEY = f"{STRIP_COMMAND}14"
-
-DEVICE_GET_VERSION_KEY = "570003"
-
-_LOGGER = logging.getLogger(__name__)
-
-EFFECT_DICT = {
+# Private mapping from device-specific color modes to original ColorMode enum
+_STRIP_LIGHT_COLOR_MODE_MAP = {
+    StripLightColorMode.RGB: ColorMode.RGB,
+    StripLightColorMode.SCENE: ColorMode.EFFECT,
+    StripLightColorMode.MUSIC: ColorMode.EFFECT,
+    StripLightColorMode.CONTROLLER: ColorMode.EFFECT,
+    StripLightColorMode.COLOR_TEMP: ColorMode.COLOR_TEMP,
+    StripLightColorMode.UNKNOWN: ColorMode.OFF,
+}
+LIGHT_STRIP_CONTROL_HEADER = "570F4901"
+COMMON_EFFECTS = {
     "Christmas": [
         "570F49070200033C01",
         "570F490701000600009902006D0EFF0021",
@@ -109,20 +98,18 @@ EFFECT_DICT = {
     ],
 }
 
-# Private mapping from device-specific color modes to original ColorMode enum
-_STRIP_LIGHT_COLOR_MODE_MAP = {
-    StripLightColorMode.RGB: ColorMode.RGB,
-    StripLightColorMode.SCENE: ColorMode.EFFECT,
-    StripLightColorMode.MUSIC: ColorMode.EFFECT,
-    StripLightColorMode.CONTROLLER: ColorMode.EFFECT,
-    StripLightColorMode.COLOR_TEMP: ColorMode.COLOR_TEMP,
-    StripLightColorMode.UNKNOWN: ColorMode.OFF,
-}
-
 
 class SwitchbotLightStrip(SwitchbotSequenceBaseLight):
     """Representation of a Switchbot light strip."""
 
+    _effect_dict = COMMON_EFFECTS
+    _turn_on_command = f"{LIGHT_STRIP_CONTROL_HEADER}01"
+    _turn_off_command = f"{LIGHT_STRIP_CONTROL_HEADER}02"
+    _set_rgb_command = f"{LIGHT_STRIP_CONTROL_HEADER}12{{}}"
+    _set_color_temp_command = f"{LIGHT_STRIP_CONTROL_HEADER}11{{}}"
+    _set_brightness_command = f"{LIGHT_STRIP_CONTROL_HEADER}14{{}}"
+    _get_basic_info_command = ["570003", "570f4A01"]
+
     @property
     def color_modes(self) -> set[ColorMode]:
         """Return the supported color modes."""
@@ -134,71 +121,14 @@ class SwitchbotLightStrip(SwitchbotSequenceBaseLight):
         device_mode = StripLightColorMode(self._get_adv_value("color_mode") or 10)
         return _STRIP_LIGHT_COLOR_MODE_MAP.get(device_mode, ColorMode.OFF)
 
-    @property
-    def get_effect_list(self) -> list[str]:
-        """Return the list of supported effects."""
-        return list(EFFECT_DICT.keys())
-
-    @update_after_operation
-    async def turn_on(self) -> bool:
-        """Turn device on."""
-        result = await self._send_command(STRIP_ON_KEY)
-        return self._check_command_result(result, 0, {1})
-
-    @update_after_operation
-    async def turn_off(self) -> bool:
-        """Turn device off."""
-        result = await self._send_command(STRIP_OFF_KEY)
-        return self._check_command_result(result, 0, {1})
-
-    @update_after_operation
-    async def set_brightness(self, brightness: int) -> bool:
-        """Set brightness."""
-        assert 0 <= brightness <= 100, "Brightness must be between 0 and 100"
-        result = await self._send_command(f"{BRIGHTNESS_KEY}{brightness:02X}")
-        return self._check_command_result(result, 0, {1})
-
-    @update_after_operation
-    async def set_rgb(self, brightness: int, r: int, g: int, b: int) -> bool:
-        """Set rgb."""
-        assert 0 <= brightness <= 100, "Brightness must be between 0 and 100"
-        assert 0 <= r <= 255, "r must be between 0 and 255"
-        assert 0 <= g <= 255, "g must be between 0 and 255"
-        assert 0 <= b <= 255, "b must be between 0 and 255"
-        result = await self._send_command(
-            f"{RGB_BRIGHTNESS_KEY}{brightness:02X}{r:02X}{g:02X}{b:02X}"
-        )
-        return self._check_command_result(result, 0, {1})
-
-    @update_after_operation
-    async def set_effect(self, effect: str) -> bool:
-        """Set effect."""
-        effect_template = EFFECT_DICT.get(effect)
-        if not effect_template:
-            raise SwitchbotOperationError(f"Effect {effect} not supported")
-        result = await self._send_multiple_commands(effect_template)
-        if result:
-            self._override_state({"effect": effect})
-        return result
-
-    async def get_basic_info(
-        self,
-        device_get_basic_info: str = STRIP_REQUEST,
-        device_get_version_info: str = DEVICE_GET_VERSION_KEY,
-    ) -> dict[str, Any] | None:
+    async def get_basic_info(self) -> dict[str, Any] | None:
         """Get device basic settings."""
-        if not (_data := await self._get_basic_info(device_get_basic_info)):
-            return None
-        if not (_version_info := await self._get_basic_info(device_get_version_info)):
+        if not (
+            res := await self._get_multi_commands_results(self._get_basic_info_command)
+        ):
             return None
 
-        _LOGGER.debug(
-            "data: %s, version info: %s, address: %s",
-            _data,
-            _version_info,
-            self._device.address,
-        )
-
+        _version_info, _data = res
         self._state["r"] = _data[3]
         self._state["g"] = _data[4]
         self._state["b"] = _data[5]
@@ -247,13 +177,3 @@ class SwitchbotStripLight3(SwitchbotEncryptedDevice, SwitchbotLightStrip):
     def color_modes(self) -> set[ColorMode]:
         """Return the supported color modes."""
         return {ColorMode.RGB, ColorMode.COLOR_TEMP}
-
-    @update_after_operation
-    async def set_color_temp(self, brightness: int, color_temp: int) -> bool:
-        """Set color temp."""
-        assert 0 <= brightness <= 100
-        assert self.min_temp <= color_temp <= self.max_temp
-        result = await self._send_command(
-            f"{COLOR_TEMP_KEY}{brightness:02X}{color_temp:04X}"
-        )
-        return self._check_command_result(result, 0, {1})

+ 5 - 15
switchbot/devices/relay_switch.py

@@ -21,9 +21,8 @@ SWITCH2_ON_MASK = 0b01000000
 DOOR_OPEN_MASK = 0b00100000
 
 COMMAND_HEADER = "57"
-COMMAND_TURN_OFF = f"{COMMAND_HEADER}0f70010000"
-COMMAND_TURN_ON = f"{COMMAND_HEADER}0f70010100"
-COMMAND_TOGGLE = f"{COMMAND_HEADER}0f70010200"
+COMMAND_CONTROL = "570f70"
+COMMAND_TOGGLE = f"{COMMAND_CONTROL}010200"
 COMMAND_GET_VOLTAGE_AND_CURRENT = f"{COMMAND_HEADER}0f7106000000"
 
 COMMAND_GET_BASIC_INFO = f"{COMMAND_HEADER}0f7181"
@@ -60,6 +59,9 @@ MULTI_CHANNEL_COMMANDS_GET_VOLTAGE_AND_CURRENT = {
 class SwitchbotRelaySwitch(SwitchbotSequenceDevice, SwitchbotEncryptedDevice):
     """Representation of a Switchbot relay switch 1pm."""
 
+    _turn_on_command = f"{COMMAND_CONTROL}010100"
+    _turn_off_command = f"{COMMAND_CONTROL}010000"
+
     def __init__(
         self,
         device: BLEDevice,
@@ -192,18 +194,6 @@ class SwitchbotRelaySwitch(SwitchbotSequenceDevice, SwitchbotEncryptedDevice):
             return common_data | garage_door_opener_data
         return common_data | user_data
 
-    @update_after_operation
-    async def turn_on(self) -> bool:
-        """Turn device on."""
-        result = await self._send_command(COMMAND_TURN_ON)
-        return self._check_command_result(result, 0, {1})
-
-    @update_after_operation
-    async def turn_off(self) -> bool:
-        """Turn device off."""
-        result = await self._send_command(COMMAND_TURN_OFF)
-        return self._check_command_result(result, 0, {1})
-
     @update_after_operation
     async def async_toggle(self, **kwargs) -> bool:
         """Toggle device."""

+ 0 - 3
switchbot/devices/vacuum.py

@@ -19,9 +19,6 @@ COMMAND_RETURN_DOCK = {
 class SwitchbotVacuum(SwitchbotSequenceDevice):
     """Representation of a Switchbot Vacuum."""
 
-    def __init__(self, device, password=None, interface=0, **kwargs):
-        super().__init__(device, password, interface, **kwargs)
-
     @update_after_operation
     async def clean_up(self, protocol_version: int) -> bool:
         """Send command to perform a spot clean-up."""

+ 17 - 13
tests/test_bulb.py

@@ -11,9 +11,11 @@ from switchbot.devices.device import SwitchbotOperationError
 from .test_adv_parser import generate_ble_device
 
 
-def create_device_for_command_testing(init_data: dict | None = None):
+def create_device_for_command_testing(
+    init_data: dict | None = None, model: SwitchbotModel = SwitchbotModel.COLOR_BULB
+):
     ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
-    device = bulb.SwitchbotBulb(ble_device)
+    device = bulb.SwitchbotBulb(ble_device, model=model)
     device.update_from_advertisement(make_advertisement_data(ble_device, init_data))
     device._send_command = AsyncMock()
     device._check_command_result = MagicMock()
@@ -70,7 +72,7 @@ async def test_default_info():
     assert device.brightness == 1
     assert device.min_temp == 2700
     assert device.max_temp == 6500
-    assert device.get_effect_list == list(bulb.EFFECT_DICT.keys())
+    assert device.get_effect_list == list(device._effect_dict.keys())
 
 
 @pytest.mark.asyncio
@@ -81,9 +83,9 @@ async def test_get_basic_info_returns_none(basic_info, version_info):
     device = create_device_for_command_testing()
 
     async def mock_get_basic_info(arg):
-        if arg == bulb.DEVICE_GET_BASIC_SETTINGS_KEY:
+        if arg == device._get_basic_info_command[1]:
             return basic_info
-        if arg == bulb.DEVICE_GET_VERSION_KEY:
+        if arg == device._get_basic_info_command[0]:
             return version_info
         return None
 
@@ -124,9 +126,9 @@ async def test_get_basic_info(info_data, result):
     device = create_device_for_command_testing()
 
     async def mock_get_basic_info(args: str) -> list[int] | None:
-        if args == bulb.DEVICE_GET_BASIC_SETTINGS_KEY:
+        if args == device._get_basic_info_command[1]:
             return info_data["basic_info"]
-        if args == bulb.DEVICE_GET_VERSION_KEY:
+        if args == device._get_basic_info_command[0]:
             return info_data["version_info"]
         return None
 
@@ -150,7 +152,9 @@ async def test_set_color_temp():
 
     await device.set_color_temp(50, 3000)
 
-    device._send_command.assert_called_with(f"{bulb.CW_BRIGHTNESS_KEY}320BB8")
+    device._send_command.assert_called_with(
+        device._set_color_temp_command.format("320BB8")
+    )
 
 
 @pytest.mark.asyncio
@@ -160,7 +164,7 @@ async def test_turn_on():
 
     await device.turn_on()
 
-    device._send_command.assert_called_with(bulb.BULB_ON_KEY)
+    device._send_command.assert_called_with(device._turn_on_command)
 
     assert device.is_on() is True
 
@@ -172,7 +176,7 @@ async def test_turn_off():
 
     await device.turn_off()
 
-    device._send_command.assert_called_with(bulb.BULB_OFF_KEY)
+    device._send_command.assert_called_with(device._turn_off_command)
 
     assert device.is_on() is False
 
@@ -184,7 +188,7 @@ async def test_set_brightness():
 
     await device.set_brightness(75)
 
-    device._send_command.assert_called_with(f"{bulb.BRIGHTNESS_KEY}4B")
+    device._send_command.assert_called_with(device._set_brightness_command.format("4B"))
 
 
 @pytest.mark.asyncio
@@ -194,7 +198,7 @@ async def test_set_rgb():
 
     await device.set_rgb(100, 255, 128, 64)
 
-    device._send_command.assert_called_with(f"{bulb.RGB_BRIGHTNESS_KEY}64FF8040")
+    device._send_command.assert_called_with(device._set_rgb_command.format("64FF8040"))
 
 
 @pytest.mark.asyncio
@@ -215,6 +219,6 @@ async def test_set_effect_with_valid_effect():
 
     await device.set_effect("Colorful")
 
-    device._send_command.assert_called_with(bulb.EFFECT_DICT["Colorful"])
+    device._send_command.assert_called_with(device._effect_dict["Colorful"][0])
 
     assert device.get_effect() == "Colorful"

+ 16 - 10
tests/test_ceiling_light.py

@@ -10,9 +10,11 @@ from switchbot.devices import ceiling_light
 from .test_adv_parser import generate_ble_device
 
 
-def create_device_for_command_testing(init_data: dict | None = None):
+def create_device_for_command_testing(
+    init_data: dict | None = None, model: SwitchbotModel = SwitchbotModel.CEILING_LIGHT
+):
     ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
-    device = ceiling_light.SwitchbotCeilingLight(ble_device)
+    device = ceiling_light.SwitchbotCeilingLight(ble_device, model=model)
     device.update_from_advertisement(make_advertisement_data(ble_device, init_data))
     device._send_command = AsyncMock()
     device._check_command_result = MagicMock()
@@ -76,9 +78,9 @@ async def test_get_basic_info_returns_none(basic_info, version_info):
     device = create_device_for_command_testing()
 
     async def mock_get_basic_info(arg):
-        if arg == ceiling_light.DEVICE_GET_BASIC_SETTINGS_KEY:
+        if arg == device._get_basic_info_command[1]:
             return basic_info
-        if arg == ceiling_light.DEVICE_GET_VERSION_KEY:
+        if arg == device._get_basic_info_command[0]:
             return version_info
         return None
 
@@ -119,9 +121,9 @@ async def test_get_basic_info(info_data, result):
     device = create_device_for_command_testing()
 
     async def mock_get_basic_info(args: str) -> list[int] | None:
-        if args == ceiling_light.DEVICE_GET_BASIC_SETTINGS_KEY:
+        if args == device._get_basic_info_command[1]:
             return info_data["basic_info"]
-        if args == ceiling_light.DEVICE_GET_VERSION_KEY:
+        if args == device._get_basic_info_command[0]:
             return info_data["version_info"]
         return None
 
@@ -142,7 +144,9 @@ async def test_set_color_temp():
 
     await device.set_color_temp(50, 3000)
 
-    device._send_command.assert_called_with(f"{ceiling_light.CW_BRIGHTNESS_KEY}320BB8")
+    device._send_command.assert_called_with(
+        device._set_color_temp_command.format("320BB8")
+    )
 
 
 @pytest.mark.asyncio
@@ -152,7 +156,7 @@ async def test_turn_on():
 
     await device.turn_on()
 
-    device._send_command.assert_called_with(ceiling_light.CEILING_LIGHT_ON_KEY)
+    device._send_command.assert_called_with(device._turn_on_command)
 
     assert device.is_on() is True
 
@@ -164,7 +168,7 @@ async def test_turn_off():
 
     await device.turn_off()
 
-    device._send_command.assert_called_with(ceiling_light.CEILING_LIGHT_OFF_KEY)
+    device._send_command.assert_called_with(device._turn_off_command)
 
     assert device.is_on() is False
 
@@ -176,4 +180,6 @@ async def test_set_brightness():
 
     await device.set_brightness(75)
 
-    device._send_command.assert_called_with(f"{ceiling_light.BRIGHTNESS_KEY}4B0FA1")
+    device._send_command.assert_called_with(
+        device._set_brightness_command.format("4B0FA1")
+    )

+ 8 - 5
tests/test_fan.py

@@ -1,4 +1,4 @@
-from unittest.mock import AsyncMock
+from unittest.mock import AsyncMock, MagicMock
 
 import pytest
 from bleak.backends.device import BLEDevice
@@ -10,11 +10,14 @@ from switchbot.devices import fan
 from .test_adv_parser import generate_ble_device
 
 
-def create_device_for_command_testing(init_data: dict | None = None):
+def create_device_for_command_testing(
+    init_data: dict | None = None, model: SwitchbotModel = SwitchbotModel.CIRCULATOR_FAN
+):
     ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
-    fan_device = fan.SwitchbotFan(ble_device)
+    fan_device = fan.SwitchbotFan(ble_device, model=model)
     fan_device.update_from_advertisement(make_advertisement_data(ble_device, init_data))
     fan_device._send_command = AsyncMock()
+    fan_device._check_command_result = MagicMock()
     fan_device.update = AsyncMock()
     return fan_device
 
@@ -128,7 +131,7 @@ async def test_set_preset_mode():
 
 
 @pytest.mark.asyncio
-async def test_set_set_percentage_with_speed_is_0():
+async def test_set_percentage_with_speed_is_0():
     fan_device = create_device_for_command_testing({"speed": 0, "isOn": False})
     await fan_device.turn_off()
     assert fan_device.get_current_percentage() == 0
@@ -136,7 +139,7 @@ async def test_set_set_percentage_with_speed_is_0():
 
 
 @pytest.mark.asyncio
-async def test_set_set_percentage():
+async def test_set_percentage():
     fan_device = create_device_for_command_testing({"speed": 80})
     await fan_device.set_percentage(80)
     assert fan_device.get_current_percentage() == 80

+ 2 - 2
tests/test_relay_switch.py

@@ -326,7 +326,7 @@ async def test_turn_on(rawAdvData, model):
     """Test turn on command."""
     device = create_device_for_command_testing(rawAdvData, model)
     await device.turn_on()
-    device._send_command.assert_awaited_once_with(relay_switch.COMMAND_TURN_ON)
+    device._send_command.assert_awaited_once_with(device._turn_on_command)
     assert device.is_on() is True
 
 
@@ -339,7 +339,7 @@ async def test_turn_off(rawAdvData, model):
     """Test turn off command."""
     device = create_device_for_command_testing(rawAdvData, model, {"isOn": False})
     await device.turn_off()
-    device._send_command.assert_awaited_once_with(relay_switch.COMMAND_TURN_OFF)
+    device._send_command.assert_awaited_once_with(device._turn_off_command)
     assert device.is_on() is False
 
 

+ 33 - 15
tests/test_strip_light.py

@@ -12,10 +12,12 @@ from switchbot.devices.device import SwitchbotEncryptedDevice, SwitchbotOperatio
 from .test_adv_parser import generate_ble_device
 
 
-def create_device_for_command_testing(init_data: dict | None = None):
+def create_device_for_command_testing(
+    init_data: dict | None = None, model: SwitchbotModel = SwitchbotModel.STRIP_LIGHT_3
+):
     ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
     device = light_strip.SwitchbotStripLight3(
-        ble_device, "ff", "ffffffffffffffffffffffffffffffff"
+        ble_device, "ff", "ffffffffffffffffffffffffffffffff", model=model
     )
     device.update_from_advertisement(make_advertisement_data(ble_device, init_data))
     device._send_command = AsyncMock()
@@ -75,7 +77,7 @@ async def test_default_info():
     assert device.brightness == 30
     assert device.min_temp == 2700
     assert device.max_temp == 6500
-    assert device.get_effect_list == list(light_strip.EFFECT_DICT.keys())
+    assert device.get_effect_list == list(device._effect_dict.keys())
 
 
 @pytest.mark.asyncio
@@ -86,9 +88,9 @@ async def test_get_basic_info_returns_none(basic_info, version_info):
     device = create_device_for_command_testing()
 
     async def mock_get_basic_info(arg):
-        if arg == light_strip.STRIP_REQUEST:
+        if arg == device._get_basic_info_command[1]:
             return basic_info
-        if arg == light_strip.DEVICE_GET_VERSION_KEY:
+        if arg == device._get_basic_info_command[0]:
             return version_info
         return None
 
@@ -129,9 +131,9 @@ async def test_strip_light_get_basic_info(info_data, result):
     device = create_device_for_command_testing()
 
     async def mock_get_basic_info(args: str) -> list[int] | None:
-        if args == light_strip.STRIP_REQUEST:
+        if args == device._get_basic_info_command[1]:
             return info_data["basic_info"]
-        if args == light_strip.DEVICE_GET_VERSION_KEY:
+        if args == device._get_basic_info_command[0]:
             return info_data["version_info"]
         return None
 
@@ -155,7 +157,9 @@ async def test_set_color_temp():
 
     await device.set_color_temp(50, 3000)
 
-    device._send_command.assert_called_with(f"{light_strip.COLOR_TEMP_KEY}320BB8")
+    device._send_command.assert_called_with(
+        device._set_color_temp_command.format("320BB8")
+    )
 
 
 @pytest.mark.asyncio
@@ -165,7 +169,7 @@ async def test_turn_on():
 
     await device.turn_on()
 
-    device._send_command.assert_called_with(light_strip.STRIP_ON_KEY)
+    device._send_command.assert_called_with(device._turn_on_command)
 
     assert device.is_on() is True
 
@@ -177,7 +181,7 @@ async def test_turn_off():
 
     await device.turn_off()
 
-    device._send_command.assert_called_with(light_strip.STRIP_OFF_KEY)
+    device._send_command.assert_called_with(device._turn_off_command)
 
     assert device.is_on() is False
 
@@ -189,7 +193,7 @@ async def test_set_brightness():
 
     await device.set_brightness(75)
 
-    device._send_command.assert_called_with(f"{light_strip.BRIGHTNESS_KEY}4B")
+    device._send_command.assert_called_with(device._set_brightness_command.format("4B"))
 
 
 @pytest.mark.asyncio
@@ -199,7 +203,7 @@ async def test_set_rgb():
 
     await device.set_rgb(100, 255, 128, 64)
 
-    device._send_command.assert_called_with(f"{light_strip.RGB_BRIGHTNESS_KEY}64FF8040")
+    device._send_command.assert_called_with(device._set_rgb_command.format("64FF8040"))
 
 
 @pytest.mark.asyncio
@@ -221,9 +225,7 @@ async def test_set_effect_with_valid_effect():
 
     await device.set_effect("Christmas")
 
-    device._send_multiple_commands.assert_called_with(
-        light_strip.EFFECT_DICT["Christmas"]
-    )
+    device._send_multiple_commands.assert_called_with(device._effect_dict["Christmas"])
 
     assert device.get_effect() == "Christmas"
 
@@ -300,3 +302,19 @@ async def test_unimplemented_color_mode():
 
     with pytest.raises(NotImplementedError):
         _ = device.color_mode
+
+
+@pytest.mark.asyncio
+async def test_exception_with_wrong_model():
+    class TestDevice(SwitchbotBaseLight):
+        def __init__(self, device: BLEDevice, model: str = "unknown") -> None:
+            super().__init__(device, model=model)
+
+    ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
+    device = TestDevice(ble_device)
+
+    with pytest.raises(
+        SwitchbotOperationError,
+        match="Current device aa:bb:cc:dd:ee:ff does not support this functionality",
+    ):
+        await device.set_rgb(100, 255, 128, 64)