J. Nick Koston 1 năm trước cách đây
mục cha
commit
1f0846d5c6

+ 9 - 19
switchbot/devices/bot.py

@@ -32,7 +32,7 @@ class Switchbot(SwitchbotDevice):
 
     async def turn_on(self) -> bool:
         """Turn device on."""
-        result = await self._sendcommand(ON_KEY, self._retry_count)
+        result = await self._sendcommand(ON_KEY)
 
         if result[0] == 1:
             return True
@@ -47,7 +47,7 @@ class Switchbot(SwitchbotDevice):
 
     async def turn_off(self) -> bool:
         """Turn device off."""
-        result = await self._sendcommand(OFF_KEY, self._retry_count)
+        result = await self._sendcommand(OFF_KEY)
         if result[0] == 1:
             return True
 
@@ -61,7 +61,7 @@ class Switchbot(SwitchbotDevice):
 
     async def hand_up(self) -> bool:
         """Raise device arm."""
-        result = await self._sendcommand(UP_KEY, self._retry_count)
+        result = await self._sendcommand(UP_KEY)
         if result[0] == 1:
             return True
 
@@ -73,7 +73,7 @@ class Switchbot(SwitchbotDevice):
 
     async def hand_down(self) -> bool:
         """Lower device arm."""
-        result = await self._sendcommand(DOWN_KEY, self._retry_count)
+        result = await self._sendcommand(DOWN_KEY)
         if result[0] == 1:
             return True
 
@@ -85,7 +85,7 @@ class Switchbot(SwitchbotDevice):
 
     async def press(self) -> bool:
         """Press command to device."""
-        result = await self._sendcommand(PRESS_KEY, self._retry_count)
+        result = await self._sendcommand(PRESS_KEY)
         if result[0] == 1:
             return True
 
@@ -102,27 +102,17 @@ class Switchbot(SwitchbotDevice):
         mode_key = format(switch_mode, "b") + format(inverse, "b")
         strength_key = f"{strength:0{2}x}"  # to hex with padding to double digit
 
-        result = await self._sendcommand(
-            DEVICE_SET_MODE_KEY + strength_key + mode_key, self._retry_count
-        )
+        result = await self._sendcommand(DEVICE_SET_MODE_KEY + strength_key + mode_key)
 
-        if result[0] == 1:
-            return True
-
-        return False
+        return result[0] == 1
 
     async def set_long_press(self, duration: int = 0) -> bool:
         """Set bot long press duration."""
         duration_key = f"{duration:0{2}x}"  # to hex with padding to double digit
 
-        result = await self._sendcommand(
-            DEVICE_SET_EXTENDED_KEY + "08" + duration_key, self._retry_count
-        )
+        result = await self._sendcommand(DEVICE_SET_EXTENDED_KEY + "08" + duration_key)
 
-        if result[0] == 1:
-            return True
-
-        return False
+        return result[0] == 1
 
     async def get_basic_info(self) -> dict[str, Any] | None:
         """Get device basic settings."""

+ 48 - 7
switchbot/devices/bulb.py

@@ -4,31 +4,72 @@ from typing import Any
 
 from .device import SwitchbotDevice
 
-# Plug Mini keys
-PLUG_ON_KEY = "570f50010180"
-PLUG_OFF_KEY = "570f50010100"
+REQ_HEADER = "570f"
+BULB_COMMMAND_HEADER = "4701"
+BULB_REQUEST = f"{REQ_HEADER}4801"
+
+BULB_COMMAND = f"{REQ_HEADER}{BULB_COMMMAND_HEADER}"
+# Bulb keys
+BULB_ON_KEY = f"{BULB_COMMAND}01"
+BULB_OFF_KEY = f"{BULB_COMMAND}02"
+RGB_BRIGHTNESS_KEY = f"{BULB_COMMAND}12"
+CW_BRIGHTNESS_KEY = f"{BULB_COMMAND}13"
+BRIGHTNESS_KEY = f"{BULB_COMMAND}14"
+RGB_KEY = f"{BULB_COMMAND}16"
+CW_KEY = f"{BULB_COMMAND}17"
 
 
 class SwitchbotBulb(SwitchbotDevice):
     """Representation of a Switchbot bulb."""
 
     def __init__(self, *args: Any, **kwargs: Any) -> None:
-        """Switchbot plug mini constructor."""
+        """Switchbot bulb constructor."""
         super().__init__(*args, **kwargs)
         self._settings: dict[str, Any] = {}
 
     async def update(self, interface: int | None = None) -> None:
         """Update state of device."""
-        await self.get_device_data(retry=self._retry_count, interface=interface)
+        result = await self._sendcommand(BULB_REQUEST)
 
     async def turn_on(self) -> bool:
         """Turn device on."""
-        result = await self._sendcommand(PLUG_ON_KEY, self._retry_count)
+        result = await self._sendcommand(BULB_ON_KEY)
+        return result[1] == 0x80
+
+    async def turn_off(self) -> bool:
+        """Turn device off."""
+        result = await self._sendcommand(BULB_OFF_KEY)
+        return result[1] == 0x00
+
+    async def set_brightness(self, brightness: int) -> bool:
+        """Set brightness."""
+        assert 0 <= brightness <= 100, "Brightness must be between 0 and 100"
+        result = await self._sendcommand(f"{BRIGHTNESS_KEY}{brightness:02X}")
+        return result[1] == 0x80
+
+    async def set_color_temp(self, brightness: int, color_temp: int) -> bool:
+        """Set color temp."""
+        assert 0 <= brightness <= 100, "Brightness must be between 0 and 100"
+        assert 2700 <= color_temp <= 6500, "Color Temp must be between 0 and 100"
+        result = await self._sendcommand(
+            f"{CW_BRIGHTNESS_KEY}{brightness:02X}{color_temp:04X}"
+        )
+        return result[1] == 0x80
+
+    async def set_rgb(self, brightness: int, r: int, g: int, b: int) -> bool:
+        """Set rgb."""
+        assert 0 <= brightness <= 100, "Brightness must be between 0 and 100"
+        assert 0 <= r <= 255, "r must be between 0 and 255"
+        assert 0 <= g <= 255, "g must be between 0 and 255"
+        assert 0 <= b <= 255, "b must be between 0 and 255"
+        result = await self._sendcommand(
+            f"{RGB_BRIGHTNESS_KEY}{brightness:02X}{r:02X}{g:02X}{b:02X}"
+        )
         return result[1] == 0x80
 
     async def turn_off(self) -> bool:
         """Turn device off."""
-        result = await self._sendcommand(PLUG_OFF_KEY, self._retry_count)
+        result = await self._sendcommand(BULB_OFF_KEY)
         return result[1] == 0x00
 
     def is_on(self) -> bool | None:

+ 10 - 26
switchbot/devices/curtain.py

@@ -41,37 +41,25 @@ class SwitchbotCurtain(SwitchbotDevice):
 
     async def open(self) -> bool:
         """Send open command."""
-        result = await self._sendcommand(OPEN_KEY, self._retry_count)
-        if result[0] == 1:
-            return True
-
-        return False
+        result = await self._sendcommand(OPEN_KEY)
+        return result[0] == 1
 
     async def close(self) -> bool:
         """Send close command."""
-        result = await self._sendcommand(CLOSE_KEY, self._retry_count)
-        if result[0] == 1:
-            return True
-
-        return False
+        result = await self._sendcommand(CLOSE_KEY)
+        return result[0] == 1
 
     async def stop(self) -> bool:
         """Send stop command to device."""
-        result = await self._sendcommand(STOP_KEY, self._retry_count)
-        if result[0] == 1:
-            return True
-
-        return False
+        result = await self._sendcommand(STOP_KEY)
+        return result[0] == 1
 
     async def set_position(self, position: int) -> bool:
         """Send position command (0-100) to device."""
         position = (100 - position) if self._reverse else position
         hex_position = "%0.2X" % position
-        result = await self._sendcommand(POSITION_KEY + hex_position, self._retry_count)
-        if result[0] == 1:
-            return True
-
-        return False
+        result = await self._sendcommand(POSITION_KEY + hex_position)
+        return result[0] == 1
 
     async def update(self, interface: int | None = None) -> None:
         """Update position, battery percent and light level of device."""
@@ -107,9 +95,7 @@ class SwitchbotCurtain(SwitchbotDevice):
 
     async def get_extended_info_summary(self) -> dict[str, Any] | None:
         """Get basic info for all devices in chain."""
-        _data = await self._sendcommand(
-            key=CURTAIN_EXT_SUM_KEY, retry=self._retry_count
-        )
+        _data = await self._sendcommand(key=CURTAIN_EXT_SUM_KEY)
 
         if _data in (b"\x07", b"\x00"):
             _LOGGER.error("%s: Unsuccessful, please try again", self.name)
@@ -140,9 +126,7 @@ class SwitchbotCurtain(SwitchbotDevice):
     async def get_extended_info_adv(self) -> dict[str, Any] | None:
         """Get advance page info for device chain."""
 
-        _data = await self._sendcommand(
-            key=CURTAIN_EXT_ADV_KEY, retry=self._retry_count
-        )
+        _data = await self._sendcommand(key=CURTAIN_EXT_ADV_KEY)
 
         if _data in (b"\x07", b"\x00"):
             _LOGGER.error("%s: Unsuccessful, please try again", self.name)

+ 8 - 4
switchbot/devices/device.py

@@ -8,11 +8,10 @@ from typing import Any
 from uuid import UUID
 
 import async_timeout
-
 from bleak import BleakError
-from bleak.exc import BleakDBusError
 from bleak.backends.device import BLEDevice
 from bleak.backends.service import BleakGATTCharacteristic, BleakGATTServiceCollection
+from bleak.exc import BleakDBusError
 from bleak_retry_connector import (
     BleakClientWithServiceCache,
     BleakNotFoundError,
@@ -101,8 +100,10 @@ class SwitchbotDevice:
         key_suffix = key[4:]
         return KEY_PASSWORD_PREFIX + key_action + self._password_encoded + key_suffix
 
-    async def _sendcommand(self, key: str, retry: int) -> bytes:
+    async def _sendcommand(self, key: str, retry: int | None = None) -> bytes:
         """Send command to device and read response."""
+        if retry is None:
+            retry = self._retry_count
         command = bytearray.fromhex(self._commandkey(key))
         _LOGGER.debug("%s: Sending command %s", self.name, command)
         if self._operation_lock.locked():
@@ -344,9 +345,12 @@ class SwitchbotDevice:
         self._device = advertisement.device
 
     async def get_device_data(
-        self, retry: int = DEFAULT_RETRY_COUNT, interface: int | None = None
+        self, retry: int | None = None, interface: int | None = None
     ) -> SwitchBotAdvertisement | None:
         """Find switchbot devices and their advertisement data."""
+        if retry is None:
+            retry = self._retry_count
+
         if interface:
             _interface: int = interface
         else:

+ 2 - 2
switchbot/devices/plug.py

@@ -19,12 +19,12 @@ class SwitchbotPlugMini(SwitchbotDevice):
 
     async def turn_on(self) -> bool:
         """Turn device on."""
-        result = await self._sendcommand(PLUG_ON_KEY, self._retry_count)
+        result = await self._sendcommand(PLUG_ON_KEY)
         return result[1] == 0x80
 
     async def turn_off(self) -> bool:
         """Turn device off."""
-        result = await self._sendcommand(PLUG_OFF_KEY, self._retry_count)
+        result = await self._sendcommand(PLUG_OFF_KEY)
         return result[1] == 0x00
 
     def is_on(self) -> bool | None: