Browse Source

Add support for the bulbs (#74)

J. Nick Koston 1 year ago
parent
commit
6314604b92

+ 3 - 0
switchbot/__init__.py

@@ -4,6 +4,7 @@ from __future__ import annotations
 from .adv_parser import SwitchbotSupportedType, parse_advertisement_data
 from .const import SwitchbotModel
 from .devices.bot import Switchbot
+from .devices.bulb import ColorMode, SwitchbotBulb
 from .devices.curtain import SwitchbotCurtain
 from .devices.device import SwitchbotDevice
 from .devices.plug import SwitchbotPlugMini
@@ -14,6 +15,8 @@ __all__ = [
     "parse_advertisement_data",
     "GetSwitchbotDevices",
     "SwitchBotAdvertisement",
+    "ColorMode",
+    "SwitchbotBulb",
     "SwitchbotDevice",
     "SwitchbotCurtain",
     "Switchbot",

+ 1 - 1
switchbot/adv_parsers/bulb.py

@@ -11,7 +11,7 @@ def process_color_bulb(data: bytes, mfr_data: bytes | None) -> dict[str, bool |
         "brightness": mfr_data[7] & 0b01111111,
         "delay": bool(mfr_data[8] & 0b10000000),
         "preset": bool(mfr_data[8] & 0b00001000),
-        "light_state": mfr_data[8] & 0b00000111,
+        "color_mode": mfr_data[8] & 0b00000111,
         "speed": mfr_data[9] & 0b01111111,
         "loop_index": mfr_data[10] & 0b11111110,
     }

+ 9 - 19
switchbot/devices/bot.py

@@ -32,7 +32,7 @@ class Switchbot(SwitchbotDevice):
 
     async def turn_on(self) -> bool:
         """Turn device on."""
-        result = await self._sendcommand(ON_KEY, self._retry_count)
+        result = await self._sendcommand(ON_KEY)
 
         if result[0] == 1:
             return True
@@ -47,7 +47,7 @@ class Switchbot(SwitchbotDevice):
 
     async def turn_off(self) -> bool:
         """Turn device off."""
-        result = await self._sendcommand(OFF_KEY, self._retry_count)
+        result = await self._sendcommand(OFF_KEY)
         if result[0] == 1:
             return True
 
@@ -61,7 +61,7 @@ class Switchbot(SwitchbotDevice):
 
     async def hand_up(self) -> bool:
         """Raise device arm."""
-        result = await self._sendcommand(UP_KEY, self._retry_count)
+        result = await self._sendcommand(UP_KEY)
         if result[0] == 1:
             return True
 
@@ -73,7 +73,7 @@ class Switchbot(SwitchbotDevice):
 
     async def hand_down(self) -> bool:
         """Lower device arm."""
-        result = await self._sendcommand(DOWN_KEY, self._retry_count)
+        result = await self._sendcommand(DOWN_KEY)
         if result[0] == 1:
             return True
 
@@ -85,7 +85,7 @@ class Switchbot(SwitchbotDevice):
 
     async def press(self) -> bool:
         """Press command to device."""
-        result = await self._sendcommand(PRESS_KEY, self._retry_count)
+        result = await self._sendcommand(PRESS_KEY)
         if result[0] == 1:
             return True
 
@@ -102,27 +102,17 @@ class Switchbot(SwitchbotDevice):
         mode_key = format(switch_mode, "b") + format(inverse, "b")
         strength_key = f"{strength:0{2}x}"  # to hex with padding to double digit
 
-        result = await self._sendcommand(
-            DEVICE_SET_MODE_KEY + strength_key + mode_key, self._retry_count
-        )
+        result = await self._sendcommand(DEVICE_SET_MODE_KEY + strength_key + mode_key)
 
-        if result[0] == 1:
-            return True
-
-        return False
+        return result[0] == 1
 
     async def set_long_press(self, duration: int = 0) -> bool:
         """Set bot long press duration."""
         duration_key = f"{duration:0{2}x}"  # to hex with padding to double digit
 
-        result = await self._sendcommand(
-            DEVICE_SET_EXTENDED_KEY + "08" + duration_key, self._retry_count
-        )
+        result = await self._sendcommand(DEVICE_SET_EXTENDED_KEY + "08" + duration_key)
 
-        if result[0] == 1:
-            return True
-
-        return False
+        return result[0] == 1
 
     async def get_basic_info(self) -> dict[str, Any] | None:
         """Get device basic settings."""

+ 155 - 0
switchbot/devices/bulb.py

@@ -1 +1,156 @@
 from __future__ import annotations
+
+import asyncio
+import logging
+from enum import Enum
+from typing import Any
+
+from switchbot.models import SwitchBotAdvertisement
+
+from .device import SwitchbotDevice
+
+REQ_HEADER = "570f"
+BULB_COMMMAND_HEADER = "4701"
+BULB_REQUEST = f"{REQ_HEADER}4801"
+
+BULB_COMMAND = f"{REQ_HEADER}{BULB_COMMMAND_HEADER}"
+# Bulb keys
+BULB_ON_KEY = f"{BULB_COMMAND}01"
+BULB_OFF_KEY = f"{BULB_COMMAND}02"
+RGB_BRIGHTNESS_KEY = f"{BULB_COMMAND}12"
+CW_BRIGHTNESS_KEY = f"{BULB_COMMAND}13"
+BRIGHTNESS_KEY = f"{BULB_COMMAND}14"
+RGB_KEY = f"{BULB_COMMAND}16"
+CW_KEY = f"{BULB_COMMAND}17"
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class ColorMode(Enum):
+
+    OFF = 0
+    COLOR_TEMP = 1
+    RGB = 2
+    EFFECT = 3
+
+
+class SwitchbotBulb(SwitchbotDevice):
+    """Representation of a Switchbot bulb."""
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        """Switchbot bulb constructor."""
+        super().__init__(*args, **kwargs)
+        self._state: dict[str, Any] = {}
+
+    @property
+    def on(self) -> bool | None:
+        """Return if bulb is on."""
+        return self.is_on()
+
+    @property
+    def rgb(self) -> tuple[int, int, int] | None:
+        """Return the current rgb value."""
+        if "r" not in self._state or "g" not in self._state or "b" not in self._state:
+            return None
+        return self._state["r"], self._state["g"], self._state["b"]
+
+    @property
+    def color_temp(self) -> int | None:
+        """Return the current color temp value."""
+        return self._state.get("cw") or self.min_temp
+
+    @property
+    def brightness(self) -> int | None:
+        """Return the current brightness value."""
+        return self._get_adv_value("brightness") or 0
+
+    @property
+    def color_mode(self) -> ColorMode:
+        """Return the current color mode."""
+        return ColorMode(self._get_adv_value("color_mode") or 0)
+
+    @property
+    def min_temp(self) -> int:
+        """Return minimum color temp."""
+        return 2700
+
+    @property
+    def max_temp(self) -> int:
+        """Return maximum color temp."""
+        return 6500
+
+    async def update(self) -> None:
+        """Update state of device."""
+        result = await self._sendcommand(BULB_REQUEST)
+        self._update_state(result)
+
+    async def turn_on(self) -> bool:
+        """Turn device on."""
+        result = await self._sendcommand(BULB_ON_KEY)
+        self._update_state(result)
+        return result[1] == 0x80
+
+    async def turn_off(self) -> bool:
+        """Turn device off."""
+        result = await self._sendcommand(BULB_OFF_KEY)
+        self._update_state(result)
+        return result[1] == 0x00
+
+    async def set_brightness(self, brightness: int) -> bool:
+        """Set brightness."""
+        assert 0 <= brightness <= 100, "Brightness must be between 0 and 100"
+        result = await self._sendcommand(f"{BRIGHTNESS_KEY}{brightness:02X}")
+        self._update_state(result)
+        return result[1] == 0x80
+
+    async def set_color_temp(self, brightness: int, color_temp: int) -> bool:
+        """Set color temp."""
+        assert 0 <= brightness <= 100, "Brightness must be between 0 and 100"
+        assert 2700 <= color_temp <= 6500, "Color Temp must be between 0 and 100"
+        result = await self._sendcommand(
+            f"{CW_BRIGHTNESS_KEY}{brightness:02X}{color_temp:04X}"
+        )
+        self._update_state(result)
+        return result[1] == 0x80
+
+    async def set_rgb(self, brightness: int, r: int, g: int, b: int) -> bool:
+        """Set rgb."""
+        assert 0 <= brightness <= 100, "Brightness must be between 0 and 100"
+        assert 0 <= r <= 255, "r must be between 0 and 255"
+        assert 0 <= g <= 255, "g must be between 0 and 255"
+        assert 0 <= b <= 255, "b must be between 0 and 255"
+        result = await self._sendcommand(
+            f"{RGB_BRIGHTNESS_KEY}{brightness:02X}{r:02X}{g:02X}{b:02X}"
+        )
+        self._update_state(result)
+        return result[1] == 0x80
+
+    def is_on(self) -> bool | None:
+        """Return bulb state from cache."""
+        return self._get_adv_value("isOn")
+
+    def _update_state(self, result: bytes) -> None:
+        """Update device state."""
+        self._state["r"] = result[3]
+        self._state["g"] = result[4]
+        self._state["b"] = result[5]
+        self._state["cw"] = int(result[6:8].hex(), 16)
+        _LOGGER.debug(
+            "%s: Bulb update state: %s = %s", self.name, result.hex(), self._state
+        )
+        self._fire_callbacks()
+
+    def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
+        """Update device data from advertisement."""
+        current_state = self._get_adv_value("sequence_number")
+        super().update_from_advertisement(advertisement)
+        new_state = self._get_adv_value("sequence_number")
+        _LOGGER.debug(
+            "%s: Bulb update advertisement: %s (seq before: %s) (seq after: %s)",
+            self.name,
+            advertisement,
+            current_state,
+            new_state,
+        )
+        if current_state != new_state:
+            asyncio.ensure_future(self.update())

+ 10 - 26
switchbot/devices/curtain.py

@@ -41,37 +41,25 @@ class SwitchbotCurtain(SwitchbotDevice):
 
     async def open(self) -> bool:
         """Send open command."""
-        result = await self._sendcommand(OPEN_KEY, self._retry_count)
-        if result[0] == 1:
-            return True
-
-        return False
+        result = await self._sendcommand(OPEN_KEY)
+        return result[0] == 1
 
     async def close(self) -> bool:
         """Send close command."""
-        result = await self._sendcommand(CLOSE_KEY, self._retry_count)
-        if result[0] == 1:
-            return True
-
-        return False
+        result = await self._sendcommand(CLOSE_KEY)
+        return result[0] == 1
 
     async def stop(self) -> bool:
         """Send stop command to device."""
-        result = await self._sendcommand(STOP_KEY, self._retry_count)
-        if result[0] == 1:
-            return True
-
-        return False
+        result = await self._sendcommand(STOP_KEY)
+        return result[0] == 1
 
     async def set_position(self, position: int) -> bool:
         """Send position command (0-100) to device."""
         position = (100 - position) if self._reverse else position
         hex_position = "%0.2X" % position
-        result = await self._sendcommand(POSITION_KEY + hex_position, self._retry_count)
-        if result[0] == 1:
-            return True
-
-        return False
+        result = await self._sendcommand(POSITION_KEY + hex_position)
+        return result[0] == 1
 
     async def update(self, interface: int | None = None) -> None:
         """Update position, battery percent and light level of device."""
@@ -107,9 +95,7 @@ class SwitchbotCurtain(SwitchbotDevice):
 
     async def get_extended_info_summary(self) -> dict[str, Any] | None:
         """Get basic info for all devices in chain."""
-        _data = await self._sendcommand(
-            key=CURTAIN_EXT_SUM_KEY, retry=self._retry_count
-        )
+        _data = await self._sendcommand(key=CURTAIN_EXT_SUM_KEY)
 
         if _data in (b"\x07", b"\x00"):
             _LOGGER.error("%s: Unsuccessful, please try again", self.name)
@@ -140,9 +126,7 @@ class SwitchbotCurtain(SwitchbotDevice):
     async def get_extended_info_adv(self) -> dict[str, Any] | None:
         """Get advance page info for device chain."""
 
-        _data = await self._sendcommand(
-            key=CURTAIN_EXT_ADV_KEY, retry=self._retry_count
-        )
+        _data = await self._sendcommand(key=CURTAIN_EXT_ADV_KEY)
 
         if _data in (b"\x07", b"\x00"):
             _LOGGER.error("%s: Unsuccessful, please try again", self.name)

+ 29 - 6
switchbot/devices/device.py

@@ -4,15 +4,14 @@ from __future__ import annotations
 import asyncio
 import binascii
 import logging
-from typing import Any
+from typing import Any, Callable
 from uuid import UUID
 
 import async_timeout
-
 from bleak import BleakError
-from bleak.exc import BleakDBusError
 from bleak.backends.device import BLEDevice
 from bleak.backends.service import BleakGATTCharacteristic, BleakGATTServiceCollection
+from bleak.exc import BleakDBusError
 from bleak_retry_connector import (
     BleakClientWithServiceCache,
     BleakNotFoundError,
@@ -92,6 +91,7 @@ class SwitchbotDevice:
         self._disconnect_timer: asyncio.TimerHandle | None = None
         self._expected_disconnect = False
         self.loop = asyncio.get_event_loop()
+        self._callbacks: list[Callable[[], None]] = []
 
     def _commandkey(self, key: str) -> str:
         """Add password to key if set."""
@@ -101,8 +101,10 @@ class SwitchbotDevice:
         key_suffix = key[4:]
         return KEY_PASSWORD_PREFIX + key_action + self._password_encoded + key_suffix
 
-    async def _sendcommand(self, key: str, retry: int) -> bytes:
+    async def _sendcommand(self, key: str, retry: int | None = None) -> bytes:
         """Send command to device and read response."""
+        if retry is None:
+            retry = self._retry_count
         command = bytearray.fromhex(self._commandkey(key))
         _LOGGER.debug("%s: Sending command %s", self.name, command)
         if self._operation_lock.locked():
@@ -330,7 +332,7 @@ class SwitchbotDevice:
         """Return value from advertisement data."""
         if not self._sb_adv_data:
             return None
-        return self._sb_adv_data.data["data"][key]
+        return self._sb_adv_data.data["data"].get(key)
 
     def get_battery_percent(self) -> Any:
         """Return device battery level in percent."""
@@ -344,9 +346,12 @@ class SwitchbotDevice:
         self._device = advertisement.device
 
     async def get_device_data(
-        self, retry: int = DEFAULT_RETRY_COUNT, interface: int | None = None
+        self, retry: int | None = None, interface: int | None = None
     ) -> SwitchBotAdvertisement | None:
         """Find switchbot devices and their advertisement data."""
+        if retry is None:
+            retry = self._retry_count
+
         if interface:
             _interface: int = interface
         else:
@@ -372,3 +377,21 @@ class SwitchbotDevice:
             return None
 
         return _data
+
+    def _fire_callbacks(self) -> None:
+        """Fire callbacks."""
+        for callback in self._callbacks:
+            callback()
+
+    def subscribe(self, callback: Callable[[], None]) -> Callable[[], None]:
+        """Subscribe to device notifications."""
+        self._callbacks.append(callback)
+
+        def _unsub() -> None:
+            """Unsubscribe from device notifications."""
+            self._callbacks.remove(callback)
+
+        return _unsub
+
+    async def update(self) -> None:
+        """Update state of device."""

+ 3 - 4
switchbot/devices/plug.py

@@ -19,15 +19,14 @@ class SwitchbotPlugMini(SwitchbotDevice):
 
     async def turn_on(self) -> bool:
         """Turn device on."""
-        result = await self._sendcommand(PLUG_ON_KEY, self._retry_count)
+        result = await self._sendcommand(PLUG_ON_KEY)
         return result[1] == 0x80
 
     async def turn_off(self) -> bool:
         """Turn device off."""
-        result = await self._sendcommand(PLUG_OFF_KEY, self._retry_count)
+        result = await self._sendcommand(PLUG_OFF_KEY)
         return result[1] == 0x00
 
-    def is_on(self) -> Any:
+    def is_on(self) -> bool | None:
         """Return switch state from cache."""
-        # To get actual position call update() first.
         return self._get_adv_value("isOn")